From 6b53e857d8da762f6d0adf71c4f103365679426a Mon Sep 17 00:00:00 2001 From: Ryan Swanson Date: Fri, 1 May 2026 12:43:52 -0600 Subject: [PATCH 1/2] Parallelize import download/read/unmarshal work while preserving ordered merge semantics Replace global dependency download locking with per-cache locking and in-flight dedupe Add regression coverage for concurrent imports, merge order, disabled imports, and shared git subpaths Make URL dependency downloads context-aware and document download synchronization behavior Signed-off-by: Ryan Swanson --- e2e/tests/imports/imports.go | 24 ++ .../testdata/git-subpaths/devspace.yaml | 13 + pkg/devspace/config/loader/imports.go | 109 ++++-- pkg/devspace/config/loader/imports_test.go | 215 ++++++++++++ pkg/devspace/dependency/util/util.go | 314 ++++++++++++------ pkg/devspace/dependency/util/util_test.go | 26 ++ 6 files changed, 579 insertions(+), 122 deletions(-) create mode 100644 e2e/tests/imports/testdata/git-subpaths/devspace.yaml create mode 100644 pkg/devspace/config/loader/imports_test.go diff --git a/e2e/tests/imports/imports.go b/e2e/tests/imports/imports.go index 2eaccadda7..0153a1418a 100644 --- a/e2e/tests/imports/imports.go +++ b/e2e/tests/imports/imports.go @@ -139,6 +139,30 @@ var _ = DevSpaceDescribe("imports", func() { framework.ExpectError(err) }) + ginkgo.It("should import multiple git subpaths from the same repository", func() { + tempDir, err := framework.CopyToTempDir("tests/imports/testdata/git-subpaths") + framework.ExpectNoError(err) + defer framework.CleanupTempDir(initialDir, tempDir) + + configBuffer := &bytes.Buffer{} + printCmd := &cmd.PrintCmd{ + GlobalFlags: &flags.GlobalFlags{}, + Out: configBuffer, + SkipInfo: true, + } + + err = printCmd.Run(f) + framework.ExpectNoError(err) + + latestConfig := &latest.Config{} + err = yaml.Unmarshal(configBuffer.Bytes(), latestConfig) + framework.ExpectNoError(err) + + framework.ExpectEqual(latestConfig.Pipelines["pipeline-a"].Run, "echo \"pipeline-a\"") + framework.ExpectEqual(latestConfig.Pipelines["pipeline-b"].Run, "echo \"pipeline-b\"") + framework.ExpectEqual(latestConfig.Pipelines["pipeline-c"].Run, "echo \"pipeline-c\"") + }) + ginkgo.It("should import correctly with localRegistry", func() { tempDir, err := framework.CopyToTempDir("tests/imports/testdata/localregistry") framework.ExpectNoError(err) diff --git a/e2e/tests/imports/testdata/git-subpaths/devspace.yaml b/e2e/tests/imports/testdata/git-subpaths/devspace.yaml new file mode 100644 index 0000000000..e1c91fc775 --- /dev/null +++ b/e2e/tests/imports/testdata/git-subpaths/devspace.yaml @@ -0,0 +1,13 @@ +version: v2beta1 +name: git-subpath-imports + +imports: + - git: https://github.com/loft-sh/e2e-test-dependency.git + branch: main + subPath: subA + - git: https://github.com/loft-sh/e2e-test-dependency.git + branch: main + subPath: subB + - git: https://github.com/loft-sh/e2e-test-dependency.git + branch: main + subPath: subC diff --git a/pkg/devspace/config/loader/imports.go b/pkg/devspace/config/loader/imports.go index decc43f183..451103bf50 100644 --- a/pkg/devspace/config/loader/imports.go +++ b/pkg/devspace/config/loader/imports.go @@ -8,11 +8,13 @@ import ( "github.com/loft-sh/devspace/pkg/devspace/config/loader/variable" "github.com/loft-sh/devspace/pkg/devspace/config/versions" + "github.com/loft-sh/devspace/pkg/devspace/config/versions/latest" "github.com/loft-sh/devspace/pkg/devspace/config/versions/util" dependencyutil "github.com/loft-sh/devspace/pkg/devspace/dependency/util" "github.com/loft-sh/devspace/pkg/util/log" "github.com/loft-sh/devspace/pkg/util/yamlutil" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) var ImportSections = []string{ @@ -31,6 +33,14 @@ var ImportSections = []string{ "localRegistry", } +const maxConcurrentImportDownloads = 8 + +type resolvedImport struct { + ConfigPath string + Data map[string]interface{} + Disabled bool +} + func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath string, rawData map[string]interface{}, log log.Logger) (map[string]interface{}, error) { // initially reload variables err := reloadVariables(resolver, rawData, log) @@ -59,6 +69,11 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st return nil, err } + resolvedImports, err := loadImports(ctx, basePath, imports.Imports, version, log) + if err != nil { + return nil, err + } + mergedMap := map[string]interface{}{} err = util.Convert(rawData, mergedMap) if err != nil { @@ -66,33 +81,14 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st } // load imports - for _, i := range imports.Imports { - if i.Enabled != nil && !*i.Enabled { + for _, resolvedImport := range resolvedImports { + if resolvedImport.Disabled { continue + } else if resolvedImport.Data == nil { + return nil, errors.Errorf("resolved import is missing data") } - configPath, err := dependencyutil.DownloadDependency(ctx, basePath, &i.SourceConfig, log) - if err != nil { - return nil, errors.Wrap(err, "resolve import") - } - - fileContent, err := os.ReadFile(configPath) - if err != nil { - return nil, errors.Wrap(err, "read import config") - } - - importData := map[string]interface{}{} - err = yamlutil.Unmarshal(fileContent, &importData) - if err != nil { - return nil, err - } - - configVersion, ok := importData["version"].(string) - if !ok { - return nil, fmt.Errorf("version is missing in import config %s", configPath) - } else if version != configVersion { - return nil, fmt.Errorf("import mismatch %s != %s. Import %s has different version than currently used devspace.yaml, please make sure the versions match between an import and the devspace.yaml using it", version, configVersion, configPath) - } + importData := resolvedImport.Data // merge sections for _, section := range ImportSections { @@ -130,16 +126,71 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st // resolve the import imports if importData["imports"] != nil { mergedMap["imports"] = importData["imports"] + + // resolve imports + mergedMap, err = ResolveImports(ctx, resolver, filepath.Dir(resolvedImport.ConfigPath), mergedMap, log) + if err != nil { + return nil, err + } } else { delete(mergedMap, "imports") - } - // resolve imports - mergedMap, err = ResolveImports(ctx, resolver, filepath.Dir(configPath), mergedMap, log) - if err != nil { - return nil, err + err = reloadVariables(resolver, mergedMap, log) + if err != nil { + return nil, err + } } } return mergedMap, nil } + +func loadImports(ctx context.Context, basePath string, imports []latest.Import, version string, log log.Logger) ([]resolvedImport, error) { + resolvedImports := make([]resolvedImport, len(imports)) + + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(maxConcurrentImportDownloads) + for index := range imports { + importConfig := imports[index] + if importConfig.Enabled != nil && !*importConfig.Enabled { + resolvedImports[index] = resolvedImport{Disabled: true} + continue + } + + eg.Go(func() error { + configPath, err := dependencyutil.DownloadDependency(ctx, basePath, &importConfig.SourceConfig, log) + if err != nil { + return errors.Wrap(err, "resolve import") + } + + fileContent, err := os.ReadFile(configPath) + if err != nil { + return errors.Wrap(err, "read import config") + } + + importData := map[string]interface{}{} + err = yamlutil.Unmarshal(fileContent, &importData) + if err != nil { + return err + } + + configVersion, ok := importData["version"].(string) + if !ok { + return fmt.Errorf("version is missing in import config %s", configPath) + } else if version != configVersion { + return fmt.Errorf("import mismatch %s != %s. Import %s has different version than currently used devspace.yaml, please make sure the versions match between an import and the devspace.yaml using it", version, configVersion, configPath) + } + + resolvedImports[index] = resolvedImport{ + ConfigPath: configPath, + Data: importData, + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + + return resolvedImports, nil +} diff --git a/pkg/devspace/config/loader/imports_test.go b/pkg/devspace/config/loader/imports_test.go new file mode 100644 index 0000000000..a230280a00 --- /dev/null +++ b/pkg/devspace/config/loader/imports_test.go @@ -0,0 +1,215 @@ +package loader + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/loft-sh/devspace/pkg/devspace/config/loader/variable" + "github.com/loft-sh/devspace/pkg/devspace/config/localcache" + "github.com/loft-sh/devspace/pkg/devspace/config/versions/latest" + dependencyutil "github.com/loft-sh/devspace/pkg/devspace/dependency/util" + "github.com/loft-sh/devspace/pkg/util/log" + "gotest.tools/assert" +) + +func TestResolveImportsDownloadsSiblingImportsConcurrently(t *testing.T) { + oldDependencyFolderPath := dependencyutil.DependencyFolderPath + dependencyutil.DependencyFolderPath = filepath.Join(t.TempDir(), "dependencies") + defer func() { + dependencyutil.DependencyFolderPath = oldDependencyFolderPath + }() + + var activeRequests int32 + var maxActiveRequests int32 + release := make(chan struct{}) + var releaseOnce sync.Once + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + current := atomic.AddInt32(&activeRequests, 1) + defer atomic.AddInt32(&activeRequests, -1) + + for { + maxActive := atomic.LoadInt32(&maxActiveRequests) + if current <= maxActive || atomic.CompareAndSwapInt32(&maxActiveRequests, maxActive, current) { + break + } + } + + if current == 3 { + releaseOnce.Do(func() { + close(release) + }) + } + + select { + case <-release: + case <-time.After(2 * time.Second): + } + + name := strings.TrimPrefix(r.URL.Path, "/") + _, _ = fmt.Fprintf(w, "version: %s\npipelines:\n pipeline-%s:\n run: echo %s\n", latest.Version, name, name) + })) + defer server.Close() + + resolver := newImportTestResolver(t) + rawConfig := map[string]interface{}{ + "version": latest.Version, + "name": "test", + "imports": []interface{}{ + map[string]interface{}{"path": server.URL + "/a"}, + map[string]interface{}{"path": server.URL + "/b"}, + map[string]interface{}{"path": server.URL + "/c"}, + }, + } + + resolved, err := ResolveImports(context.Background(), resolver, t.TempDir(), rawConfig, log.Discard) + assert.NilError(t, err) + maxActive := atomic.LoadInt32(&maxActiveRequests) + assert.Assert(t, maxActive == 3, "expected 3 concurrent import downloads, got %d", maxActive) + + pipelines := resolved["pipelines"].(map[string]interface{}) + for _, name := range []string{"a", "b", "c"} { + assert.Assert(t, pipelines["pipeline-"+name] != nil, "expected pipeline-%s to be imported", name) + } +} + +func TestResolveImportsKeepsOrderedMergeSemantics(t *testing.T) { + tempDir := t.TempDir() + writeImportFile(t, tempDir, "import-a.yaml", `version: v2beta1 +pipelines: + shared: + run: echo import-a + import-a: + run: echo import-a +hooks: + - command: echo import-a +`) + writeImportFile(t, tempDir, "import-b.yaml", `version: v2beta1 +pipelines: + shared: + run: echo import-b + import-b: + run: echo import-b +hooks: + - command: echo import-b +`) + + resolver := newImportTestResolver(t) + rawConfig := map[string]interface{}{ + "version": latest.Version, + "name": "test", + "imports": []interface{}{ + map[string]interface{}{"path": "import-a.yaml"}, + map[string]interface{}{"path": "import-b.yaml"}, + }, + "pipelines": map[string]interface{}{ + "root": map[string]interface{}{ + "run": "echo root", + }, + }, + "hooks": []interface{}{ + map[string]interface{}{"command": "echo root"}, + }, + } + + resolved, err := ResolveImports(context.Background(), resolver, tempDir, rawConfig, log.Discard) + assert.NilError(t, err) + + pipelines := resolved["pipelines"].(map[string]interface{}) + assert.Equal(t, pipelines["root"].(map[string]interface{})["run"], "echo root") + assert.Equal(t, pipelines["shared"].(map[string]interface{})["run"], "echo import-a") + assert.Equal(t, pipelines["import-a"].(map[string]interface{})["run"], "echo import-a") + assert.Equal(t, pipelines["import-b"].(map[string]interface{})["run"], "echo import-b") + + hooks := resolved["hooks"].([]interface{}) + assert.Equal(t, len(hooks), 3) + assert.Equal(t, hooks[0].(map[string]interface{})["command"], "echo root") + assert.Equal(t, hooks[1].(map[string]interface{})["command"], "echo import-a") + assert.Equal(t, hooks[2].(map[string]interface{})["command"], "echo import-b") + + _, ok := resolved["imports"] + assert.Assert(t, !ok, "expected imports to be removed after resolution") +} + +func TestResolveImportsMergesAndReloadsImportedVariables(t *testing.T) { + tempDir := t.TempDir() + writeImportFile(t, tempDir, "import-a.yaml", `version: v2beta1 +vars: + IMPORT_A: import-a +`) + writeImportFile(t, tempDir, "import-b.yaml", `version: v2beta1 +vars: + IMPORT_B: import-b +`) + + resolver := newImportTestResolver(t) + rawConfig := map[string]interface{}{ + "version": latest.Version, + "name": "test", + "imports": []interface{}{ + map[string]interface{}{"path": "import-a.yaml"}, + map[string]interface{}{"path": "import-b.yaml"}, + }, + } + + resolved, err := ResolveImports(context.Background(), resolver, tempDir, rawConfig, log.Discard) + assert.NilError(t, err) + + vars := resolved["vars"].(map[string]interface{}) + assert.Assert(t, vars["IMPORT_A"] != nil, "expected IMPORT_A to be present in merged vars") + assert.Assert(t, vars["IMPORT_B"] != nil, "expected IMPORT_B to be present in merged vars") + assert.Assert(t, resolver.DefinedVars()["IMPORT_A"] != nil, "expected resolver to know IMPORT_A") + assert.Assert(t, resolver.DefinedVars()["IMPORT_B"] != nil, "expected resolver to know IMPORT_B") +} + +func TestResolveImportsSkipsDisabledImports(t *testing.T) { + tempDir := t.TempDir() + writeImportFile(t, tempDir, "enabled.yaml", `version: v2beta1 +pipelines: + enabled: + run: echo enabled +`) + + resolver := newImportTestResolver(t) + rawConfig := map[string]interface{}{ + "version": latest.Version, + "name": "test", + "imports": []interface{}{ + map[string]interface{}{"path": "enabled.yaml"}, + map[string]interface{}{"path": "missing-disabled.yaml", "enabled": false}, + }, + } + + resolved, err := ResolveImports(context.Background(), resolver, tempDir, rawConfig, log.Discard) + assert.NilError(t, err) + + pipelines := resolved["pipelines"].(map[string]interface{}) + assert.Assert(t, pipelines["enabled"] != nil, "expected enabled import to be merged") +} + +func newImportTestResolver(t *testing.T) variable.Resolver { + t.Helper() + + resolver, err := variable.NewResolver(localcache.New(filepath.Join(t.TempDir(), "cache.yaml")), &variable.PredefinedVariableOptions{ + ConfigPath: filepath.Join(t.TempDir(), "devspace.yaml"), + }, nil, log.Discard) + assert.NilError(t, err) + + return resolver +} + +func writeImportFile(t *testing.T, dir, name, content string) { + t.Helper() + + err := os.WriteFile(filepath.Join(dir, name), []byte(content), 0644) + assert.NilError(t, err) +} diff --git a/pkg/devspace/dependency/util/util.go b/pkg/devspace/dependency/util/util.go index 76e4483561..79fe5e6b41 100644 --- a/pkg/devspace/dependency/util/util.go +++ b/pkg/devspace/dependency/util/util.go @@ -33,8 +33,23 @@ func init() { DependencyFolderPath = filepath.Join(homedir, filepath.FromSlash(DependencyFolder)) } -// downloadMutex makes sure we only download a single dependency at a time -var downloadMutex = sync.Mutex{} +var ( + downloadLocksMutex sync.Mutex + downloadLocks = map[string]*downloadLock{} + + downloadCallsMutex sync.Mutex + downloadCalls = map[string]*downloadCall{} +) + +type downloadLock struct { + mutex sync.Mutex + refs int +} + +type downloadCall struct { + done chan struct{} + err error +} func GetDependencyPath(workingDirectory string, source *latest.SourceConfig) (configPath string, err error) { ID, err := GetDependencyID(source) @@ -79,9 +94,6 @@ func switchURLType(gitPath string) string { } func DownloadDependency(ctx context.Context, workingDirectory string, source *latest.SourceConfig, log log.Logger) (configPath string, err error) { - downloadMutex.Lock() - defer downloadMutex.Unlock() - ID, err := GetDependencyID(source) if err != nil { return "", err @@ -96,123 +108,239 @@ func DownloadDependency(ctx context.Context, workingDirectory string, source *la _ = os.MkdirAll(DependencyFolderPath, 0755) localPath = filepath.Join(DependencyFolderPath, ID) - // Check if dependency are cached locally - _, statErr := os.Stat(localPath) - - // Verify git cli works - repo, err := git.NewGitCLIRepository(ctx, localPath) + err = runDownloadOnce(downloadCallKey("git", ID, source), func() error { + return withDownloadLock(ID, func() error { + return downloadGitDependency(ctx, localPath, gitPath, source, log) + }) + }) if err != nil { - if statErr == nil { - log.Warnf("Error creating git cli: %v", err) - return getDependencyConfigPath(localPath, source) - } return "", err } - // Create git clone options - var gitCloneOptions = git.CloneOptions{ - URL: gitPath, - Tag: source.Tag, - Branch: source.Branch, - Commit: source.Revision, - Args: source.CloneArgs, - DisableShallow: source.DisableShallow, - } - - // Git clone - if statErr != nil { - err = repo.Clone(ctx, gitCloneOptions) + // Resolve local source + } else if source.Path != "" { + if isURL(source.Path) { + localPath = filepath.Join(DependencyFolderPath, ID) + err = runDownloadOnce(downloadCallKey("url", ID, source), func() error { + return withDownloadLock(ID, func() error { + return downloadURLDependency(ctx, localPath, source, log) + }) + }) if err != nil { - log.Warn("Error cloning repo: ", err) - - gitCloneOptions.URL = switchURLType(gitPath) - log.Infof("Switching URL from %s to %s and will try cloning again", gitPath, gitCloneOptions.URL) - err = repo.Clone(ctx, gitCloneOptions) - + return "", err + } + } else { + if filepath.IsAbs(source.Path) { + localPath = source.Path + } else { + localPath, err = filepath.Abs(filepath.Join(workingDirectory, filepath.FromSlash(source.Path))) if err != nil { - log.Warn("Failed to clone repo with both HTTPS and SSH URL. Please make sure if your git login or ssh setup is correct.") - if statErr == nil { - log.Warnf("Error cloning or pulling git repository %s: %v", gitPath, err) - return getDependencyConfigPath(localPath, source) - } - - return "", errors.Wrap(err, "clone repository") + return "", errors.Wrap(err, "filepath absolute") } } + } + } - log.Debugf("Cloned %s", gitPath) + return getDependencyConfigPath(localPath, source) +} + +func downloadGitDependency(ctx context.Context, localPath, gitPath string, source *latest.SourceConfig, log log.Logger) error { + // Check if dependency are cached locally + _, statErr := os.Stat(localPath) + + // Verify git cli works + repo, err := git.NewGitCLIRepository(ctx, localPath) + if err != nil { + if statErr == nil { + log.Warnf("Error creating git cli: %v", err) + return nil } + return err + } + + // Create git clone options + var gitCloneOptions = git.CloneOptions{ + URL: gitPath, + Tag: source.Tag, + Branch: source.Branch, + Commit: source.Revision, + Args: source.CloneArgs, + DisableShallow: source.DisableShallow, + } + + // Git clone + if statErr != nil { + err = repo.Clone(ctx, gitCloneOptions) + + if err != nil { + log.Warn("Error cloning repo: ", err) + + gitCloneOptions.URL = switchURLType(gitPath) + log.Infof("Switching URL from %s to %s and will try cloning again", gitPath, gitCloneOptions.URL) + err = repo.Clone(ctx, gitCloneOptions) - // Git pull - if !source.DisablePull && source.Revision == "" { - err = repo.Pull(ctx) if err != nil { - log.Warn(err) + log.Warn("Failed to clone repo with both HTTPS and SSH URL. Please make sure if your git login or ssh setup is correct.") + if statErr == nil { + log.Warnf("Error cloning or pulling git repository %s: %v", gitPath, err) + return nil + } + + return errors.Wrap(err, "clone repository") } + } - log.Debugf("Pulled %s", gitPath) + log.Debugf("Cloned %s", gitPath) + } + + // Git pull + if !source.DisablePull && source.Revision == "" { + err = repo.Pull(ctx) + if err != nil { + log.Warn(err) } - // Resolve local source - } else if source.Path != "" { - if isURL(source.Path) { - localPath = filepath.Join(DependencyFolderPath, ID) - _ = os.MkdirAll(localPath, 0755) + log.Debugf("Pulled %s", gitPath) + } + + return nil +} - // Check if dependency exists - configPath := filepath.Join(localPath, constants.DefaultConfigPath) - _, statErr := os.Stat(configPath) +func downloadURLDependency(ctx context.Context, localPath string, source *latest.SourceConfig, log log.Logger) error { + _ = os.MkdirAll(localPath, 0755) - if !source.DisablePull || statErr != nil { - // Create the file - out, err := os.Create(configPath) - if err != nil { - if statErr == nil { - log.Warnf("Error creating file: %v", err) - return getDependencyConfigPath(localPath, source) - } + // Check if dependency exists + configPath := filepath.Join(localPath, constants.DefaultConfigPath) + _, statErr := os.Stat(configPath) - return "", err - } - defer out.Close() + if !source.DisablePull || statErr != nil { + // Create the file + out, err := os.Create(configPath) + if err != nil { + if statErr == nil { + log.Warnf("Error creating file: %v", err) + return nil + } - // Get the data - resp, err := http.Get(source.Path) - if err != nil { - if statErr == nil { - log.Warnf("Error retrieving url %s: %v", source.Path, err) - return getDependencyConfigPath(localPath, source) - } + return err + } + defer func() { + if err := out.Close(); err != nil { + log.Warnf("Error closing file: %v", err) + } + }() - return "", errors.Wrapf(err, "request %s", source.Path) - } - defer resp.Body.Close() + // Get the data + req, err := http.NewRequestWithContext(ctx, http.MethodGet, source.Path, nil) + if err != nil { + if statErr == nil { + log.Warnf("Error creating request for url %s: %v", source.Path, err) + return nil + } - // Write the body to file - _, err = io.Copy(out, resp.Body) - if err != nil { - if statErr == nil { - log.Warnf("Error retrieving url %s: %v", source.Path, err) - return getDependencyConfigPath(localPath, source) - } + return errors.Wrapf(err, "request %s", source.Path) + } - return "", errors.Wrapf(err, "download %s", source.Path) - } + resp, err := http.DefaultClient.Do(req) + if err != nil { + if statErr == nil { + log.Warnf("Error retrieving url %s: %v", source.Path, err) + return nil } - } else { - if filepath.IsAbs(source.Path) { - localPath = source.Path - } else { - localPath, err = filepath.Abs(filepath.Join(workingDirectory, filepath.FromSlash(source.Path))) - if err != nil { - return "", errors.Wrap(err, "filepath absolute") - } + + return errors.Wrapf(err, "request %s", source.Path) + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.Warnf("Error closing body: %v", err) } + }() + + // Write the body to file + _, err = io.Copy(out, resp.Body) + if err != nil { + if statErr == nil { + log.Warnf("Error retrieving url %s: %v", source.Path, err) + return nil + } + + return errors.Wrapf(err, "download %s", source.Path) } } - return getDependencyConfigPath(localPath, source) + return nil +} + +func withDownloadLock(id string, download func() error) error { + lock := acquireDownloadLock(id) + lock.mutex.Lock() + defer releaseDownloadLock(id, lock) // runs second: decrement ref after unlock + defer lock.mutex.Unlock() // runs first: release mutex before decrementing ref + + return download() +} + +func acquireDownloadLock(id string) *downloadLock { + downloadLocksMutex.Lock() + defer downloadLocksMutex.Unlock() + + lock := downloadLocks[id] + if lock == nil { + lock = &downloadLock{} + downloadLocks[id] = lock + } + lock.refs++ + return lock +} + +func releaseDownloadLock(id string, lock *downloadLock) { + downloadLocksMutex.Lock() + defer downloadLocksMutex.Unlock() + + lock.refs-- + if lock.refs == 0 { + delete(downloadLocks, id) + } +} + +// runDownloadOnce deduplicates identical in-flight download operations. The +// per-ID lock inside the callback still serializes different operation keys +// that write to the same cache directory. +func runDownloadOnce(key string, download func() error) error { + downloadCallsMutex.Lock() + call, ok := downloadCalls[key] + if ok { + downloadCallsMutex.Unlock() + <-call.done + return call.err + } + + call = &downloadCall{done: make(chan struct{})} + downloadCalls[key] = call + downloadCallsMutex.Unlock() + + call.err = download() + close(call.done) + + downloadCallsMutex.Lock() + delete(downloadCalls, key) + downloadCallsMutex.Unlock() + + return call.err +} + +func downloadCallKey(sourceType, id string, source *latest.SourceConfig) string { + // SubPath is encoded in id (via GetDependencyID) and does not need to be + // listed separately. CloneArgs/DisableShallow/DisablePull affect download + // behavior but not the cache location, so they are listed explicitly. + return strings.Join([]string{ + sourceType, + id, + strings.Join(source.CloneArgs, "\x00"), + fmt.Sprintf("%t", source.DisableShallow), + fmt.Sprintf("%t", source.DisablePull), + }, "\x00") } func getDependencyConfigPath(dependencyPath string, source *latest.SourceConfig) (string, error) { diff --git a/pkg/devspace/dependency/util/util_test.go b/pkg/devspace/dependency/util/util_test.go index e84fee96b9..edd221f454 100644 --- a/pkg/devspace/dependency/util/util_test.go +++ b/pkg/devspace/dependency/util/util_test.go @@ -13,3 +13,29 @@ func TestSwitchURLType(t *testing.T) { assert.Equal(t, sshURL, switchURLType(httpURL)) assert.Equal(t, httpURL, switchURLType(sshURL)) } + +func TestDownloadLockCleansUp(t *testing.T) { + id := "test-lock-cleanup" + err := withDownloadLock(id, func() error { + return nil + }) + assert.NilError(t, err) + + downloadLocksMutex.Lock() + _, ok := downloadLocks[id] + downloadLocksMutex.Unlock() + assert.Assert(t, !ok, "expected download lock to be removed") +} + +func TestRunDownloadOnceCleansUpCall(t *testing.T) { + key := "test-download-call-cleanup" + err := runDownloadOnce(key, func() error { + return nil + }) + assert.NilError(t, err) + + downloadCallsMutex.Lock() + _, ok := downloadCalls[key] + downloadCallsMutex.Unlock() + assert.Assert(t, !ok, "expected download call to be removed") +} From abe0135270df380e4929323cf5b615497826b100 Mon Sep 17 00:00:00 2001 From: Ryan Swanson Date: Fri, 1 May 2026 13:30:24 -0600 Subject: [PATCH 2/2] Addressing cluade comments around panic safety, testing and comments Signed-off-by: Ryan Swanson --- .../testdata/git-subpaths/devspace.yaml | 6 +-- pkg/devspace/config/loader/imports.go | 7 +-- pkg/devspace/config/loader/imports_test.go | 51 +++++++++++++++---- pkg/devspace/dependency/util/util.go | 33 ++++++++---- pkg/devspace/dependency/util/util_test.go | 48 +++++++++++++++++ 5 files changed, 120 insertions(+), 25 deletions(-) diff --git a/e2e/tests/imports/testdata/git-subpaths/devspace.yaml b/e2e/tests/imports/testdata/git-subpaths/devspace.yaml index e1c91fc775..38765ceb92 100644 --- a/e2e/tests/imports/testdata/git-subpaths/devspace.yaml +++ b/e2e/tests/imports/testdata/git-subpaths/devspace.yaml @@ -3,11 +3,11 @@ name: git-subpath-imports imports: - git: https://github.com/loft-sh/e2e-test-dependency.git - branch: main + revision: 1ec44dcffc83f64ea7e8c169751e56d233cd4b20 subPath: subA - git: https://github.com/loft-sh/e2e-test-dependency.git - branch: main + revision: 1ec44dcffc83f64ea7e8c169751e56d233cd4b20 subPath: subB - git: https://github.com/loft-sh/e2e-test-dependency.git - branch: main + revision: 1ec44dcffc83f64ea7e8c169751e56d233cd4b20 subPath: subC diff --git a/pkg/devspace/config/loader/imports.go b/pkg/devspace/config/loader/imports.go index 451103bf50..bb65189ec1 100644 --- a/pkg/devspace/config/loader/imports.go +++ b/pkg/devspace/config/loader/imports.go @@ -84,8 +84,6 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st for _, resolvedImport := range resolvedImports { if resolvedImport.Disabled { continue - } else if resolvedImport.Data == nil { - return nil, errors.Errorf("resolved import is missing data") } importData := resolvedImport.Data @@ -127,7 +125,7 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st if importData["imports"] != nil { mergedMap["imports"] = importData["imports"] - // resolve imports + // The recursive call starts by reloading variables from mergedMap. mergedMap, err = ResolveImports(ctx, resolver, filepath.Dir(resolvedImport.ConfigPath), mergedMap, log) if err != nil { return nil, err @@ -135,6 +133,8 @@ func ResolveImports(ctx context.Context, resolver variable.Resolver, basePath st } else { delete(mergedMap, "imports") + // Leaf imports used to recurse too, so preserve the variable reload + // after each ordered import merge. err = reloadVariables(resolver, mergedMap, log) if err != nil { return nil, err @@ -151,6 +151,7 @@ func loadImports(ctx context.Context, basePath string, imports []latest.Import, eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(maxConcurrentImportDownloads) for index := range imports { + // Keep an explicit per-iteration copy for the goroutine closure. importConfig := imports[index] if importConfig.Enabled != nil && !*importConfig.Enabled { resolvedImports[index] = resolvedImport{Disabled: true} diff --git a/pkg/devspace/config/loader/imports_test.go b/pkg/devspace/config/loader/imports_test.go index a230280a00..2a982fce89 100644 --- a/pkg/devspace/config/loader/imports_test.go +++ b/pkg/devspace/config/loader/imports_test.go @@ -28,10 +28,11 @@ func TestResolveImportsDownloadsSiblingImportsConcurrently(t *testing.T) { dependencyutil.DependencyFolderPath = oldDependencyFolderPath }() + const importCount int32 = 3 var activeRequests int32 var maxActiveRequests int32 - release := make(chan struct{}) - var releaseOnce sync.Once + allInFlight := make(chan struct{}) + var closeAllInFlight sync.Once server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { current := atomic.AddInt32(&activeRequests, 1) @@ -44,15 +45,16 @@ func TestResolveImportsDownloadsSiblingImportsConcurrently(t *testing.T) { } } - if current == 3 { - releaseOnce.Do(func() { - close(release) + if current == importCount { + closeAllInFlight.Do(func() { + close(allInFlight) }) } select { - case <-release: - case <-time.After(2 * time.Second): + case <-allInFlight: + case <-r.Context().Done(): + return } name := strings.TrimPrefix(r.URL.Path, "/") @@ -71,8 +73,39 @@ func TestResolveImportsDownloadsSiblingImportsConcurrently(t *testing.T) { }, } - resolved, err := ResolveImports(context.Background(), resolver, t.TempDir(), rawConfig, log.Discard) - assert.NilError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + type result struct { + resolved map[string]interface{} + err error + } + results := make(chan result, 1) + go func() { + resolved, err := ResolveImports(ctx, resolver, t.TempDir(), rawConfig, log.Discard) + results <- result{resolved: resolved, err: err} + }() + + select { + case <-allInFlight: + case result := <-results: + assert.NilError(t, result.err) + t.Fatalf("expected all import downloads to be in flight before resolution completed") + case <-time.After(5 * time.Second): + cancel() + t.Fatalf("timed out waiting for concurrent import downloads") + } + + var resolved map[string]interface{} + select { + case result := <-results: + assert.NilError(t, result.err) + resolved = result.resolved + case <-time.After(5 * time.Second): + cancel() + t.Fatalf("timed out waiting for import resolution") + } + maxActive := atomic.LoadInt32(&maxActiveRequests) assert.Assert(t, maxActive == 3, "expected 3 concurrent import downloads, got %d", maxActive) diff --git a/pkg/devspace/dependency/util/util.go b/pkg/devspace/dependency/util/util.go index 79fe5e6b41..781955fdb2 100644 --- a/pkg/devspace/dependency/util/util.go +++ b/pkg/devspace/dependency/util/util.go @@ -253,9 +253,9 @@ func downloadURLDependency(ctx context.Context, localPath string, source *latest } defer func() { if err := resp.Body.Close(); err != nil { - log.Warnf("Error closing body: %v", err) + log.Warnf("Error closing body: %v", err) } - }() + }() // Write the body to file _, err = io.Copy(out, resp.Body) @@ -320,20 +320,33 @@ func runDownloadOnce(key string, download func() error) error { downloadCalls[key] = call downloadCallsMutex.Unlock() - call.err = download() - close(call.done) + cleanup := func() { + close(call.done) + downloadCallsMutex.Lock() + delete(downloadCalls, key) + downloadCallsMutex.Unlock() + } + defer func() { + panicValue := recover() + if panicValue != nil { + call.err = fmt.Errorf("download panicked: %v", panicValue) + cleanup() + panic(panicValue) + } - downloadCallsMutex.Lock() - delete(downloadCalls, key) - downloadCallsMutex.Unlock() + cleanup() + }() + call.err = download() return call.err } func downloadCallKey(sourceType, id string, source *latest.SourceConfig) string { - // SubPath is encoded in id (via GetDependencyID) and does not need to be - // listed separately. CloneArgs/DisableShallow/DisablePull affect download - // behavior but not the cache location, so they are listed explicitly. + // SubPath navigates within an already-cloned repo; all imports sharing the + // same git URL write to the same cache directory, so SubPath need not + // distinguish download calls. CloneArgs/DisableShallow/DisablePull affect + // download behavior but not the cache location, so they are listed + // explicitly. return strings.Join([]string{ sourceType, id, diff --git a/pkg/devspace/dependency/util/util_test.go b/pkg/devspace/dependency/util/util_test.go index edd221f454..e58a891941 100644 --- a/pkg/devspace/dependency/util/util_test.go +++ b/pkg/devspace/dependency/util/util_test.go @@ -1,6 +1,7 @@ package util import ( + "errors" "testing" "gotest.tools/assert" @@ -39,3 +40,50 @@ func TestRunDownloadOnceCleansUpCall(t *testing.T) { downloadCallsMutex.Unlock() assert.Assert(t, !ok, "expected download call to be removed") } + +func TestRunDownloadOnceReturnsErrorToConcurrentWaiter(t *testing.T) { + key := "test-download-call-error-waiter" + expectedErr := errors.New("download failed") + call := &downloadCall{done: make(chan struct{})} + + downloadCallsMutex.Lock() + downloadCalls[key] = call + downloadCallsMutex.Unlock() + defer func() { + downloadCallsMutex.Lock() + delete(downloadCalls, key) + downloadCallsMutex.Unlock() + }() + + errs := make(chan error, 1) + go func() { + errs <- runDownloadOnce(key, func() error { + return nil + }) + }() + + call.err = expectedErr + close(call.done) + + assert.Equal(t, <-errs, expectedErr) +} + +func TestRunDownloadOnceCleansUpAfterPanic(t *testing.T) { + key := "test-download-call-panic-cleanup" + + func() { + defer func() { + recovered := recover() + assert.Assert(t, recovered != nil, "expected panic") + }() + + _ = runDownloadOnce(key, func() error { + panic("download panic") + }) + }() + + downloadCallsMutex.Lock() + _, ok := downloadCalls[key] + downloadCallsMutex.Unlock() + assert.Assert(t, !ok, "expected download call to be removed after panic") +}