From b3d9853da366fa69eadc994997ce68684c3ed0e3 Mon Sep 17 00:00:00 2001 From: Brian DeHamer Date: Mon, 23 Mar 2026 15:33:34 -0700 Subject: [PATCH 1/2] refactor: split controller.go into focused files by concern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the 682-line controller.go into four files organized by responsibility: - controller.go (~160 lines): types, interfaces, Controller struct, New(), Run() - handlers.go (~170 lines): event handler registration, worker loop - reconcile.go (~270 lines): event processing, deployment recording, caching - pod.go (~130 lines): pod utility functions, informer factory Extract registerEventHandlers() from New() as a Controller method and newAPIClient() as a package-level helper to simplify the constructor. No behavioral changes — pure structural refactoring. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- internal/controller/controller.go | 579 ++---------------------------- internal/controller/handlers.go | 170 +++++++++ internal/controller/pod.go | 129 +++++++ internal/controller/reconcile.go | 274 ++++++++++++++ 4 files changed, 602 insertions(+), 550 deletions(-) create mode 100644 internal/controller/handlers.go create mode 100644 internal/controller/pod.go create mode 100644 internal/controller/reconcile.go diff --git a/internal/controller/controller.go b/internal/controller/controller.go index abc0367..864ccb4 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -5,22 +5,15 @@ import ( "errors" "fmt" "log/slog" - "slices" - "strings" "time" "github.com/github/deployment-tracker/internal/metadata" "github.com/github/deployment-tracker/pkg/deploymentrecord" - "github.com/github/deployment-tracker/pkg/dtmetrics" - "github.com/github/deployment-tracker/pkg/ociutil" amcache "k8s.io/apimachinery/pkg/util/cache" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" @@ -81,20 +74,36 @@ type Controller struct { // New creates a new deployment tracker controller. func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregator, namespace string, excludeNamespaces string, cfg *Config) (*Controller, error) { - // Create informer factory factory := createInformerFactory(clientset, namespace, excludeNamespaces) - podInformer := factory.Core().V1().Pods().Informer() - deploymentInformer := factory.Apps().V1().Deployments().Informer() - deploymentLister := factory.Apps().V1().Deployments().Lister() + apiClient, err := newAPIClient(cfg) + if err != nil { + return nil, err + } - // Create work queue with rate limiting - queue := workqueue.NewTypedRateLimitingQueue( - workqueue.DefaultTypedControllerRateLimiter[PodEvent](), - ) + cntrl := &Controller{ + clientset: clientset, + metadataAggregator: metadataAggregator, + podInformer: factory.Core().V1().Pods().Informer(), + deploymentInformer: factory.Apps().V1().Deployments().Informer(), + deploymentLister: factory.Apps().V1().Deployments().Lister(), + workqueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[PodEvent]()), + apiClient: apiClient, + cfg: cfg, + observedDeployments: amcache.NewExpiring(), + unknownArtifacts: amcache.NewExpiring(), + } + + if err := cntrl.registerEventHandlers(); err != nil { + return nil, err + } + + return cntrl, nil +} - // Create API client with optional token - clientOpts := []deploymentrecord.ClientOption{} +// newAPIClient creates a deployment record API client from the controller config. 
+func newAPIClient(cfg *Config) (deploymentRecordPoster, error) { + var clientOpts []deploymentrecord.ClientOption if cfg.APIToken != "" { clientOpts = append(clientOpts, deploymentrecord.WithAPIToken(cfg.APIToken)) } @@ -109,7 +118,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato )) } - apiClient, err := deploymentrecord.NewClient( + client, err := deploymentrecord.NewClient( cfg.BaseURL, cfg.Organization, clientOpts..., @@ -117,126 +126,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato if err != nil { return nil, fmt.Errorf("failed to create API client: %w", err) } - - cntrl := &Controller{ - clientset: clientset, - metadataAggregator: metadataAggregator, - podInformer: podInformer, - deploymentInformer: deploymentInformer, - deploymentLister: deploymentLister, - workqueue: queue, - apiClient: apiClient, - cfg: cfg, - observedDeployments: amcache.NewExpiring(), - unknownArtifacts: amcache.NewExpiring(), - } - - // Add event handlers to the informer - _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - pod, ok := obj.(*corev1.Pod) - if !ok { - slog.Error("Invalid object returned", - "object", obj, - ) - return - } - - // Only process pods that are running and belong - // to a deployment - if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" { - key, err := cache.MetaNamespaceKeyFunc(obj) - - // For our purposes, there are in practice - // no error event we care about, so don't - // bother with handling it. 
- if err == nil { - queue.Add(PodEvent{ - Key: key, - EventType: EventCreated, - }) - } - } - }, - UpdateFunc: func(oldObj, newObj any) { - oldPod, ok := oldObj.(*corev1.Pod) - if !ok { - slog.Error("Invalid old object returned", - "object", oldObj, - ) - return - } - newPod, ok := newObj.(*corev1.Pod) - if !ok { - slog.Error("Invalid new object returned", - "object", newObj, - ) - return - } - - // Skip if pod is being deleted or doesn't belong - // to a deployment - if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" { - return - } - - // Only process if pod just became running. - // We need to process this as often when a container - // is created, the spec does not contain the digest - // so we need to wait for the status field to be - // populated from where we can get the digest. - if oldPod.Status.Phase != corev1.PodRunning && - newPod.Status.Phase == corev1.PodRunning { - key, err := cache.MetaNamespaceKeyFunc(newObj) - - // For our purposes, there are in practice - // no error event we care about, so don't - // bother with handling it. - if err == nil { - queue.Add(PodEvent{ - Key: key, - EventType: EventCreated, - }) - } - } - }, - DeleteFunc: func(obj any) { - pod, ok := obj.(*corev1.Pod) - if !ok { - // Handle deleted final state unknown - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - return - } - pod, ok = tombstone.Obj.(*corev1.Pod) - if !ok { - return - } - } - - // Only process pods that belong to a deployment - if getDeploymentName(pod) == "" { - return - } - - key, err := cache.MetaNamespaceKeyFunc(obj) - // For our purposes, there are in practice - // no error event we care about, so don't - // bother with handling it. - if err == nil { - queue.Add(PodEvent{ - Key: key, - EventType: EventDeleted, - DeletedPod: pod, - }) - } - }, - }) - if err != nil { - return nil, fmt.Errorf("failed to add event handlers: %w", err) - } - - return cntrl, nil + return client, nil } // Run starts the controller. 
@@ -260,10 +150,7 @@ func (c *Controller) Run(ctx context.Context, workers int) error { "count", workers, ) - // Start workers - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) - } + c.startWorkers(ctx, workers) slog.Info("Controller started") @@ -272,411 +159,3 @@ func (c *Controller) Run(ctx context.Context, workers int) error { return nil } - -// runWorker runs a worker to process items from the work queue. -func (c *Controller) runWorker(ctx context.Context) { - for c.processNextItem(ctx) { - } -} - -// processNextItem processes the next item from the work queue. -func (c *Controller) processNextItem(ctx context.Context) bool { - event, shutdown := c.workqueue.Get() - if shutdown { - return false - } - defer c.workqueue.Done(event) - - start := time.Now() - err := c.processEvent(ctx, event) - dur := time.Since(start) - - if err == nil { - dtmetrics.EventsProcessedOk.WithLabelValues(event.EventType).Inc() - dtmetrics.EventsProcessedTimer.WithLabelValues("ok").Observe(dur.Seconds()) - - c.workqueue.Forget(event) - return true - } - dtmetrics.EventsProcessedTimer.WithLabelValues("failed").Observe(dur.Seconds()) - dtmetrics.EventsProcessedFailed.WithLabelValues(event.EventType).Inc() - - // Requeue on error with rate limiting - slog.Error("Failed to process event, requeuing", - "event_key", event.Key, - "error", err, - ) - c.workqueue.AddRateLimited(event) - - return true -} - -// processEvent processes a single pod event. -func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { - var pod *corev1.Pod - - if event.EventType == EventDeleted { - // For delete events, use the pod captured at deletion time - pod = event.DeletedPod - if pod == nil { - slog.Error("Delete event missing pod data", - "key", event.Key, - ) - return nil - } - - // Check if the parent deployment still exists - // If it does, this is just a scale-down event, skip it. 
- // - // If a deployment changes image versions, this will not - // fire delete/decommissioned events to the remote API. - // This is as intended, as the server will keep track of - // the (cluster unique) deployment name, and just update - // the referenced image digest to the newly observed (via - // the create event). - deploymentName := getDeploymentName(pod) - if deploymentName != "" && c.deploymentExists(pod.Namespace, deploymentName) { - slog.Debug("Deployment still exists, skipping pod delete (scale down)", - "namespace", pod.Namespace, - "deployment", deploymentName, - "pod", pod.Name, - ) - return nil - } - } else { - // For create events, get the pod from the informer's cache - obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) - if err != nil { - slog.Error("Failed to get pod from cache", - "key", event.Key, - "error", err, - ) - return nil - } - if !exists { - // Pod no longer exists in cache, skip processing - return nil - } - - var ok bool - pod, ok = obj.(*corev1.Pod) - if !ok { - slog.Error("Invalid object type in cache", - "key", event.Key, - ) - return nil - } - } - - status := deploymentrecord.StatusDeployed - if event.EventType == EventDeleted { - status = deploymentrecord.StatusDecommissioned - } - - var lastErr error - - // Gather aggregate metadata for adds/updates - var aggPodMetadata *metadata.AggregatePodMetadata - if status != deploymentrecord.StatusDecommissioned { - aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) - } - - // Record info for each container in the pod - for _, container := range pod.Spec.Containers { - if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { - lastErr = err - } - } - - // Also record init containers - for _, container := range pod.Spec.InitContainers { - if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { - lastErr = err - } - } - - return 
lastErr -} - -// deploymentExists checks if a deployment exists in the local informer cache. -func (c *Controller) deploymentExists(namespace, name string) bool { - _, err := c.deploymentLister.Deployments(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return false - } - slog.Warn("Failed to check if deployment exists in cache, assuming it does", - "namespace", namespace, - "deployment", name, - "error", err, - ) - return true - } - return true -} - -// recordContainer records a single container's deployment info. -func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error { - var cacheKey string - - dn := getARDeploymentName(pod, container, c.cfg.Template) - digest := getContainerDigest(pod, container.Name) - - if dn == "" || digest == "" { - slog.Debug("Skipping container: missing deployment name or digest", - "namespace", pod.Namespace, - "pod", pod.Name, - "container", container.Name, - "deployment_name", dn, - "has_digest", digest != "", - ) - return nil - } - - // Check if we've already recorded this deployment - switch status { - case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) - if _, exists := c.observedDeployments.Get(cacheKey); exists { - slog.Debug("Deployment already observed, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) - if _, exists := c.observedDeployments.Get(cacheKey); exists { - slog.Debug("Deployment already deleted, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - default: - return fmt.Errorf("invalid status: %s", status) - } - - // Check if this artifact was previously unknown (404 from the API) - if _, exists := c.unknownArtifacts.Get(digest); exists { - 
dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() - slog.Debug("Artifact previously returned 404, skipping post", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - - // Extract image name and tag - imageName, version := ociutil.ExtractName(container.Image) - - // Format runtime risks and tags - var runtimeRisks []deploymentrecord.RuntimeRisk - var tags map[string]string - if aggPodMetadata != nil { - for risk := range aggPodMetadata.RuntimeRisks { - runtimeRisks = append(runtimeRisks, risk) - } - slices.Sort(runtimeRisks) - tags = aggPodMetadata.Tags - } - - // Create deployment record - record := deploymentrecord.NewDeploymentRecord( - imageName, - digest, - version, - c.cfg.LogicalEnvironment, - c.cfg.PhysicalEnvironment, - c.cfg.Cluster, - status, - dn, - runtimeRisks, - tags, - ) - - if err := c.apiClient.PostOne(ctx, record); err != nil { - // Return if no artifact is found and cache the digest - var noArtifactErr *deploymentrecord.NoArtifactError - if errors.As(err, &noArtifactErr) { - c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) - slog.Info("No artifact found, digest cached as unknown", - "deployment_name", dn, - "digest", digest, - ) - return nil - } - - // Make sure to not retry on client error messages - var clientErr *deploymentrecord.ClientError - if errors.As(err, &clientErr) { - slog.Warn("Failed to post record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - "digest", record.Digest, - "error", err, - ) - return nil - } - - slog.Error("Failed to post record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - "digest", record.Digest, - "error", err, - ) - return err - } - - slog.Info("Posted record", - "event_type", eventType, - "name", record.Name, - "deployment_name", record.DeploymentName, - "status", record.Status, - "digest", record.Digest, - "runtime_risks", 
record.RuntimeRisks, - "tags", record.Tags, - ) - - // Update cache after successful post - switch status { - case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) - c.observedDeployments.Set(cacheKey, true, 2*time.Minute) - // If there was a previous delete event, remove that - cacheKey = getCacheKey(EventDeleted, dn, digest) - c.observedDeployments.Delete(cacheKey) - case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) - c.observedDeployments.Set(cacheKey, true, 2*time.Minute) - // If there was a previous create event, remove that - cacheKey = getCacheKey(EventCreated, dn, digest) - c.observedDeployments.Delete(cacheKey) - default: - return fmt.Errorf("invalid status: %s", status) - } - - return nil -} - -func getCacheKey(ev, dn, digest string) string { - return ev + "||" + dn + "||" + digest -} - -// createInformerFactory creates a shared informer factory with the given resync period. -// If excludeNamespaces is non-empty, it will exclude those namespaces from being watched. -// If namespace is non-empty, it will only watch that namespace. 
-func createInformerFactory(clientset kubernetes.Interface, namespace string, excludeNamespaces string) informers.SharedInformerFactory { - var factory informers.SharedInformerFactory - switch { - case namespace != "": - slog.Info("Namespace to watch", - "namespace", - namespace, - ) - factory = informers.NewSharedInformerFactoryWithOptions( - clientset, - 30*time.Second, - informers.WithNamespace(namespace), - ) - case excludeNamespaces != "": - seenNamespaces := make(map[string]bool) - fieldSelectorParts := make([]string, 0) - - for _, ns := range strings.Split(excludeNamespaces, ",") { - ns = strings.TrimSpace(ns) - if ns != "" && !seenNamespaces[ns] { - seenNamespaces[ns] = true - fieldSelectorParts = append(fieldSelectorParts, fmt.Sprintf("metadata.namespace!=%s", ns)) - } - } - - slog.Info("Excluding namespaces from watch", - "field_selector", - strings.Join(fieldSelectorParts, ","), - ) - tweakListOptions := func(options *metav1.ListOptions) { - options.FieldSelector = strings.Join(fieldSelectorParts, ",") - } - - factory = informers.NewSharedInformerFactoryWithOptions( - clientset, - 30*time.Second, - informers.WithTweakListOptions(tweakListOptions), - ) - default: - factory = informers.NewSharedInformerFactory(clientset, - 30*time.Second, - ) - } - - return factory -} - -// getARDeploymentName converts the pod's metadata into the correct format -// for the deployment name for the artifact registry (this is not the same -// as the K8s deployment's name!) -// The deployment name must unique within logical, physical environment and -// the cluster. -func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string { - res := tmpl - res = strings.ReplaceAll(res, TmplNS, p.Namespace) - res = strings.ReplaceAll(res, TmplDN, getDeploymentName(p)) - res = strings.ReplaceAll(res, TmplCN, c.Name) - return res -} - -// getContainerDigest extracts the image digest from the container status. 
-// The spec only contains the desired state, so any resolved digests must -// be pulled from the status field. -func getContainerDigest(pod *corev1.Pod, containerName string) string { - // Check regular container statuses - for _, status := range pod.Status.ContainerStatuses { - if status.Name == containerName { - return ociutil.ExtractDigest(status.ImageID) - } - } - - // Check init container statuses - for _, status := range pod.Status.InitContainerStatuses { - if status.Name == containerName { - return ociutil.ExtractDigest(status.ImageID) - } - } - - return "" -} - -// getDeploymentName returns the deployment name for a pod, if it belongs -// to one. -func getDeploymentName(pod *corev1.Pod) string { - // Pods created by Deployments are owned by ReplicaSets - // The ReplicaSet name follows the pattern: - - for _, owner := range pod.OwnerReferences { - if owner.Kind == "ReplicaSet" { - // Extract deployment name by removing the hash suffix - // ReplicaSet name format: - - rsName := owner.Name - lastDash := strings.LastIndex(rsName, "-") - if lastDash > 0 { - return rsName[:lastDash] - } - return rsName - } - } - return "" -} - -func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { - return &metav1.PartialObjectMetadata{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: pod.ObjectMeta, - } -} diff --git a/internal/controller/handlers.go b/internal/controller/handlers.go new file mode 100644 index 0000000..aa1e898 --- /dev/null +++ b/internal/controller/handlers.go @@ -0,0 +1,170 @@ +package controller + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/github/deployment-tracker/pkg/dtmetrics" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" +) + +// registerEventHandlers adds pod event handlers to the informer. Events +// are filtered and enqueued to the controller's work queue for processing. 
+func (c *Controller) registerEventHandlers() error { + _, err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + pod, ok := obj.(*corev1.Pod) + if !ok { + slog.Error("Invalid object returned", + "object", obj, + ) + return + } + + // Only process pods that are running and belong + // to a deployment + if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" { + key, err := cache.MetaNamespaceKeyFunc(obj) + + // For our purposes, there are in practice + // no error event we care about, so don't + // bother with handling it. + if err == nil { + c.workqueue.Add(PodEvent{ + Key: key, + EventType: EventCreated, + }) + } + } + }, + UpdateFunc: func(oldObj, newObj any) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + slog.Error("Invalid old object returned", + "object", oldObj, + ) + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + slog.Error("Invalid new object returned", + "object", newObj, + ) + return + } + + // Skip if pod is being deleted or doesn't belong + // to a deployment + if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" { + return + } + + // Only process if pod just became running. + // We need to process this as often when a container + // is created, the spec does not contain the digest + // so we need to wait for the status field to be + // populated from where we can get the digest. + if oldPod.Status.Phase != corev1.PodRunning && + newPod.Status.Phase == corev1.PodRunning { + key, err := cache.MetaNamespaceKeyFunc(newObj) + + // For our purposes, there are in practice + // no error event we care about, so don't + // bother with handling it. 
+ if err == nil { + c.workqueue.Add(PodEvent{ + Key: key, + EventType: EventCreated, + }) + } + } + }, + DeleteFunc: func(obj any) { + pod, ok := obj.(*corev1.Pod) + if !ok { + // Handle deleted final state unknown + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + pod, ok = tombstone.Obj.(*corev1.Pod) + if !ok { + return + } + } + + // Only process pods that belong to a deployment + if getDeploymentName(pod) == "" { + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) + // For our purposes, there are in practice + // no error event we care about, so don't + // bother with handling it. + if err == nil { + c.workqueue.Add(PodEvent{ + Key: key, + EventType: EventDeleted, + DeletedPod: pod, + }) + } + }, + }) + if err != nil { + return fmt.Errorf("failed to add event handlers: %w", err) + } + + return nil +} + +// runWorker runs a worker to process items from the work queue. +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextItem(ctx) { + } +} + +// startWorkers launches the specified number of worker goroutines and +// returns without blocking; the workers stop when the context is cancelled. +func (c *Controller) startWorkers(ctx context.Context, workers int) { + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } +} + +// processNextItem processes the next item from the work queue.
+func (c *Controller) processNextItem(ctx context.Context) bool { + event, shutdown := c.workqueue.Get() + if shutdown { + return false + } + defer c.workqueue.Done(event) + + start := time.Now() + err := c.processEvent(ctx, event) + dur := time.Since(start) + + if err == nil { + dtmetrics.EventsProcessedOk.WithLabelValues(event.EventType).Inc() + dtmetrics.EventsProcessedTimer.WithLabelValues("ok").Observe(dur.Seconds()) + + c.workqueue.Forget(event) + return true + } + dtmetrics.EventsProcessedTimer.WithLabelValues("failed").Observe(dur.Seconds()) + dtmetrics.EventsProcessedFailed.WithLabelValues(event.EventType).Inc() + + // Requeue on error with rate limiting + slog.Error("Failed to process event, requeuing", + "event_key", event.Key, + "error", err, + ) + c.workqueue.AddRateLimited(event) + + return true +} diff --git a/internal/controller/pod.go b/internal/controller/pod.go new file mode 100644 index 0000000..9d9bc2e --- /dev/null +++ b/internal/controller/pod.go @@ -0,0 +1,129 @@ +package controller + +import ( + "fmt" + "log/slog" + "strings" + "time" + + "github.com/github/deployment-tracker/pkg/ociutil" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +// createInformerFactory creates a shared informer factory with the given resync period. +// If excludeNamespaces is non-empty, it will exclude those namespaces from being watched. +// If namespace is non-empty, it will only watch that namespace. 
+func createInformerFactory(clientset kubernetes.Interface, namespace string, excludeNamespaces string) informers.SharedInformerFactory { + var factory informers.SharedInformerFactory + switch { + case namespace != "": + slog.Info("Namespace to watch", + "namespace", + namespace, + ) + factory = informers.NewSharedInformerFactoryWithOptions( + clientset, + 30*time.Second, + informers.WithNamespace(namespace), + ) + case excludeNamespaces != "": + seenNamespaces := make(map[string]bool) + fieldSelectorParts := make([]string, 0) + + for _, ns := range strings.Split(excludeNamespaces, ",") { + ns = strings.TrimSpace(ns) + if ns != "" && !seenNamespaces[ns] { + seenNamespaces[ns] = true + fieldSelectorParts = append(fieldSelectorParts, fmt.Sprintf("metadata.namespace!=%s", ns)) + } + } + + slog.Info("Excluding namespaces from watch", + "field_selector", + strings.Join(fieldSelectorParts, ","), + ) + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = strings.Join(fieldSelectorParts, ",") + } + + factory = informers.NewSharedInformerFactoryWithOptions( + clientset, + 30*time.Second, + informers.WithTweakListOptions(tweakListOptions), + ) + default: + factory = informers.NewSharedInformerFactory(clientset, + 30*time.Second, + ) + } + + return factory +} + +// getARDeploymentName converts the pod's metadata into the correct format +// for the deployment name for the artifact registry (this is not the same +// as the K8s deployment's name!) +// The deployment name must be unique within the logical and physical +// environment and the cluster. +func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string { + res := tmpl + res = strings.ReplaceAll(res, TmplNS, p.Namespace) + res = strings.ReplaceAll(res, TmplDN, getDeploymentName(p)) + res = strings.ReplaceAll(res, TmplCN, c.Name) + return res +} + +// getContainerDigest extracts the image digest from the container status.
+// The spec only contains the desired state, so any resolved digests must +// be pulled from the status field. +func getContainerDigest(pod *corev1.Pod, containerName string) string { + // Check regular container statuses + for _, status := range pod.Status.ContainerStatuses { + if status.Name == containerName { + return ociutil.ExtractDigest(status.ImageID) + } + } + + // Check init container statuses + for _, status := range pod.Status.InitContainerStatuses { + if status.Name == containerName { + return ociutil.ExtractDigest(status.ImageID) + } + } + + return "" +} + +// getDeploymentName returns the deployment name for a pod, if it belongs +// to one. +func getDeploymentName(pod *corev1.Pod) string { + // Pods created by Deployments are owned by ReplicaSets + // The ReplicaSet name follows the pattern: <deployment-name>-<hash> + for _, owner := range pod.OwnerReferences { + if owner.Kind == "ReplicaSet" { + // Extract deployment name by removing the hash suffix + // ReplicaSet name format: <deployment-name>-<hash> + rsName := owner.Name + lastDash := strings.LastIndex(rsName, "-") + if lastDash > 0 { + return rsName[:lastDash] + } + return rsName + } + } + return "" +} + +func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: pod.ObjectMeta, + } +} diff --git a/internal/controller/reconcile.go b/internal/controller/reconcile.go new file mode 100644 index 0000000..e7fb241 --- /dev/null +++ b/internal/controller/reconcile.go @@ -0,0 +1,274 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "log/slog" + "slices" + "time" + + "github.com/github/deployment-tracker/internal/metadata" + "github.com/github/deployment-tracker/pkg/deploymentrecord" + "github.com/github/deployment-tracker/pkg/dtmetrics" + "github.com/github/deployment-tracker/pkg/ociutil" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" +) + +// processEvent
processes a single pod event. +func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { + var pod *corev1.Pod + + if event.EventType == EventDeleted { + // For delete events, use the pod captured at deletion time + pod = event.DeletedPod + if pod == nil { + slog.Error("Delete event missing pod data", + "key", event.Key, + ) + return nil + } + + // Check if the parent deployment still exists + // If it does, this is just a scale-down event, skip it. + // + // If a deployment changes image versions, this will not + // fire delete/decommissioned events to the remote API. + // This is as intended, as the server will keep track of + // the (cluster unique) deployment name, and just update + // the referenced image digest to the newly observed (via + // the create event). + deploymentName := getDeploymentName(pod) + if deploymentName != "" && c.deploymentExists(pod.Namespace, deploymentName) { + slog.Debug("Deployment still exists, skipping pod delete (scale down)", + "namespace", pod.Namespace, + "deployment", deploymentName, + "pod", pod.Name, + ) + return nil + } + } else { + // For create events, get the pod from the informer's cache + obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key) + if err != nil { + slog.Error("Failed to get pod from cache", + "key", event.Key, + "error", err, + ) + return nil + } + if !exists { + // Pod no longer exists in cache, skip processing + return nil + } + + var ok bool + pod, ok = obj.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in cache", + "key", event.Key, + ) + return nil + } + } + + status := deploymentrecord.StatusDeployed + if event.EventType == EventDeleted { + status = deploymentrecord.StatusDecommissioned + } + + var lastErr error + + // Gather aggregate metadata for adds/updates + var aggPodMetadata *metadata.AggregatePodMetadata + if status != deploymentrecord.StatusDecommissioned { + aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, 
podToPartialMetadata(pod)) + } + + // Record info for each container in the pod + for _, container := range pod.Spec.Containers { + if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { + lastErr = err + } + } + + // Also record init containers + for _, container := range pod.Spec.InitContainers { + if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { + lastErr = err + } + } + + return lastErr +} + +// deploymentExists checks if a deployment exists in the local informer cache. +func (c *Controller) deploymentExists(namespace, name string) bool { + _, err := c.deploymentLister.Deployments(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if deployment exists in cache, assuming it does", + "namespace", namespace, + "deployment", name, + "error", err, + ) + return true + } + return true +} + +// recordContainer records a single container's deployment info. 
+func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error { + var cacheKey string + + dn := getARDeploymentName(pod, container, c.cfg.Template) + digest := getContainerDigest(pod, container.Name) + + if dn == "" || digest == "" { + slog.Debug("Skipping container: missing deployment name or digest", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", container.Name, + "deployment_name", dn, + "has_digest", digest != "", + ) + return nil + } + + // Check if we've already recorded this deployment + switch status { + case deploymentrecord.StatusDeployed: + cacheKey = getCacheKey(EventCreated, dn, digest) + if _, exists := c.observedDeployments.Get(cacheKey); exists { + slog.Debug("Deployment already observed, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + case deploymentrecord.StatusDecommissioned: + cacheKey = getCacheKey(EventDeleted, dn, digest) + if _, exists := c.observedDeployments.Get(cacheKey); exists { + slog.Debug("Deployment already deleted, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + default: + return fmt.Errorf("invalid status: %s", status) + } + + // Check if this artifact was previously unknown (404 from the API) + if _, exists := c.unknownArtifacts.Get(digest); exists { + dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() + slog.Debug("Artifact previously returned 404, skipping post", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + + // Extract image name and tag + imageName, version := ociutil.ExtractName(container.Image) + + // Format runtime risks and tags + var runtimeRisks []deploymentrecord.RuntimeRisk + var tags map[string]string + if aggPodMetadata != nil { + for risk := range aggPodMetadata.RuntimeRisks { + runtimeRisks = append(runtimeRisks, risk) + } + slices.Sort(runtimeRisks) + tags = 
aggPodMetadata.Tags + } + + // Create deployment record + record := deploymentrecord.NewDeploymentRecord( + imageName, + digest, + version, + c.cfg.LogicalEnvironment, + c.cfg.PhysicalEnvironment, + c.cfg.Cluster, + status, + dn, + runtimeRisks, + tags, + ) + + if err := c.apiClient.PostOne(ctx, record); err != nil { + // Return if no artifact is found and cache the digest + var noArtifactErr *deploymentrecord.NoArtifactError + if errors.As(err, &noArtifactErr) { + c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) + slog.Info("No artifact found, digest cached as unknown", + "deployment_name", dn, + "digest", digest, + ) + return nil + } + + // Make sure to not retry on client error messages + var clientErr *deploymentrecord.ClientError + if errors.As(err, &clientErr) { + slog.Warn("Failed to post record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "error", err, + ) + return nil + } + + slog.Error("Failed to post record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "error", err, + ) + return err + } + + slog.Info("Posted record", + "event_type", eventType, + "name", record.Name, + "deployment_name", record.DeploymentName, + "status", record.Status, + "digest", record.Digest, + "runtime_risks", record.RuntimeRisks, + "tags", record.Tags, + ) + + // Update cache after successful post + switch status { + case deploymentrecord.StatusDeployed: + cacheKey = getCacheKey(EventCreated, dn, digest) + c.observedDeployments.Set(cacheKey, true, 2*time.Minute) + // If there was a previous delete event, remove that + cacheKey = getCacheKey(EventDeleted, dn, digest) + c.observedDeployments.Delete(cacheKey) + case deploymentrecord.StatusDecommissioned: + cacheKey = getCacheKey(EventDeleted, dn, digest) + c.observedDeployments.Set(cacheKey, true, 2*time.Minute) + // 
If there was a previous create event, remove that + cacheKey = getCacheKey(EventCreated, dn, digest) + c.observedDeployments.Delete(cacheKey) + default: + return fmt.Errorf("invalid status: %s", status) + } + + return nil +} + +func getCacheKey(ev, dn, digest string) string { + return ev + "||" + dn + "||" + digest +} From 8130fd71c06f4f4f9eef33bfd79c845b90732ac0 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:59:13 -0700 Subject: [PATCH 2/2] fix: use DeletionHandlingMetaNamespaceKeyFunc in DeleteFunc for tombstone support (#70) * Initial plan * fix: use DeletionHandlingMetaNamespaceKeyFunc in DeleteFunc for tombstone support Co-authored-by: bdehamer <398027+bdehamer@users.noreply.github.com> Agent-Logs-Url: https://github.com/github/deployment-tracker/sessions/620a8570-c815-4b5f-95cd-25d107fcd82a --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: bdehamer <398027+bdehamer@users.noreply.github.com> --- internal/controller/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controller/handlers.go b/internal/controller/handlers.go index aa1e898..c5cca11 100644 --- a/internal/controller/handlers.go +++ b/internal/controller/handlers.go @@ -103,7 +103,7 @@ func (c *Controller) registerEventHandlers() error { return } - key, err := cache.MetaNamespaceKeyFunc(obj) + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) // For our purposes, there are in practice // no error event we care about, so don't // bother with handling it.