Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion event-exporter/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
event-exporter
.idea/**
.idea/**
2 changes: 1 addition & 1 deletion event-exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
k8s.io/api v0.35.2
k8s.io/apimachinery v0.35.2
k8s.io/client-go v0.35.2
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
)

Expand Down Expand Up @@ -65,7 +66,6 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
Expand Down
73 changes: 48 additions & 25 deletions event-exporter/kubernetes/podlabels/pod_labels_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientfeatures "k8s.io/client-go/features"
"k8s.io/client-go/metadata"

"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

// PodLabelCollector defines the interface for GetLabels for a namespace and pod name pair.
Expand All @@ -16,16 +19,17 @@ type PodLabelCollector interface {
}

type PodLabelsSharedInformerFactory struct {
factory informers.SharedInformerFactory
factory metadatainformer.SharedInformerFactory
}

// Run starts all informers registered with the factory and blocks until
// their caches have synced (or stopCh is closed).
//
// The previous implementation discarded the result of WaitForCacheSync and
// logged "Cache synced!" unconditionally, even when a cache failed to sync
// (e.g. because stopCh was closed before the initial list completed). We now
// inspect the per-resource sync results and only report success when every
// cache actually synced.
func (f *PodLabelsSharedInformerFactory) Run(stopCh <-chan struct{}) {
	f.factory.Start(stopCh)
	for resource, ok := range f.factory.WaitForCacheSync(stopCh) {
		if !ok {
			klog.Warningf("Cache for %v failed to sync", resource)
			return
		}
	}
	klog.Info("Cache synced!")
}

func (f *PodLabelsSharedInformerFactory) NewPodLabelsSharedInformer() *PodLabelsSharedInformer {
podInformer := f.factory.Core().V1().Pods().Informer()
podInformer := f.factory.ForResource(corev1.SchemeGroupVersion.WithResource("pods")).Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ any) {
recordAddition()
Expand All @@ -39,38 +43,57 @@ func (f *PodLabelsSharedInformerFactory) NewPodLabelsSharedInformer() *PodLabels
}
}

func NewPodLabelsSharedInformerFactory(client kubernetes.Interface, ignoredNamespaces []string) *PodLabelsSharedInformerFactory {
// customFeatureGates wraps the client-go feature gates so the
// WatchListClient gate can be overridden by a runtime flag while all
// other feature lookups fall through to the embedded Gates.
type customFeatureGates struct {
	clientfeatures.Gates
	// enableWatchListClient is the flag-controlled value returned for the
	// WatchListClient feature, regardless of the default gate setting.
	enableWatchListClient bool
}

// Enabled reports whether the given client-go feature is enabled. For the
// WatchListClient feature it returns the flag-configured override; every
// other feature is delegated to the embedded Gates.
func (c *customFeatureGates) Enabled(key clientfeatures.Feature) bool {
	if key != clientfeatures.WatchListClient {
		return c.Gates.Enabled(key)
	}
	klog.Info("Enabled feature gate WatchListClient: ", c.enableWatchListClient)
	return c.enableWatchListClient
}

func NewPodLabelsSharedInformerFactory(client metadata.Interface, ignoredNamespaces []string, enableWatchListClient bool) *PodLabelsSharedInformerFactory {
// Set the custom feature gates based on the flag
clientfeatures.ReplaceFeatureGates(&customFeatureGates{
Comment thread
chelseychen marked this conversation as resolved.
Gates: clientfeatures.FeatureGates(),
enableWatchListClient: enableWatchListClient,
})

ignoredNamespacesMap := make(map[string]struct{})
for _, ns := range ignoredNamespaces {
ignoredNamespacesMap[ns] = struct{}{}
}
return &PodLabelsSharedInformerFactory{
factory: informers.NewSharedInformerFactoryWithOptions(
factory: metadatainformer.NewSharedInformerFactoryWithOptions(
client,
0,
informers.WithTransform(func(obj interface{}) (interface{}, error) {
pod := obj.(*corev1.Pod)
if _, ok := ignoredNamespacesMap[pod.Namespace]; ok {
metadatainformer.WithTransform(func(obj interface{}) (interface{}, error) {
meta := obj.(*metav1.PartialObjectMetadata)
if _, ok := ignoredNamespacesMap[meta.Namespace]; ok {
return nil, nil
}
labels := make(map[string]string)
if v, ok := pod.Labels["pod-template-hash"]; ok {
if v, ok := meta.Labels["pod-template-hash"]; ok {
labels["pod-template-hash"] = v
}
if v, ok := pod.Labels[jobSetNameLabelKey]; ok {
if v, ok := meta.Labels[jobSetNameLabelKey]; ok {
labels[jobSetNameLabelKey] = v
}
if v, ok := pod.Labels[jobSetRestartAttemptLabelKey]; ok {
if v, ok := meta.Labels[jobSetRestartAttemptLabelKey]; ok {
labels[jobSetRestartAttemptLabelKey] = v
}
if v, ok := pod.Labels[jobsetUIDLabelKey]; ok {
if v, ok := meta.Labels[jobsetUIDLabelKey]; ok {
labels[jobsetUIDLabelKey] = v
}
return &corev1.Pod{
return &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: pod.OwnerReferences,
Name: meta.Name,
Namespace: meta.Namespace,
OwnerReferences: meta.OwnerReferences,
Labels: labels,
},
}, nil
Expand All @@ -89,16 +112,16 @@ func (informer *PodLabelsSharedInformer) GetLabels(namespace, podName string) ma
recordQueryMiss()
return nil
}
pod := podItem.(*corev1.Pod)
meta := podItem.(*metav1.PartialObjectMetadata)
recordQueryHit()
return getLabelsFromPod(pod)
return getLabelsFromMeta(meta)
}

// get owner labels for go/gke-log-owner-label for non-system logs.
// If there are multiple owners, the loop below picks the last valid one.
func getLabelsFromPod(pod *corev1.Pod) map[string]string {
func getLabelsFromMeta(meta *metav1.PartialObjectMetadata) map[string]string {
transformedLabels := map[string]string{}
for _, owner := range pod.GetObjectMeta().GetOwnerReferences() {
for _, owner := range meta.GetOwnerReferences() {
switch owner.Kind {
case "DaemonSet":
transformedLabels[ownerTypeKeyName] = "DaemonSet"
Expand All @@ -110,7 +133,7 @@ func getLabelsFromPod(pod *corev1.Pod) map[string]string {
// Pod that is eventually owned by Deployment has pod name:
// <DeploymentName>-<PodTemplateHash>-<RandomString>
// and owner replicaset name: <DeploymentName>-<PodTemplateHash>
if templateHashSuffix := "-" + pod.GetObjectMeta().GetLabels()["pod-template-hash"]; len(templateHashSuffix) > 1 {
if templateHashSuffix := "-" + meta.GetLabels()["pod-template-hash"]; len(templateHashSuffix) > 1 {
if ownerName, ok := strings.CutSuffix(owner.Name, templateHashSuffix); ok {
transformedLabels[ownerTypeKeyName] = "Deployment"
transformedLabels[ownerNameKeyName] = ownerName
Expand All @@ -123,16 +146,16 @@ func getLabelsFromPod(pod *corev1.Pod) map[string]string {
// and owner job name: <CronJobName>-<UnixTimeInMin>
transformedLabels[ownerTypeKeyName] = "CronJob"
transformedLabels[ownerNameKeyName] = ownerName
} else if jobsetName := pod.GetObjectMeta().GetLabels()[jobSetNameLabelKey]; jobsetName != "" {
} else if jobsetName := meta.GetLabels()[jobSetNameLabelKey]; jobsetName != "" {
// Pod that is eventually owned by a JobSet has the jobset name label set.
transformedLabels[ownerTypeKeyName] = "JobSet"
transformedLabels[ownerNameKeyName] = jobsetName

// Add restart_attempt and uid labels for JobSet events.
if restartAttempt, ok := pod.GetObjectMeta().GetLabels()[jobSetRestartAttemptLabelKey]; ok {
if restartAttempt, ok := meta.GetLabels()[jobSetRestartAttemptLabelKey]; ok {
transformedLabels[jobSetRestartAttemptLabelKey] = restartAttempt
}
if uid, ok := pod.GetObjectMeta().GetLabels()[jobsetUIDLabelKey]; ok {
if uid, ok := meta.GetLabels()[jobsetUIDLabelKey]; ok {
transformedLabels[jobsetUIDLabelKey] = uid
}
} else {
Expand Down
78 changes: 54 additions & 24 deletions event-exporter/kubernetes/podlabels/pod_labels_informer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package podlabels

import (
"context"
"math"
"testing"
"time"
Expand All @@ -10,7 +9,8 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/apimachinery/pkg/runtime"
metadatafake "k8s.io/client-go/metadata/fake"
)

type mockPodOptions struct {
Expand Down Expand Up @@ -178,7 +178,15 @@ func TestGetLabelsFromPod(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.description, func(t *testing.T) {
gotLabels := getLabelsFromPod(tc.pod)
meta := &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: tc.pod.Name,
Namespace: tc.pod.Namespace,
OwnerReferences: tc.pod.OwnerReferences,
Labels: tc.pod.Labels,
},
}
gotLabels := getLabelsFromMeta(meta)
if len(tc.wantLabels) != len(gotLabels) {
t.Errorf("getLabelsFromPod() got unexpected labels %v, want %v", gotLabels, tc.wantLabels)
}
Expand All @@ -196,22 +204,37 @@ func intValueOfMetric(c prometheus.Collector) int {
}

func TestGetLabelsCacheOperations(t *testing.T) {
fakeclient := fake.NewSimpleClientset(
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-agent-12abc",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{{
Kind: "DaemonSet",
Name: "my-agent",
}},

scheme := runtime.NewScheme()
metav1.AddMetaToScheme(scheme)

pod1 := &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-agent-12abc",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{{
Kind: "DaemonSet",
Name: "my-agent",
}},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
Namespace: "default",
}})
factory := NewPodLabelsSharedInformerFactory(fakeclient, nil)
},
}
pod2 := &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
Namespace: "default",
},
}
fakeMetadataClient := metadatafake.NewSimpleMetadataClient(scheme, pod1, pod2)

factory := NewPodLabelsSharedInformerFactory(fakeMetadataClient, nil, false)
collector := factory.NewPodLabelsSharedInformer()
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -250,7 +273,11 @@ func TestGetLabelsCacheOperations(t *testing.T) {
t.Errorf("At this point, cacheOpsCount with operation=querymiss should be 2, but got %d", count)
}

newPod := &corev1.Pod{
newPodMeta := &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
Expand All @@ -261,16 +288,17 @@ func TestGetLabelsCacheOperations(t *testing.T) {
},
}

ctx := context.TODO()
_, err := fakeclient.CoreV1().Pods("default").Create(ctx, newPod, metav1.CreateOptions{})
err := fakeMetadataClient.Tracker().Create(corev1.SchemeGroupVersion.WithResource("pods"), newPodMeta, "default")
if err != nil {
t.Fatalf("unexpected error creating pod: %v", err)
}
// Simulate an add event in the informer
collector.informer.GetStore().Add(newPodMeta)

time.Sleep(time.Second)

if count := intValueOfMetric(cacheOpsCount.With(prometheus.Labels{"operation": "add"})); count != 3 {
t.Errorf("At this point, cacheOpsCount with operation=add should be 3, but got %d", count)
if count := intValueOfMetric(cacheOpsCount.With(prometheus.Labels{"operation": "add"})); count != 2 {
t.Errorf("At this point, cacheOpsCount with operation=add should be 2, but got %d", count)
}

labels = collector.GetLabels("default", "test-pod") // query hit +1
Expand All @@ -281,10 +309,12 @@ func TestGetLabelsCacheOperations(t *testing.T) {
t.Errorf("At this point, cacheOpsCount with operation=queryhit should be 5, but got %d", count)
}

err = fakeclient.CoreV1().Pods("default").Delete(ctx, "test-pod", metav1.DeleteOptions{})
err = fakeMetadataClient.Tracker().Delete(corev1.SchemeGroupVersion.WithResource("pods"), "default", "test-pod")
if err != nil {
t.Fatalf("unexpected error deleting pod: %v", err)
}
// Simulate a delete event in the informer
collector.informer.GetStore().Delete(newPodMeta)

time.Sleep(time.Second)

Expand Down
6 changes: 4 additions & 2 deletions event-exporter/kubernetes/watchers/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type EventWatcherConfig struct {
// NewEventWatcher create a new watcher that only watches the events resource.
func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) watchers.Watcher {
watchListFeatureGateEnabled := IsFeatureGateEnabled(client, "WatchList")
glog.Infof("Feature gate WatchList is enabled: %v", watchListFeatureGateEnabled)
glog.Infof("Feature gate WatchList is enabled: %v, config.ListerWatcherEnableStreaming: %v", watchListFeatureGateEnabled, config.ListerWatcherEnableStreaming)
return watchers.NewWatcher(&watchers.WatcherConfig{
// List and watch events in all namespaces.
ListerWatcher: &cache.ListWatch{
Expand All @@ -93,7 +93,6 @@ func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) wa
return list, err
}
},

WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = config.EventLabelSelector.String()
return client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
Expand All @@ -120,13 +119,16 @@ func streamingListEvents(client kubernetes.Interface, config *EventWatcherConfig
options.Watch = true
options.LabelSelector = config.EventLabelSelector.String()
options.AllowWatchBookmarks = true
glog.Infof("streamingListEvents started watching events with options: %v", options)

// Perform the streaming list (actually a Watch)
watcher, err := client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
if err != nil {
glog.Errorf("streamingListEvents failed to watch events: %v", err)
return nil, err
}
defer watcher.Stop()
glog.Infof("streamingListEvents started watching events")

// Call OnList once to start the sink (it just logs "Started watching")
config.OnList(&corev1.EventList{})
Expand Down
Loading
Loading