From 5eb449a982488bc19db203c2d3c5e6107440e5af Mon Sep 17 00:00:00 2001 From: Igor Rekun Date: Wed, 6 May 2026 18:33:45 +0100 Subject: [PATCH 1/4] sync: add --max-concurrent-requests flag Expose the previously-hardcoded request concurrency cap as a flag on the `databricks sync` command. Defaults to the existing limit (20) so behavior is unchanged when the flag is omitted; values < 1 are rejected. Co-authored-by: Isaac --- acceptance/cmd/sync-without-args/output.txt | 21 ++++++------ cmd/sync/sync.go | 37 +++++++++++++++------ cmd/sync/sync_test.go | 17 ++++++++-- libs/sync/sync.go | 6 ++++ libs/sync/watchdog.go | 13 ++++---- 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/acceptance/cmd/sync-without-args/output.txt b/acceptance/cmd/sync-without-args/output.txt index 5d63d426c67..a98e0891ac8 100644 --- a/acceptance/cmd/sync-without-args/output.txt +++ b/acceptance/cmd/sync-without-args/output.txt @@ -6,16 +6,17 @@ Usage: databricks sync [flags] SRC DST Flags: - --dry-run simulate sync execution without making actual changes - --exclude strings patterns to exclude from sync (can be specified multiple times) - --exclude-from string file containing patterns to exclude from sync (one pattern per line) - --full perform full synchronization (default is incremental) - -h, --help help for sync - --include strings patterns to include in sync (can be specified multiple times) - --include-from string file containing patterns to include to sync (one pattern per line) - --interval duration file system polling interval (for --watch) (default 1s) - --output type type of output format (default text) - --watch watch local file system for changes + --dry-run simulate sync execution without making actual changes + --exclude strings patterns to exclude from sync (can be specified multiple times) + --exclude-from string file containing patterns to exclude from sync (one pattern per line) + --full perform full synchronization (default is incremental) + -h, --help help for sync + --include strings patterns to include in sync (can be specified multiple times) + --include-from string file containing patterns to include to sync (one pattern per line) + --interval duration file system polling interval (for --watch) (default 1s) + --max-concurrent-requests int maximum number of concurrent requests to the workspace (default 20) + --output type type of output format (default text) + --watch watch local file system for changes Global Flags: --debug enable debug logging diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 5c110cecb2b..332ac172edb 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -26,15 +26,23 @@ import ( type syncFlags struct { // project files polling interval - interval time.Duration - full bool - watch bool - output flags.Output - exclude []string - include []string - dryRun bool - excludeFrom string - includeFrom string + interval time.Duration + full bool + watch bool + output flags.Output + exclude []string + include []string + dryRun bool + excludeFrom string + includeFrom string + maxConcurrentRequests int +} + +func (f *syncFlags) validate() error { + if f.maxConcurrentRequests < 1 { + return fmt.Errorf("--max-concurrent-requests must be >= 1, got %d", f.maxConcurrentRequests) + } + return nil } func readPatternsFile(filePath string) ([]string, error) { @@ -89,6 +97,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * opts.Include = append(opts.Include, f.include...) opts.Include = append(opts.Include, includePatterns...) opts.DryRun = f.dryRun + opts.MaxConcurrentRequests = f.maxConcurrentRequests return opts, nil } @@ -161,8 +170,9 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn SnapshotBasePath: filepath.Join(args[0], ".databricks"), WorkspaceClient: client, - OutputHandler: outputHandler, - DryRun: f.dryRun, + OutputHandler: outputHandler, + DryRun: f.dryRun, + MaxConcurrentRequests: f.maxConcurrentRequests, } return &opts, nil } @@ -187,6 +197,7 @@ func New() *cobra.Command { cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)") cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)") cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes") + cmd.Flags().IntVar(&f.maxConcurrentRequests, "max-concurrent-requests", sync.MaxRequestsInFlight, "maximum number of concurrent requests to the workspace") // Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle. mustWorkspaceClient := func(cmd *cobra.Command, args []string) error { @@ -196,6 +207,10 @@ func New() *cobra.Command { cmd.PreRunE = mustWorkspaceClient cmd.RunE = func(cmd *cobra.Command, args []string) error { + if err := f.validate(); err != nil { + return err + } + var opts *sync.SyncOptions var err error diff --git a/cmd/sync/sync_test.go b/cmd/sync/sync_test.go index 818c7e013aa..4d573dd224f 100644 --- a/cmd/sync/sync_test.go +++ b/cmd/sync/sync_test.go @@ -32,12 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) { }, } - f := syncFlags{} + f := syncFlags{maxConcurrentRequests: 5} opts, err := f.syncOptionsFromBundle(New(), []string{}, b) require.NoError(t, err) assert.Equal(t, tempDir, opts.LocalRoot.Native()) assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath) assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath) + assert.Equal(t, 5, opts.MaxConcurrentRequests) assert.NotNil(t, opts.WorkspaceClient) } @@ -56,13 +57,25 @@ func TestSyncOptionsFromArgs(t *testing.T) { local := t.TempDir() remote := "/remote" - f := syncFlags{} + f := syncFlags{maxConcurrentRequests: 7} cmd := New() cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil)) opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote}) require.NoError(t, err) assert.Equal(t, local, opts.LocalRoot.Native()) assert.Equal(t, remote, opts.RemotePath) + assert.Equal(t, 7, opts.MaxConcurrentRequests) +} + +func TestSyncFlagsValidate(t *testing.T) { + f := syncFlags{maxConcurrentRequests: 1} + require.NoError(t, f.validate()) + + f.maxConcurrentRequests = 0 + require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1") + + f.maxConcurrentRequests = -3 + require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1") } func TestExcludeFromFlag(t *testing.T) { diff --git a/libs/sync/sync.go b/libs/sync/sync.go index c65b49eb775..246c78a2ed1 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -43,6 +43,8 @@ type SyncOptions struct { OutputHandler OutputHandler DryRun bool + + MaxConcurrentRequests int } type Sync struct { @@ -96,6 +98,10 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, errors.New("failed to resolve host for snapshot") } + if opts.MaxConcurrentRequests <= 0 { + opts.MaxConcurrentRequests = MaxRequestsInFlight + } + // For full sync, we start with an empty snapshot. // For incremental sync, we try to load an existing snapshot to start from. var snapshot *Snapshot diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 4a47acfb836..acbe364d472 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -96,9 +96,9 @@ func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context. }) } -func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error { +func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(context.Context, string) error) error { group, ctx := errgroup.WithContext(ctx) - group.SetLimit(MaxRequestsInFlight) + group.SetLimit(limit) for _, path := range paths { groupRunSingle(ctx, group, fn, path) @@ -110,16 +110,17 @@ func groupRunParallel(ctx context.Context, paths []string, fn func(context.Conte func (s *Sync) applyDiff(ctx context.Context, d diff) error { var err error + limit := s.MaxConcurrentRequests // Delete files in parallel. - err = groupRunParallel(ctx, d.delete, s.applyDelete) + err = groupRunParallel(ctx, d.delete, limit, s.applyDelete) if err != nil { return err } // Delete directories ordered by depth from leaf to root. for _, group := range d.groupedRmdir() { - err = groupRunParallel(ctx, group, s.applyRmdir) + err = groupRunParallel(ctx, group, limit, s.applyRmdir) if err != nil { return err } @@ -127,14 +128,14 @@ func (s *Sync) applyDiff(ctx context.Context, d diff) error { // Create directories (leafs only because intermediates are created automatically). for _, group := range d.groupedMkdir() { - err = groupRunParallel(ctx, group, s.applyMkdir) + err = groupRunParallel(ctx, group, limit, s.applyMkdir) if err != nil { return err } } // Put files in parallel. - err = groupRunParallel(ctx, d.put, s.applyPut) + err = groupRunParallel(ctx, d.put, limit, s.applyPut) return err } From 89bfb14fc8d1d5ec389b6e2732ae745332dce183 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 7 May 2026 10:59:59 +0200 Subject: [PATCH 2/4] sync: rename flag to --concurrency and extend to bundle sync Match the name of `fs cp --concurrency` (#4132) for consistency, and expose the same flag on `databricks bundle sync` so both sync entry points are tunable. Co-authored-by: Isaac --- acceptance/bundle/help/bundle-sync/output.txt | 1 + acceptance/cmd/sync-without-args/output.txt | 22 ++++++------ cmd/bundle/sync.go | 27 +++++++++++--- cmd/sync/sync.go | 36 ++++++++++--------- cmd/sync/sync_test.go | 18 +++++----- libs/sync/sync.go | 6 ++-- libs/sync/watchdog.go | 2 +- 7 files changed, 66 insertions(+), 46 deletions(-) diff --git a/acceptance/bundle/help/bundle-sync/output.txt b/acceptance/bundle/help/bundle-sync/output.txt index a357882cbd3..02956ab4b77 100644 --- a/acceptance/bundle/help/bundle-sync/output.txt +++ b/acceptance/bundle/help/bundle-sync/output.txt @@ -15,6 +15,7 @@ Usage: databricks bundle sync [flags] Flags: + --concurrency int number of parallel requests to the workspace (default 20) --dry-run simulate sync execution without making actual changes --full perform full synchronization (default is incremental) -h, --help help for sync diff --git a/acceptance/cmd/sync-without-args/output.txt b/acceptance/cmd/sync-without-args/output.txt index a98e0891ac8..b0d24907f96 100644 --- a/acceptance/cmd/sync-without-args/output.txt +++ b/acceptance/cmd/sync-without-args/output.txt @@ -6,17 +6,17 @@ Usage: databricks sync [flags] SRC DST Flags: - --dry-run simulate sync execution without making actual changes - --exclude strings patterns to exclude from sync (can be specified multiple times) - --exclude-from string file containing patterns to exclude from sync (one pattern per line) - --full perform full synchronization (default is incremental) - -h, --help help for sync - --include strings patterns to include in sync (can be specified multiple times) - --include-from string file containing patterns to include to sync (one pattern per line) - --interval duration file system polling interval (for --watch) (default 1s) - --max-concurrent-requests int maximum number of concurrent requests to the workspace (default 20) - --output type type of output format (default text) - --watch watch local file system for changes + --concurrency int number of parallel requests to the workspace (default 20) + --dry-run simulate sync execution without making actual changes + --exclude strings patterns to exclude from sync (can be specified multiple times) + --exclude-from string file containing patterns to exclude from sync (one pattern per line) + --full perform full synchronization (default is incremental) + -h, --help help for sync + --include strings patterns to include in sync (can be specified multiple times) + --include-from string file containing patterns to include to sync (one pattern per line) + --interval duration file system polling interval (for --watch) (default 1s) + --output type type of output format (default text) + --watch watch local file system for changes Global Flags: --debug enable debug logging diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 462ca4c19a0..f41bba3846f 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -2,6 +2,7 @@ package bundle import ( "context" + "errors" "fmt" "io" "time" @@ -16,12 +17,22 @@ import ( "github.com/spf13/cobra" ) +var errInvalidConcurrency = errors.New("--concurrency must be at least 1") + type syncFlags struct { - interval time.Duration - full bool - watch bool - output flags.Output - dryRun bool + interval time.Duration + full bool + watch bool + output flags.Output + dryRun bool + concurrency int +} + +func (f *syncFlags) validate() error { + if f.concurrency < 1 { + return errInvalidConcurrency + } + return nil } func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) { @@ -48,6 +59,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) opts.Full = f.full opts.PollInterval = f.interval opts.DryRun = f.dryRun + opts.Concurrency = f.concurrency return opts, nil } @@ -74,8 +86,13 @@ Use 'databricks bundle deploy' for full resource deployment.`, cmd.Flags().BoolVar(&f.watch, "watch", false, "watch local file system for changes") cmd.Flags().Var(&f.output, "output", "type of the output format") cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes") + cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace") cmd.RunE = func(cmd *cobra.Command, args []string) error { + if err := f.validate(); err != nil { + return err + } + b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{}) if err != nil { return err diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 332ac172edb..8955baf89ce 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -24,23 +24,25 @@ import ( "github.com/spf13/cobra" ) +var errInvalidConcurrency = errors.New("--concurrency must be at least 1") + type syncFlags struct { // project files polling interval - interval time.Duration - full bool - watch bool - output flags.Output - exclude []string - include []string - dryRun bool - excludeFrom string - includeFrom string - maxConcurrentRequests int + interval time.Duration + full bool + watch bool + output flags.Output + exclude []string + include []string + dryRun bool + excludeFrom string + includeFrom string + concurrency int } func (f *syncFlags) validate() error { - if f.maxConcurrentRequests < 1 { - return fmt.Errorf("--max-concurrent-requests must be >= 1, got %d", f.maxConcurrentRequests) + if f.concurrency < 1 { + return errInvalidConcurrency } return nil } @@ -97,7 +99,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * opts.Include = append(opts.Include, f.include...) opts.Include = append(opts.Include, includePatterns...) opts.DryRun = f.dryRun - opts.MaxConcurrentRequests = f.maxConcurrentRequests + opts.Concurrency = f.concurrency return opts, nil } @@ -170,9 +172,9 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn SnapshotBasePath: filepath.Join(args[0], ".databricks"), WorkspaceClient: client, - OutputHandler: outputHandler, - DryRun: f.dryRun, - MaxConcurrentRequests: f.maxConcurrentRequests, + OutputHandler: outputHandler, + DryRun: f.dryRun, + Concurrency: f.concurrency, } return &opts, nil } @@ -197,7 +199,7 @@ func New() *cobra.Command { cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)") cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)") cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes") - cmd.Flags().IntVar(&f.maxConcurrentRequests, "max-concurrent-requests", sync.MaxRequestsInFlight, "maximum number of concurrent requests to the workspace") + cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace") // Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle. mustWorkspaceClient := func(cmd *cobra.Command, args []string) error { diff --git a/cmd/sync/sync_test.go b/cmd/sync/sync_test.go index 4d573dd224f..071efcaefc8 100644 --- a/cmd/sync/sync_test.go +++ b/cmd/sync/sync_test.go @@ -32,13 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) { }, } - f := syncFlags{maxConcurrentRequests: 5} + f := syncFlags{concurrency: 5} opts, err := f.syncOptionsFromBundle(New(), []string{}, b) require.NoError(t, err) assert.Equal(t, tempDir, opts.LocalRoot.Native()) assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath) assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath) - assert.Equal(t, 5, opts.MaxConcurrentRequests) + assert.Equal(t, 5, opts.Concurrency) assert.NotNil(t, opts.WorkspaceClient) } @@ -57,25 +57,25 @@ func TestSyncOptionsFromArgs(t *testing.T) { local := t.TempDir() remote := "/remote" - f := syncFlags{maxConcurrentRequests: 7} + f := syncFlags{concurrency: 7} cmd := New() cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil)) opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote}) require.NoError(t, err) assert.Equal(t, local, opts.LocalRoot.Native()) assert.Equal(t, remote, opts.RemotePath) - assert.Equal(t, 7, opts.MaxConcurrentRequests) + assert.Equal(t, 7, opts.Concurrency) } func TestSyncFlagsValidate(t *testing.T) { - f := syncFlags{maxConcurrentRequests: 1} + f := syncFlags{concurrency: 1} require.NoError(t, f.validate()) - f.maxConcurrentRequests = 0 - require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1") + f.concurrency = 0 + require.ErrorIs(t, f.validate(), errInvalidConcurrency) - f.maxConcurrentRequests = -3 - require.ErrorContains(t, f.validate(), "--max-concurrent-requests must be >= 1") + f.concurrency = -3 + require.ErrorIs(t, f.validate(), errInvalidConcurrency) } func TestExcludeFromFlag(t *testing.T) { diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 246c78a2ed1..0727a27b871 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -44,7 +44,7 @@ type SyncOptions struct { DryRun bool - MaxConcurrentRequests int + Concurrency int } type Sync struct { @@ -98,8 +98,8 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, errors.New("failed to resolve host for snapshot") } - if opts.MaxConcurrentRequests <= 0 { - opts.MaxConcurrentRequests = MaxRequestsInFlight + if opts.Concurrency <= 0 { + opts.Concurrency = MaxRequestsInFlight } // For full sync, we start with an empty snapshot. diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index acbe364d472..d8088495fb9 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -110,7 +110,7 @@ func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(co func (s *Sync) applyDiff(ctx context.Context, d diff) error { var err error - limit := s.MaxConcurrentRequests + limit := s.Concurrency // Delete files in parallel. err = groupRunParallel(ctx, d.delete, limit, s.applyDelete) From fbe56f436e559bee1233b39949ea040d6fa9e4b9 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 7 May 2026 11:16:12 +0200 Subject: [PATCH 3/4] Add changelog entry for --concurrency flag Co-authored-by: Isaac --- NEXT_CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 99a11d8ae84..46b9b6f60fe 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,6 +6,7 @@ * `databricks api` now works against unified hosts. Adds `--account` to scope a call to the account API and `--workspace-id` to override the workspace routing identifier per call. A `?o=` query parameter on the path (the SPOG URL convention used by the Databricks UI) is also recognized as a per-call workspace override, so URLs pasted from the browser route correctly. * JSON output for single objects now uses standard `"key": "value"` spacing (matching list output and `encoding/json` defaults). +* Add `--concurrency` flag to `databricks sync` and `databricks bundle sync` to control the number of parallel requests to the workspace (default 20). Useful when uploading many medium-sized files where the previous fixed concurrency could trigger stream timeouts ([#5197](https://github.com/databricks/cli/pull/5197)). ### Bundles * Validate that resource keys do not contain variable references ([#5169](https://github.com/databricks/cli/pull/5169)) From 3b7c3202807016c257669e6e990a6b4dd7dabe59 Mon Sep 17 00:00:00 2001 From: Igor Rekun Date: Thu, 7 May 2026 13:20:35 +0100 Subject: [PATCH 4/4] fix arg validation order --- cmd/sync/sync.go | 18 ++++++------------ cmd/sync/sync_test.go | 11 ----------- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 8955baf89ce..7d2eec58a9a 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -40,13 +40,6 @@ type syncFlags struct { concurrency int } -func (f *syncFlags) validate() error { - if f.concurrency < 1 { - return errInvalidConcurrency - } - return nil -} - func readPatternsFile(filePath string) ([]string, error) { if filePath == "" { return nil, nil @@ -207,12 +200,13 @@ func New() *cobra.Command { return root.MustWorkspaceClient(cmd, args) } - cmd.PreRunE = mustWorkspaceClient - cmd.RunE = func(cmd *cobra.Command, args []string) error { - if err := f.validate(); err != nil { - return err + cmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if f.concurrency < 1 { + return errInvalidConcurrency } - + return mustWorkspaceClient(cmd, args) + } + cmd.RunE = func(cmd *cobra.Command, args []string) error { var opts *sync.SyncOptions var err error diff --git a/cmd/sync/sync_test.go b/cmd/sync/sync_test.go index 071efcaefc8..81834247bd4 100644 --- a/cmd/sync/sync_test.go +++ b/cmd/sync/sync_test.go @@ -67,17 +67,6 @@ func TestSyncOptionsFromArgs(t *testing.T) { assert.Equal(t, 7, opts.Concurrency) } -func TestSyncFlagsValidate(t *testing.T) { - f := syncFlags{concurrency: 1} - require.NoError(t, f.validate()) - - f.concurrency = 0 - require.ErrorIs(t, f.validate(), errInvalidConcurrency) - - f.concurrency = -3 - require.ErrorIs(t, f.validate(), errInvalidConcurrency) -} - func TestExcludeFromFlag(t *testing.T) { // Create a temporary directory tempDir := t.TempDir()