Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kubelink/config/GlobalConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import "github.com/caarlos0/env"
// HelmReleaseConfig holds the environment-driven tuning knobs for helm release
// handling: release-list caching, history size, manifest-fetch and resource-tree
// build parallelism, async install mode, the chart working directory, and
// feature flags controlling child-object listing and all-depth build parallelism.
type HelmReleaseConfig struct {
	EnableHelmReleaseCache    bool   `env:"ENABLE_HELM_RELEASE_CACHE" envDefault:"true" description:"Enable helm releases list cache" deprecated:"false" example:"true"`
	MaxCountForHelmRelease    int    `env:"MAX_COUNT_FOR_HELM_RELEASE" envDefault:"20" description:"Max count for helm release history list" deprecated:"false" example:"20"`
	ManifestFetchBatchSize    int    `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"10" description:"Manifest fetch parallelism batch size (applied for parent objects)" deprecated:"false" example:"10"`
	RunHelmInstallInAsyncMode bool   `env:"RUN_HELM_INSTALL_IN_ASYNC_MODE" envDefault:"false" description:"Run helm install/ upgrade in async mode" deprecated:"false" example:"false"`
	ChartWorkingDirectory     string `env:"CHART_WORKING_DIRECTORY" envDefault:"/home/devtron/devtroncd/charts/" description:"Helm charts working directory" deprecated:"false" example:"/home/devtron/devtroncd/charts/"`
	BuildNodesBatchSize       int    `env:"BUILD_NODES_BATCH_SIZE" envDefault:"10" description:"Resource tree build nodes parallelism batch size; controls depth-1 worker pool size and the shared semaphore size for all deeper levels" deprecated:"false" example:"10"`
	// NOTE(review): "ChildChild" in the field name looks like a typo for "Child",
	// but renaming it would break external references — left as-is.
	FeatChildChildObjectListingPaginationEnable bool `env:"FEAT_CHILD_OBJECT_LISTING_PAGINATION" envDefault:"true" description:"use pagination in listing all the dependent child objects. use 'CHILD_OBJECT_LISTING_PAGE_SIZE' to set the page size." deprecated:"false" example:"true"`
	FeatAllDepthChildNodeBuildParallelism       bool `env:"FEAT_ALL_DEPTH_CHILD_NODE_BUILD_PARALLELISM" envDefault:"false" description:"enable semaphore-bounded parallelism for child node building at all recursion depths (depth-2+); when disabled only depth-1 parallelism via worker pool is active" deprecated:"false" example:"false"`
}

func GetHelmReleaseConfig() (*HelmReleaseConfig, error) {
Expand Down
6 changes: 6 additions & 0 deletions kubelink/pkg/service/commonHelmService/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type BuildNodesRequest struct {
RestConfig *rest.Config
ReleaseNamespace string
ParentResourceRef *commonBean.ResourceRef
concurrencySem chan struct{} // shared semaphore for bounded all-depth parallelism in child node building
}

func NewBuildNodesRequest(buildNodesConfig *BuildNodesRequest) *BuildNodesConfig {
Expand Down Expand Up @@ -117,6 +118,11 @@ func (req *BuildNodesRequest) WithParentResourceRef(parentResourceRef *commonBea
return req
}

// WithSemaphore wires a shared concurrency-limiting semaphore into the request
// and returns the request for fluent chaining. Passing nil leaves bounded
// all-depth child-node build parallelism disabled.
func (req *BuildNodesRequest) WithSemaphore(concurrencySem chan struct{}) *BuildNodesRequest {
	req.concurrencySem = concurrencySem
	return req
}

type BuildNodeResponse struct {
Nodes []*commonBean.ResourceNode
HealthStatusArray []*commonBean.HealthStatus
Expand Down
180 changes: 143 additions & 37 deletions kubelink/pkg/service/commonHelmService/resourceTreeService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"net/http"
"sync"
)

type ResourceTreeServiceImpl struct {
Expand Down Expand Up @@ -74,9 +75,22 @@ func sanitizeParentObjects(parentObjects []*client.ObjectIdentifier) []*client.O
func (impl *ResourceTreeServiceImpl) BuildResourceTreeUsingK8s(ctx context.Context, appDetailRequest *client.AppDetailRequest, conf *rest.Config, parentObjects []*client.ObjectIdentifier) (*bean.ResourceTreeResponse, error) {
liveManifests := impl.getLiveManifestsForGVKList(conf, parentObjects)

// when FeatAllDepthChildNodeBuildParallelism is enabled, create a shared semaphore that bounds
// concurrent K8s calls across all recursion depths (depth-2+); a nil semaphore makes
// buildChildNodesInBatch fall back to the previous depth-1-only parallel behaviour
var sem chan struct{}
if impl.helmReleaseConfig.FeatAllDepthChildNodeBuildParallelism {
semSize := impl.helmReleaseConfig.BuildNodesBatchSize
if semSize <= 0 {
semSize = 1
}
sem = make(chan struct{}, semSize)
}

// build resource Nodes
req := NewBuildNodesRequest(NewBuildNodesConfig(conf).
WithReleaseNamespace(appDetailRequest.Namespace)).
WithReleaseNamespace(appDetailRequest.Namespace).
WithSemaphore(sem)). // nil when flag is off → depth-2+ falls back to sequential
WithDesiredOrLiveManifests(liveManifests...).
WithBatchWorker(impl.helmReleaseConfig.BuildNodesBatchSize, impl.logger)
buildNodesResponse, err := impl.BuildNodes(req)
Expand Down Expand Up @@ -106,36 +120,60 @@ func (impl *ResourceTreeServiceImpl) BuildResourceTreeUsingK8s(ctx context.Conte
}

func (impl *ResourceTreeServiceImpl) getLiveManifestsForGVKList(restConfig *rest.Config, gvkList []*client.ObjectIdentifier) []*bean.DesiredOrLiveManifest {
var manifests []*bean.DesiredOrLiveManifest
for _, resource := range gvkList {
gvk := &schema.GroupVersionKind{
Group: resource.GetGroup(),
Version: resource.GetVersion(),
Kind: resource.GetKind(),
total := len(gvkList)
if total == 0 {
return nil
}
// pre-allocate by index so concurrent writes to distinct slots are safe without a mutex
manifests := make([]*bean.DesiredOrLiveManifest, total)

batchSize := impl.helmReleaseConfig.ManifestFetchBatchSize
if batchSize <= 0 {
batchSize = 1
}

for i := 0; i < total; {
currentBatch := batchSize
if remaining := total - i; remaining < currentBatch {
currentBatch = remaining
}
manifest, _, err := impl.k8sService.GetLiveManifest(restConfig, resource.GetNamespace(), gvk, resource.GetName())
if err != nil {
impl.logger.Errorw("Error in getting live manifest", "err", err)
statusError, _ := err.(*errors2.StatusError)
desiredManifest := &unstructured.Unstructured{}
desiredManifest.SetGroupVersionKind(*gvk)
desiredManifest.SetName(resource.Name)
desiredManifest.SetNamespace(resource.Namespace)
desiredManifest.SetAnnotations(resource.Annotations)
desiredOrLiveManifest := &bean.DesiredOrLiveManifest{
Manifest: desiredManifest,
// using deep copy as it replaces item in manifest in loop
IsLiveManifestFetchError: true,
}
if statusError != nil {
desiredOrLiveManifest.LiveManifestFetchErrorCode = statusError.Status().Code
}
manifests = append(manifests, desiredOrLiveManifest)
} else {
manifests = append(manifests, &bean.DesiredOrLiveManifest{
Manifest: manifest,
})
var wg sync.WaitGroup
for j := 0; j < currentBatch; j++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
resource := gvkList[idx]
gvk := &schema.GroupVersionKind{
Group: resource.GetGroup(),
Version: resource.GetVersion(),
Kind: resource.GetKind(),
}
manifest, _, err := impl.k8sService.GetLiveManifest(restConfig, resource.GetNamespace(), gvk, resource.GetName())
if err != nil {
impl.logger.Errorw("Error in getting live manifest", "err", err)
statusError, _ := err.(*errors2.StatusError)
desiredManifest := &unstructured.Unstructured{}
desiredManifest.SetGroupVersionKind(*gvk)
desiredManifest.SetName(resource.Name)
desiredManifest.SetNamespace(resource.Namespace)
desiredManifest.SetAnnotations(resource.Annotations)
desiredOrLiveManifest := &bean.DesiredOrLiveManifest{
Manifest: desiredManifest,
IsLiveManifestFetchError: true,
}
if statusError != nil {
desiredOrLiveManifest.LiveManifestFetchErrorCode = statusError.Status().Code
}
manifests[idx] = desiredOrLiveManifest
} else {
manifests[idx] = &bean.DesiredOrLiveManifest{
Manifest: manifest,
}
}
}(i + j)
}
wg.Wait()
i += currentBatch
}
return manifests
}
Expand Down Expand Up @@ -167,19 +205,20 @@ func (impl *ResourceTreeServiceImpl) BuildNodes(request *BuildNodesConfig) (*Bui
response.HealthStatusArray = append(response.HealthStatusArray, getNodesFromManifestResponse.Node.Health)
}

// add child Nodes request
// add child Nodes request; propagate the shared semaphore so all-depth parallelism is bounded
if len(getNodesFromManifestResponse.DesiredOrLiveChildrenManifests) > 0 {
req := NewBuildNodesRequest(NewBuildNodesConfig(request.RestConfig).
WithReleaseNamespace(request.ReleaseNamespace).
WithParentResourceRef(getNodesFromManifestResponse.ResourceRef)).
WithParentResourceRef(getNodesFromManifestResponse.ResourceRef).
WithSemaphore(request.concurrencySem)).
WithDesiredOrLiveManifests(getNodesFromManifestResponse.DesiredOrLiveChildrenManifests...)
// NOTE: Do not use batch worker for child Nodes as it will create batch worker recursively
// NOTE: Do not attach a new batch worker for child Nodes to avoid recursive pool creation
buildChildNodesRequests = append(buildChildNodesRequests, req)
}
}
// build child Nodes, if any.
// NOTE: build child Nodes calls buildNodes recursively
childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, buildChildNodesRequests)
childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, request.concurrencySem, buildChildNodesRequests)
if err != nil {
return response, err
}
Expand All @@ -205,11 +244,16 @@ func (impl *ResourceTreeServiceImpl) buildChildNodes(buildChildNodesRequests []*
}

// buildChildNodesInBatch builds child Nodes in parallel from desired or live manifest.
// - It uses batch workers workerPool.WorkerPool[*BuildNodeResponse] to build child Nodes in parallel.
// - If workerPool is not defined, it builds child Nodes sequentially.
func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
// - If a workerPool is provided (depth-1), it uses it for parallel execution.
// - If no workerPool but a semaphore is provided (depth-2+), it uses semaphore-bounded goroutines.
// - Falls back to sequential when neither is available.
func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
if wp == nil {
// build child Nodes sequentially
if sem != nil {
// depth-2+ parallel path: bounded by shared semaphore
return impl.buildChildNodesWithSemaphore(sem, buildChildNodesRequests)
}
// no concurrency available; build child Nodes sequentially
return impl.buildChildNodes(buildChildNodesRequests)
}
response := NewBuildNodeResponse()
Expand All @@ -235,6 +279,68 @@ func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.Worke
return response, nil
}

// buildChildNodesWithSemaphore builds child Nodes in parallel using a shared semaphore for bounded concurrency.
// It uses a non-blocking semaphore acquisition: if the semaphore is full the request is processed sequentially
// in the calling goroutine, preventing deadlocks during recursive invocations where the caller already holds a slot.
// On error it returns the first error observed; the returned response may then be partially populated, matching
// the surrounding BuildNodes error convention. wg.Wait() before every return guarantees no goroutine outlives the call.
func (impl *ResourceTreeServiceImpl) buildChildNodesWithSemaphore(sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
	response := NewBuildNodeResponse()
	var mu sync.Mutex // guards response, which is appended to from multiple goroutines
	var wg sync.WaitGroup
	// buffered channel of size 1 captures the first error; subsequent errors are dropped
	errCh := make(chan error, 1)

	for _, req := range buildChildNodesRequests {
		// check for an earlier error before launching more work (fail-fast without cancelling in-flight work)
		select {
		case err := <-errCh:
			wg.Wait()
			return response, err
		default:
		}

		// try to acquire a semaphore slot without blocking
		select {
		case sem <- struct{}{}:
			// slot acquired — run in a goroutine
			wg.Add(1)
			go func(r *BuildNodesConfig) {
				defer func() { <-sem }() // release slot after wg.Done so the semaphore bound stays tight
				defer wg.Done()          // signal completion first (LIFO: wg.Done runs before <-sem)
				// NOTE: recursive BuildNodes calls re-enter this path; non-blocking acquire above
				// is what keeps a slot-holder from deadlocking while waiting for another slot.
				childResp, err := impl.BuildNodes(r)
				if err != nil {
					// r.ParentResourceRef is set by the caller that enqueued this child request
					// (BuildNodes attaches it via WithParentResourceRef) — assumed non-nil here.
					impl.logger.Errorw("error in building child Nodes", "ReleaseNamespace", r.ReleaseNamespace, "parentResource", r.ParentResourceRef.GetGvk(), "err", err)
					// non-blocking send: only the first error is kept, later ones are dropped
					select {
					case errCh <- err:
					default:
					}
					return
				}
				mu.Lock()
				response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
				mu.Unlock()
			}(req)
		default:
			// semaphore full — process sequentially in the calling goroutine to avoid deadlock
			childResp, err := impl.BuildNodes(req)
			if err != nil {
				impl.logger.Errorw("error in building child Nodes sequentially", "ReleaseNamespace", req.ReleaseNamespace, "err", err)
				wg.Wait()
				return response, err
			}
			mu.Lock()
			response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
			mu.Unlock()
		}
	}
	wg.Wait()
	// after all goroutines finish, surface any error recorded by a goroutine
	select {
	case err := <-errCh:
		return response, err
	default:
		return response, nil
	}
}

func (impl *ResourceTreeServiceImpl) getNodeFromDesiredOrLiveManifest(request *GetNodeFromManifestRequest) (*GetNodeFromManifestResponse, error) {
response := NewGetNodesFromManifestResponse()
manifest := request.DesiredOrLiveManifest.Manifest
Expand Down
Loading