diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 99a11d8ae84..46b9b6f60fe 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,6 +6,7 @@ * `databricks api` now works against unified hosts. Adds `--account` to scope a call to the account API and `--workspace-id` to override the workspace routing identifier per call. A `?o=` query parameter on the path (the SPOG URL convention used by the Databricks UI) is also recognized as a per-call workspace override, so URLs pasted from the browser route correctly. * JSON output for single objects now uses standard `"key": "value"` spacing (matching list output and `encoding/json` defaults). +* Add `--concurrency` flag to `databricks sync` and `databricks bundle sync` to control the number of parallel requests to the workspace (default 20). Useful when uploading many medium-sized files where the previous fixed concurrency could trigger stream timeouts ([#5197](https://github.com/databricks/cli/pull/5197)). ### Bundles * Validate that resource keys do not contain variable references ([#5169](https://github.com/databricks/cli/pull/5169)) diff --git a/acceptance/bundle/help/bundle-sync/output.txt b/acceptance/bundle/help/bundle-sync/output.txt index a357882cbd3..02956ab4b77 100644 --- a/acceptance/bundle/help/bundle-sync/output.txt +++ b/acceptance/bundle/help/bundle-sync/output.txt @@ -15,6 +15,7 @@ Usage: databricks bundle sync [flags] Flags: + --concurrency int number of parallel requests to the workspace (default 20) --dry-run simulate sync execution without making actual changes --full perform full synchronization (default is incremental) -h, --help help for sync diff --git a/acceptance/cmd/sync-without-args/output.txt b/acceptance/cmd/sync-without-args/output.txt index 5d63d426c67..b0d24907f96 100644 --- a/acceptance/cmd/sync-without-args/output.txt +++ b/acceptance/cmd/sync-without-args/output.txt @@ -6,6 +6,7 @@ Usage: databricks sync [flags] SRC DST Flags: + --concurrency int number of parallel requests to the workspace (default 20) --dry-run simulate sync execution without making actual changes --exclude strings patterns to exclude from sync (can be specified multiple times) --exclude-from string file containing patterns to exclude from sync (one pattern per line) diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 462ca4c19a0..f41bba3846f 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -2,6 +2,7 @@ package bundle import ( "context" + "errors" "fmt" "io" "time" @@ -16,12 +17,22 @@ import ( "github.com/spf13/cobra" ) +var errInvalidConcurrency = errors.New("--concurrency must be at least 1") + type syncFlags struct { - interval time.Duration - full bool - watch bool - output flags.Output - dryRun bool + interval time.Duration + full bool + watch bool + output flags.Output + dryRun bool + concurrency int +} + +func (f *syncFlags) validate() error { + if f.concurrency < 1 { + return errInvalidConcurrency + } + return nil } func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) { @@ -48,6 +59,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) opts.Full = f.full opts.PollInterval = f.interval opts.DryRun = f.dryRun + opts.Concurrency = f.concurrency return opts, nil } @@ -74,8 +86,13 @@ Use 'databricks bundle deploy' for full resource deployment.`, cmd.Flags().BoolVar(&f.watch, "watch", false, "watch local file system for changes") cmd.Flags().Var(&f.output, "output", "type of the output format") cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes") + cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace") cmd.RunE = func(cmd *cobra.Command, args []string) error { + if err := f.validate(); err != nil { + return err + } + b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{}) if err != nil { return err diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 5c110cecb2b..7d2eec58a9a 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -24,6 +24,8 @@ import ( "github.com/spf13/cobra" ) +var errInvalidConcurrency = errors.New("--concurrency must be at least 1") + type syncFlags struct { // project files polling interval interval time.Duration @@ -35,6 +37,7 @@ type syncFlags struct { dryRun bool excludeFrom string includeFrom string + concurrency int } func readPatternsFile(filePath string) ([]string, error) { @@ -89,6 +92,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * opts.Include = append(opts.Include, f.include...) opts.Include = append(opts.Include, includePatterns...) opts.DryRun = f.dryRun + opts.Concurrency = f.concurrency return opts, nil } @@ -163,6 +167,7 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn OutputHandler: outputHandler, DryRun: f.dryRun, + Concurrency: f.concurrency, } return &opts, nil } @@ -187,6 +192,7 @@ func New() *cobra.Command { cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)") cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)") cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes") + cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace") // Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle. mustWorkspaceClient := func(cmd *cobra.Command, args []string) error { @@ -194,7 +200,12 @@ func New() *cobra.Command { return root.MustWorkspaceClient(cmd, args) } - cmd.PreRunE = mustWorkspaceClient + cmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if f.concurrency < 1 { + return errInvalidConcurrency + } + return mustWorkspaceClient(cmd, args) + } cmd.RunE = func(cmd *cobra.Command, args []string) error { var opts *sync.SyncOptions var err error diff --git a/cmd/sync/sync_test.go b/cmd/sync/sync_test.go index 818c7e013aa..81834247bd4 100644 --- a/cmd/sync/sync_test.go +++ b/cmd/sync/sync_test.go @@ -32,12 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) { }, } - f := syncFlags{} + f := syncFlags{concurrency: 5} opts, err := f.syncOptionsFromBundle(New(), []string{}, b) require.NoError(t, err) assert.Equal(t, tempDir, opts.LocalRoot.Native()) assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath) assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath) + assert.Equal(t, 5, opts.Concurrency) assert.NotNil(t, opts.WorkspaceClient) } @@ -56,13 +57,14 @@ func TestSyncOptionsFromArgs(t *testing.T) { local := t.TempDir() remote := "/remote" - f := syncFlags{} + f := syncFlags{concurrency: 7} cmd := New() cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil)) opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote}) require.NoError(t, err) assert.Equal(t, local, opts.LocalRoot.Native()) assert.Equal(t, remote, opts.RemotePath) + assert.Equal(t, 7, opts.Concurrency) } func TestExcludeFromFlag(t *testing.T) { diff --git a/libs/sync/sync.go b/libs/sync/sync.go index c65b49eb775..0727a27b871 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -43,6 +43,8 @@ type SyncOptions struct { OutputHandler OutputHandler DryRun bool + + Concurrency int } type Sync struct { @@ -96,6 +98,10 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, errors.New("failed to resolve host for snapshot") } + if opts.Concurrency <= 0 { + opts.Concurrency = MaxRequestsInFlight + } + // For full sync, we start with an empty snapshot. // For incremental sync, we try to load an existing snapshot to start from. var snapshot *Snapshot diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 4a47acfb836..d8088495fb9 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -96,9 +96,9 @@ func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context. }) } -func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error { +func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(context.Context, string) error) error { group, ctx := errgroup.WithContext(ctx) - group.SetLimit(MaxRequestsInFlight) + group.SetLimit(limit) for _, path := range paths { groupRunSingle(ctx, group, fn, path) @@ -110,16 +110,17 @@ func groupRunParallel(ctx context.Context, paths []string, fn func(context.Conte func (s *Sync) applyDiff(ctx context.Context, d diff) error { var err error + limit := s.Concurrency // Delete files in parallel. - err = groupRunParallel(ctx, d.delete, s.applyDelete) + err = groupRunParallel(ctx, d.delete, limit, s.applyDelete) if err != nil { return err } // Delete directories ordered by depth from leaf to root. for _, group := range d.groupedRmdir() { - err = groupRunParallel(ctx, group, s.applyRmdir) + err = groupRunParallel(ctx, group, limit, s.applyRmdir) if err != nil { return err } @@ -127,14 +128,14 @@ func (s *Sync) applyDiff(ctx context.Context, d diff) error { // Create directories (leafs only because intermediates are created automatically). for _, group := range d.groupedMkdir() { - err = groupRunParallel(ctx, group, s.applyMkdir) + err = groupRunParallel(ctx, group, limit, s.applyMkdir) if err != nil { return err } } // Put files in parallel. - err = groupRunParallel(ctx, d.put, s.applyPut) + err = groupRunParallel(ctx, d.put, limit, s.applyPut) return err }