Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* `databricks api` now works against unified hosts. Adds `--account` to scope a call to the account API and `--workspace-id` to override the workspace routing identifier per call. A `?o=<workspace-id>` query parameter on the path (the SPOG URL convention used by the Databricks UI) is also recognized as a per-call workspace override, so URLs pasted from the browser route correctly.
* JSON output for single objects now uses standard `"key": "value"` spacing (matching list output and `encoding/json` defaults).
* Add `--concurrency` flag to `databricks sync` and `databricks bundle sync` to control the number of parallel requests to the workspace (default 20). Useful when uploading many medium-sized files where the previous fixed concurrency could trigger stream timeouts ([#5197](https://github.com/databricks/cli/pull/5197)).

### Bundles
* Validate that resource keys do not contain variable references ([#5169](https://github.com/databricks/cli/pull/5169))
Expand Down
1 change: 1 addition & 0 deletions acceptance/bundle/help/bundle-sync/output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Usage:
databricks bundle sync [flags]

Flags:
--concurrency int number of parallel requests to the workspace (default 20)
--dry-run simulate sync execution without making actual changes
--full perform full synchronization (default is incremental)
-h, --help help for sync
Expand Down
1 change: 1 addition & 0 deletions acceptance/cmd/sync-without-args/output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Usage:
databricks sync [flags] SRC DST

Flags:
--concurrency int number of parallel requests to the workspace (default 20)
--dry-run simulate sync execution without making actual changes
--exclude strings patterns to exclude from sync (can be specified multiple times)
--exclude-from string file containing patterns to exclude from sync (one pattern per line)
Expand Down
27 changes: 22 additions & 5 deletions cmd/bundle/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bundle

import (
"context"
"errors"
"fmt"
"io"
"time"
Expand All @@ -16,12 +17,22 @@ import (
"github.com/spf13/cobra"
)

// errInvalidConcurrency is returned when the --concurrency flag is set to a value below 1.
var errInvalidConcurrency = errors.New("--concurrency must be at least 1")

// syncFlags holds the command-line flag values for `databricks bundle sync`.
type syncFlags struct {
	interval    time.Duration // file-system polling interval used in watch mode
	full        bool          // perform a full sync instead of an incremental one
	watch       bool          // keep watching the local file system for changes
	output      flags.Output  // output format for sync events
	dryRun      bool          // simulate the sync without making remote changes
	concurrency int           // number of parallel requests to the workspace; must be >= 1
}

// validate checks that the flag values are within their allowed ranges.
// It returns errInvalidConcurrency when --concurrency is below 1.
func (f *syncFlags) validate() error {
	if f.concurrency >= 1 {
		return nil
	}
	return errInvalidConcurrency
}

func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) {
Expand All @@ -48,6 +59,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle)
opts.Full = f.full
opts.PollInterval = f.interval
opts.DryRun = f.dryRun
opts.Concurrency = f.concurrency
return opts, nil
}

Expand All @@ -74,8 +86,13 @@ Use 'databricks bundle deploy' for full resource deployment.`,
cmd.Flags().BoolVar(&f.watch, "watch", false, "watch local file system for changes")
cmd.Flags().Var(&f.output, "output", "type of the output format")
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if err := f.validate(); err != nil {
return err
}

b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{})
if err != nil {
return err
Expand Down
13 changes: 12 additions & 1 deletion cmd/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/spf13/cobra"
)

var errInvalidConcurrency = errors.New("--concurrency must be at least 1")

type syncFlags struct {
// project files polling interval
interval time.Duration
Expand All @@ -35,6 +37,7 @@ type syncFlags struct {
dryRun bool
excludeFrom string
includeFrom string
concurrency int
}

func readPatternsFile(filePath string) ([]string, error) {
Expand Down Expand Up @@ -89,6 +92,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
opts.Include = append(opts.Include, f.include...)
opts.Include = append(opts.Include, includePatterns...)
opts.DryRun = f.dryRun
opts.Concurrency = f.concurrency
return opts, nil
}

Expand Down Expand Up @@ -163,6 +167,7 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn

OutputHandler: outputHandler,
DryRun: f.dryRun,
Concurrency: f.concurrency,
}
return &opts, nil
}
Expand All @@ -187,14 +192,20 @@ func New() *cobra.Command {
cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)")
cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)")
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
cmd.Flags().IntVar(&f.concurrency, "concurrency", sync.MaxRequestsInFlight, "number of parallel requests to the workspace")

// Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle.
mustWorkspaceClient := func(cmd *cobra.Command, args []string) error {
cmd.SetContext(root.SkipLoadBundle(cmd.Context()))
return root.MustWorkspaceClient(cmd, args)
}

cmd.PreRunE = mustWorkspaceClient
cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
if f.concurrency < 1 {
return errInvalidConcurrency
}
return mustWorkspaceClient(cmd, args)
}
cmd.RunE = func(cmd *cobra.Command, args []string) error {
var opts *sync.SyncOptions
var err error
Expand Down
6 changes: 4 additions & 2 deletions cmd/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ func TestSyncOptionsFromBundle(t *testing.T) {
},
}

f := syncFlags{}
f := syncFlags{concurrency: 5}
opts, err := f.syncOptionsFromBundle(New(), []string{}, b)
require.NoError(t, err)
assert.Equal(t, tempDir, opts.LocalRoot.Native())
assert.Equal(t, "/Users/jane@doe.com/path", opts.RemotePath)
assert.Equal(t, filepath.Join(tempDir, ".databricks", "bundle", "default"), opts.SnapshotBasePath)
assert.Equal(t, 5, opts.Concurrency)
assert.NotNil(t, opts.WorkspaceClient)
}

Expand All @@ -56,13 +57,14 @@ func TestSyncOptionsFromArgs(t *testing.T) {
local := t.TempDir()
remote := "/remote"

f := syncFlags{}
f := syncFlags{concurrency: 7}
cmd := New()
cmd.SetContext(cmdctx.SetWorkspaceClient(t.Context(), nil))
opts, err := f.syncOptionsFromArgs(cmd, []string{local, remote})
require.NoError(t, err)
assert.Equal(t, local, opts.LocalRoot.Native())
assert.Equal(t, remote, opts.RemotePath)
assert.Equal(t, 7, opts.Concurrency)
}

func TestExcludeFromFlag(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions libs/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type SyncOptions struct {
OutputHandler OutputHandler

DryRun bool

Concurrency int
}

type Sync struct {
Expand Down Expand Up @@ -96,6 +98,10 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, errors.New("failed to resolve host for snapshot")
}

if opts.Concurrency <= 0 {
opts.Concurrency = MaxRequestsInFlight
}

// For full sync, we start with an empty snapshot.
// For incremental sync, we try to load an existing snapshot to start from.
var snapshot *Snapshot
Expand Down
13 changes: 7 additions & 6 deletions libs/sync/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.
})
}

func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error {
func groupRunParallel(ctx context.Context, paths []string, limit int, fn func(context.Context, string) error) error {
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(MaxRequestsInFlight)
group.SetLimit(limit)

for _, path := range paths {
groupRunSingle(ctx, group, fn, path)
Expand All @@ -110,31 +110,32 @@ func groupRunParallel(ctx context.Context, paths []string, fn func(context.Conte

// applyDiff applies the computed diff against the remote workspace:
// deletes removed files, removes and creates directories, then uploads
// new or changed files. All phases run with at most s.Concurrency
// requests in flight, and the first error aborts the sync.
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
	concurrency := s.Concurrency

	// Remove deleted files before their parent directories.
	if err := groupRunParallel(ctx, d.delete, concurrency, s.applyDelete); err != nil {
		return err
	}

	// Remove directories depth-first, leaf to root, so each batch only
	// contains directories whose children are already gone.
	for _, batch := range d.groupedRmdir() {
		if err := groupRunParallel(ctx, batch, concurrency, s.applyRmdir); err != nil {
			return err
		}
	}

	// Create leaf directories only; intermediate directories are created
	// implicitly by the workspace API.
	for _, batch := range d.groupedMkdir() {
		if err := groupRunParallel(ctx, batch, concurrency, s.applyMkdir); err != nil {
			return err
		}
	}

	// Upload files last, once the directory structure is in place.
	return groupRunParallel(ctx, d.put, concurrency, s.applyPut)
}