// HelmReleaseConfig captures env-driven tuning knobs for helm release handling,
// manifest fetching and resource-tree building. Values are parsed from the
// environment via github.com/caarlos0/env using the struct tags below.
type HelmReleaseConfig struct {
	EnableHelmReleaseCache bool `env:"ENABLE_HELM_RELEASE_CACHE" envDefault:"true" description:"Enable helm releases list cache" deprecated:"false" example:"true"`
	MaxCountForHelmRelease int  `env:"MAX_COUNT_FOR_HELM_RELEASE" envDefault:"20" description:"Max count for helm release history list" deprecated:"false" example:"20"`
	// ManifestFetchBatchSize bounds how many parent-object live manifests are
	// fetched concurrently per batch in getLiveManifestsForGVKList.
	ManifestFetchBatchSize    int    `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"10" description:"Manifest fetch parallelism batch size (applied for parent objects)" deprecated:"false" example:"10"`
	RunHelmInstallInAsyncMode bool   `env:"RUN_HELM_INSTALL_IN_ASYNC_MODE" envDefault:"false" description:"Run helm install/ upgrade in async mode" deprecated:"false" example:"false"`
	ChartWorkingDirectory     string `env:"CHART_WORKING_DIRECTORY" envDefault:"/home/devtron/devtroncd/charts/" description:"Helm charts working directory" deprecated:"false" example:"/home/devtron/devtroncd/charts/"`
	// BuildNodesBatchSize sizes the depth-1 batch worker pool and, when
	// FeatAllDepthChildNodeBuildParallelism is enabled, the shared semaphore
	// that bounds child-node building at all deeper recursion levels.
	BuildNodesBatchSize                         int  `env:"BUILD_NODES_BATCH_SIZE" envDefault:"10" description:"Resource tree build nodes parallelism batch size; controls depth-1 worker pool size and the shared semaphore size for all deeper levels" deprecated:"false" example:"10"`
	FeatChildChildObjectListingPaginationEnable bool `env:"FEAT_CHILD_OBJECT_LISTING_PAGINATION" envDefault:"true" description:"use pagination in listing all the dependent child objects. use 'CHILD_OBJECT_LISTING_PAGE_SIZE' to set the page size." deprecated:"false" example:"true"`
	// FeatAllDepthChildNodeBuildParallelism gates creation of the shared semaphore in
	// BuildResourceTreeUsingK8s; when false only depth-1 worker-pool parallelism runs
	// and deeper levels fall back to sequential child-node building.
	FeatAllDepthChildNodeBuildParallelism bool `env:"FEAT_ALL_DEPTH_CHILD_NODE_BUILD_PARALLELISM" envDefault:"false" description:"enable semaphore-bounded parallelism for child node building at all recursion depths (depth-2+); when disabled only depth-1 parallelism via worker pool is active" deprecated:"false" example:"false"`
}
b/kubelink/pkg/service/commonHelmService/resourceTreeService.go index 5b771aa2a..a458b06fc 100644 --- a/kubelink/pkg/service/commonHelmService/resourceTreeService.go +++ b/kubelink/pkg/service/commonHelmService/resourceTreeService.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" "net/http" + "sync" ) type ResourceTreeServiceImpl struct { @@ -74,9 +75,22 @@ func sanitizeParentObjects(parentObjects []*client.ObjectIdentifier) []*client.O func (impl *ResourceTreeServiceImpl) BuildResourceTreeUsingK8s(ctx context.Context, appDetailRequest *client.AppDetailRequest, conf *rest.Config, parentObjects []*client.ObjectIdentifier) (*bean.ResourceTreeResponse, error) { liveManifests := impl.getLiveManifestsForGVKList(conf, parentObjects) + // when FeatAllDepthChildNodeBuildParallelism is enabled, create a shared semaphore that bounds + // concurrent K8s calls across all recursion depths (depth-2+); a nil semaphore makes + // buildChildNodesInBatch fall back to the previous depth-1-only parallel behaviour + var sem chan struct{} + if impl.helmReleaseConfig.FeatAllDepthChildNodeBuildParallelism { + semSize := impl.helmReleaseConfig.BuildNodesBatchSize + if semSize <= 0 { + semSize = 1 + } + sem = make(chan struct{}, semSize) + } + // build resource Nodes req := NewBuildNodesRequest(NewBuildNodesConfig(conf). - WithReleaseNamespace(appDetailRequest.Namespace)). + WithReleaseNamespace(appDetailRequest.Namespace). + WithSemaphore(sem)). // nil when flag is off → depth-2+ falls back to sequential WithDesiredOrLiveManifests(liveManifests...). 
// getLiveManifestsForGVKList fetches the live manifest for every object identifier in
// gvkList, running the K8s GET calls in batches of at most ManifestFetchBatchSize
// concurrent goroutines. When a fetch fails, a placeholder manifest is produced from
// the identifier (GVK, name, namespace, annotations) with IsLiveManifestFetchError set,
// so the returned slice always has one entry per input identifier, in input order.
// Returns nil for an empty input.
func (impl *ResourceTreeServiceImpl) getLiveManifestsForGVKList(restConfig *rest.Config, gvkList []*client.ObjectIdentifier) []*bean.DesiredOrLiveManifest {
	total := len(gvkList)
	if total == 0 {
		return nil
	}
	// pre-allocate by index so concurrent writes to distinct slots are safe without a mutex
	manifests := make([]*bean.DesiredOrLiveManifest, total)

	// guard against a non-positive configured batch size; at least one fetch must run
	batchSize := impl.helmReleaseConfig.ManifestFetchBatchSize
	if batchSize <= 0 {
		batchSize = 1
	}

	for i := 0; i < total; {
		// clamp the final batch to the number of remaining identifiers
		currentBatch := batchSize
		if remaining := total - i; remaining < currentBatch {
			currentBatch = remaining
		}
		var wg sync.WaitGroup
		for j := 0; j < currentBatch; j++ {
			wg.Add(1)
			go func(idx int) {
				defer wg.Done()
				resource := gvkList[idx]
				gvk := &schema.GroupVersionKind{
					Group:   resource.GetGroup(),
					Version: resource.GetVersion(),
					Kind:    resource.GetKind(),
				}
				manifest, _, err := impl.k8sService.GetLiveManifest(restConfig, resource.GetNamespace(), gvk, resource.GetName())
				if err != nil {
					impl.logger.Errorw("Error in getting live manifest", "err", err)
					// build a placeholder manifest from the identifier so downstream
					// consumers still see an entry for this object
					statusError, _ := err.(*errors2.StatusError)
					desiredManifest := &unstructured.Unstructured{}
					desiredManifest.SetGroupVersionKind(*gvk)
					desiredManifest.SetName(resource.Name)
					desiredManifest.SetNamespace(resource.Namespace)
					desiredManifest.SetAnnotations(resource.Annotations)
					desiredOrLiveManifest := &bean.DesiredOrLiveManifest{
						Manifest:                 desiredManifest,
						IsLiveManifestFetchError: true,
					}
					// propagate the HTTP status code when the error carries one
					if statusError != nil {
						desiredOrLiveManifest.LiveManifestFetchErrorCode = statusError.Status().Code
					}
					manifests[idx] = desiredOrLiveManifest
				} else {
					manifests[idx] = &bean.DesiredOrLiveManifest{
						Manifest: manifest,
					}
				}
			}(i + j)
		}
		// wait for the whole batch before starting the next one, bounding concurrency
		wg.Wait()
		i += currentBatch
	}
	return manifests
}
- // NOTE: Do not use batch worker for child Nodes as it will create batch worker recursively + // NOTE: Do not attach a new batch worker for child Nodes to avoid recursive pool creation buildChildNodesRequests = append(buildChildNodesRequests, req) } } // build child Nodes, if any. // NOTE: build child Nodes calls buildNodes recursively - childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, buildChildNodesRequests) + childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, request.concurrencySem, buildChildNodesRequests) if err != nil { return response, err } @@ -205,11 +244,16 @@ func (impl *ResourceTreeServiceImpl) buildChildNodes(buildChildNodesRequests []* } // buildChildNodesInBatch builds child Nodes in parallel from desired or live manifest. -// - It uses batch workers workerPool.WorkerPool[*BuildNodeResponse] to build child Nodes in parallel. -// - If workerPool is not defined, it builds child Nodes sequentially. -func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) { +// - If a workerPool is provided (depth-1), it uses it for parallel execution. +// - If no workerPool but a semaphore is provided (depth-2+), it uses semaphore-bounded goroutines. +// - Falls back to sequential when neither is available. 
+func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) { if wp == nil { - // build child Nodes sequentially + if sem != nil { + // depth-2+ parallel path: bounded by shared semaphore + return impl.buildChildNodesWithSemaphore(sem, buildChildNodesRequests) + } + // no concurrency available; build child Nodes sequentially return impl.buildChildNodes(buildChildNodesRequests) } response := NewBuildNodeResponse() @@ -235,6 +279,68 @@ func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.Worke return response, nil } +// buildChildNodesWithSemaphore builds child Nodes in parallel using a shared semaphore for bounded concurrency. +// It uses a non-blocking semaphore acquisition: if the semaphore is full the request is processed sequentially +// in the calling goroutine, preventing deadlocks during recursive invocations where the caller already holds a slot. 
// buildChildNodesWithSemaphore builds child Nodes in parallel using a shared semaphore
// for bounded concurrency across recursion depths.
// Semaphore acquisition is non-blocking: if the semaphore is full, the request is
// processed sequentially in the calling goroutine, preventing deadlocks during
// recursive invocations where the caller (or an ancestor) already holds a slot.
// Nodes collected before an error remain in the returned response alongside the
// first error observed.
func (impl *ResourceTreeServiceImpl) buildChildNodesWithSemaphore(sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
	response := NewBuildNodeResponse()
	var mu sync.Mutex // guards response, which is appended to from multiple goroutines
	var wg sync.WaitGroup
	// buffered channel of size 1 captures the first error; subsequent errors are dropped
	errCh := make(chan error, 1)

	for _, req := range buildChildNodesRequests {
		// check for an earlier error before launching more work
		select {
		case err := <-errCh:
			wg.Wait() // let in-flight goroutines finish before returning
			return response, err
		default:
		}

		// try to acquire a semaphore slot without blocking
		select {
		case sem <- struct{}{}:
			// slot acquired — run in a goroutine
			wg.Add(1)
			go func(r *BuildNodesConfig) {
				// defers run LIFO: wg.Done fires first, then the slot is released.
				// The slot can therefore be held a moment past wg.Wait returning,
				// which only makes the shared concurrency bound slightly conservative.
				defer func() { <-sem }()
				defer wg.Done()
				childResp, err := impl.BuildNodes(r)
				if err != nil {
					impl.logger.Errorw("error in building child Nodes", "ReleaseNamespace", r.ReleaseNamespace, "parentResource", r.ParentResourceRef.GetGvk(), "err", err)
					// non-blocking send: only the first error is kept
					select {
					case errCh <- err:
					default:
					}
					return
				}
				mu.Lock()
				response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
				mu.Unlock()
			}(req)
		default:
			// semaphore full — process sequentially in the calling goroutine to avoid deadlock
			childResp, err := impl.BuildNodes(req)
			if err != nil {
				impl.logger.Errorw("error in building child Nodes sequentially", "ReleaseNamespace", req.ReleaseNamespace, "err", err)
				wg.Wait() // drain in-flight work before surfacing the error
				return response, err
			}
			mu.Lock()
			response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
			mu.Unlock()
		}
	}
	wg.Wait()
	// surface an error reported by a goroutine after the last request was dispatched
	select {
	case err := <-errCh:
		return response, err
	default:
		return response, nil
	}
}
response := NewGetNodesFromManifestResponse() manifest := request.DesiredOrLiveManifest.Manifest