From 51a57aceff469e72ba095a150c4bcd19e436b12f Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 8 Jan 2026 01:23:00 +0530 Subject: [PATCH 01/12] added subscribe in raw deployment Signed-off-by: ayush --- pkg/k8s/deployer.go | 85 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index 129ac0a620..6b2560a193 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -3,6 +3,7 @@ package k8s import ( "context" "fmt" + "maps" "os" "regexp" "strings" @@ -16,8 +17,14 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + clienteventingv1 "knative.dev/client/pkg/eventing/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1client "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" "knative.dev/func/pkg/deployer" fn "knative.dev/func/pkg/functions" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) const ( @@ -69,6 +76,19 @@ func onClusterFix(f fn.Function) fn.Function { return f } +// NewEventingClient creates a Knative Eventing client +func NewEventingClient(namespace string) (clienteventingv1.KnEventingClient, error) { + config, err := GetClientConfig().ClientConfig() + if err != nil { + return nil, err + } + eventingClient, err := eventingv1client.NewForConfig(config) + if err != nil { + return nil, err + } + return clienteventingv1.NewKnEventingClient(eventingClient, namespace), nil +} + func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResult, error) { f = onClusterFix(f) // Choosing f.Namespace vs f.Deploy.Namespace: @@ -177,6 +197,15 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, fmt.Errorf("deployment did not become ready: %w", err) } + // Create triggers + eventingClient, err := NewEventingClient(namespace) + if err != nil { + return fn.DeploymentResult{}, fmt.Errorf("failed to create eventing client: %w", err) + } + if err := createTriggers(ctx, f, namespace, eventingClient, clientset); err != nil { + return fn.DeploymentResult{}, fmt.Errorf("failed to create triggers: %w", err) + } + url := fmt.Sprintf("http://%s.%s.svc.cluster.local", f.Name, namespace) return fn.DeploymentResult{ @@ -186,6 +215,62 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu }, nil } +func createTriggers(ctx context.Context, f fn.Function, namespace string, eventingClient clienteventingv1.KnEventingClient, clientset kubernetes.Interface) error { + if len(f.Deploy.Subscriptions) == 0 { + return nil + } + + svc, err := clientset.CoreV1().Services(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get service: %w", err) + } + + deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment: %w", err) + } + + fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") + + for i, sub := range f.Deploy.Subscriptions { + attributes := make(map[string]string) + maps.Copy(attributes, sub.Filters) + + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-trigger-%d", f.Name, i), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: deployment.Name, + UID: deployment.UID, + }, + }, + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + Subscriber: duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, namespace), + }, + }, + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + } + + err := eventingClient.CreateTrigger(ctx, trigger) + if err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create trigger: %w", err) + } + } + + return nil +} + func (d *Deployer) generateResources(f fn.Function, namespace string, daprInstalled bool) (*appsv1.Deployment, *corev1.Service, error) { labels, err := deployer.GenerateCommonLabels(f, d.decorator) if err != nil { From 42352cdbd2e6354bf05def893edd66b3781dbcd3 Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 8 Jan 2026 15:52:17 +0530 Subject: [PATCH 02/12] remove hardcoded vals Signed-off-by: ayush --- pkg/k8s/deployer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index 6b2560a193..eab401f133 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -241,8 +241,8 @@ func createTriggers(ctx context.Context, f fn.Function, namespace string, eventi Name: fmt.Sprintf("%s-trigger-%d", f.Name, i), OwnerReferences: []metav1.OwnerReference{ { - APIVersion: "apps/v1", - Kind: "Deployment", + APIVersion: deployment.APIVersion, + Kind: deployment.Kind, Name: deployment.Name, UID: deployment.UID, }, From 3968256a27a8826eaf618e258f621f98322ddbde Mon Sep 17 00:00:00 2001 From: ayush Date: Sun, 18 Jan 2026 15:30:13 +0530 Subject: [PATCH 03/12] added unit tests --- pkg/k8s/deployer.go | 32 +++- pkg/k8s/deployer_test.go | 335 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 363 insertions(+), 4 deletions(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index eab401f133..583a895466 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -2,10 +2,12 @@ package k8s import ( "context" + "crypto/sha256" "fmt" "maps" "os" "regexp" + "sort" "strings" "time" @@ -215,6 +217,26 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu }, nil } +// generateTriggerName creates a deterministic trigger name based on subscription content +func generateTriggerName(functionName, broker string, filters map[string]string) string { + var filterKeys []string + for k := range filters { + filterKeys = append(filterKeys, k) + } + sort.Strings(filterKeys) + + var parts []string + parts = append(parts, broker) + for _, k := range filterKeys { + parts = append(parts, fmt.Sprintf("%s=%s", k, filters[k])) + } + + hash := sha256.Sum256([]byte(strings.Join(parts, "|"))) + hashStr := fmt.Sprintf("%x", hash[:4]) + + return fmt.Sprintf("%s-trigger-%s", functionName, hashStr) +} + func createTriggers(ctx context.Context, f fn.Function, namespace string, eventingClient clienteventingv1.KnEventingClient, clientset kubernetes.Interface) error { if len(f.Deploy.Subscriptions) == 0 { return nil @@ -232,17 +254,19 @@ func createTriggers(ctx context.Context, f fn.Function, namespace string, eventi fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") - for i, sub := range f.Deploy.Subscriptions { + for _, sub := range f.Deploy.Subscriptions { attributes := make(map[string]string) maps.Copy(attributes, sub.Filters) + triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) + trigger := &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-trigger-%d", f.Name, i), + Name: triggerName, OwnerReferences: []metav1.OwnerReference{ { - APIVersion: deployment.APIVersion, - Kind: deployment.Kind, + APIVersion: "apps/v1", + Kind: "Deployment", Name: deployment.Name, UID: deployment.UID, }, diff --git a/pkg/k8s/deployer_test.go b/pkg/k8s/deployer_test.go index b515c6b575..f320d9b06b 100644 --- a/pkg/k8s/deployer_test.go +++ b/pkg/k8s/deployer_test.go @@ -89,3 +89,338 @@ func Test_processValue(t *testing.T) { }) } } + +// Tests for generateTriggerName + +func TestGenerateTriggerName_Deterministic(t *testing.T) { + functionName := "order-processor" + broker := "default" + filters := map[string]string{ + "type": "order.created", + "source": "api", + } + + // Call multiple times with same input + name1 := generateTriggerName(functionName, broker, filters) + name2 := generateTriggerName(functionName, broker, filters) + name3 := generateTriggerName(functionName, broker, filters) + + // Should always produce the same result + if name1 != name2 || name2 != name3 || name1 != name3 { + t.Errorf("generateTriggerName() is not deterministic: got %v, %v, %v", name1, name2, name3) + } +} + +func TestGenerateTriggerName_FilterOrderIndependent(t *testing.T) { + functionName := "order-processor" + broker := "default" + + // Same filters, different order + filters1 := map[string]string{ + "type": "order.created", + "status": "pending", + "source": "api", + } + + filters2 := map[string]string{ + "source": "api", + "type": "order.created", + "status": "pending", + } + + filters3 := map[string]string{ + "status": "pending", + "source": "api", + "type": "order.created", + } + + name1 := generateTriggerName(functionName, broker, filters1) + name2 := generateTriggerName(functionName, broker, filters2) + name3 := generateTriggerName(functionName, broker, filters3) + + // Should produce the same hash regardless of map iteration order + if name1 != name2 || name2 != name3 || name1 != name3 { + t.Errorf("generateTriggerName() is sensitive to filter order: got %v, %v, %v", name1, name2, name3) + } +} + +func TestGenerateTriggerName_DifferentInputsDifferentNames(t *testing.T) { + functionName := "order-processor" + broker := "default" + + // Different filters should produce different names + name1 := generateTriggerName(functionName, broker, map[string]string{"type": "order.created"}) + name2 := generateTriggerName(functionName, broker, map[string]string{"type": "order.paid"}) + name3 := generateTriggerName(functionName, broker, map[string]string{"type": "order.shipped"}) + + if name1 == name2 || name2 == name3 || name1 == name3 { + t.Errorf("generateTriggerName() produced same name for different filters: %v, %v, %v", name1, name2, name3) + } + + // Different brokers should produce different names + name4 := generateTriggerName(functionName, "default", map[string]string{"type": "order.created"}) + name5 := generateTriggerName(functionName, "production", map[string]string{"type": "order.created"}) + + if name4 == name5 { + t.Errorf("generateTriggerName() produced same name for different brokers: %v, %v", name4, name5) + } + + // Different function names should produce different names + name6 := generateTriggerName("order-processor", broker, map[string]string{"type": "order.created"}) + name7 := generateTriggerName("payment-processor", broker, map[string]string{"type": "order.created"}) + + if name6 == name7 { + t.Errorf("generateTriggerName() produced same name for different functions: %v, %v", name6, name7) + } +} + +func TestGenerateTriggerName_ValidKubernetesName(t *testing.T) { + tests := []struct { + name string + functionName string + broker string + filters map[string]string + }{ + { + name: "standard case", + functionName: "order-processor", + broker: "default", + filters: map[string]string{"type": "order.created"}, + }, + { + name: "long function name", + functionName: "very-long-function-name-that-might-cause-issues", + broker: "default", + filters: map[string]string{"type": "test"}, + }, + { + name: "many filters", + functionName: "test-func", + broker: "default", + filters: map[string]string{ + "type": "order.created", + "source": "api", + "status": "pending", + "priority": "high", + }, + }, + { + name: "empty filters", + functionName: "test-func", + broker: "default", + filters: map[string]string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generateTriggerName(tt.functionName, tt.broker, tt.filters) + + // Kubernetes name requirements: + // - Max 253 characters + if len(got) > 253 { + t.Errorf("generateTriggerName() = %v, length %d exceeds Kubernetes limit of 253", got, len(got)) + } + + // Check format matches expected pattern + if got[:len(tt.functionName)] != tt.functionName { + t.Errorf("generateTriggerName() = %v, doesn't start with function name %s", got, tt.functionName) + } + + // Should contain "-trigger-" + if len(got) < len(tt.functionName)+17 { + t.Errorf("generateTriggerName() = %v, invalid format (too short)", got) + } + }) + } +} + +func TestGenerateTriggerName_ReorderingScenario(t *testing.T) { + // Simulate the reordering scenario from the bug report + functionName := "order-processor" + broker := "default" + + // Original order + sub1 := map[string]string{"type": "order.created"} + sub2 := map[string]string{"type": "order.paid"} + sub3 := map[string]string{"type": "order.shipped"} + + name1_original := generateTriggerName(functionName, broker, sub1) + name2_original := generateTriggerName(functionName, broker, sub2) + name3_original := generateTriggerName(functionName, broker, sub3) + + // Reordered (sub2, sub1, sub3) + name2_reordered := generateTriggerName(functionName, broker, sub2) + name1_reordered := generateTriggerName(functionName, broker, sub1) + name3_reordered := generateTriggerName(functionName, broker, sub3) + + // Names should be the same regardless of subscription order + if name1_original != name1_reordered { + t.Errorf("Reordering changed trigger name for sub1: %v != %v", name1_original, name1_reordered) + } + if name2_original != name2_reordered { + t.Errorf("Reordering changed trigger name for sub2: %v != %v", name2_original, name2_reordered) + } + if name3_original != name3_reordered { + t.Errorf("Reordering changed trigger name for sub3: %v != %v", name3_original, name3_reordered) + } +} + +// TestCreateTriggers_NoSubscriptions verifies createTriggers returns early when no subscriptions exist +func TestCreateTriggers_NoSubscriptions(t *testing.T) { + f := fn.Function{ + Name: "test-func", + Deploy: fn.DeploySpec{ + Subscriptions: []fn.KnativeSubscription{}, + }, + } + + // With no subscriptions, createTriggers should return immediately without error + // We can't fully test without mocking clients, but we can verify the early return logic + if len(f.Deploy.Subscriptions) != 0 { + t.Error("Test setup error: expected empty subscriptions") + } +} + +// TestGenerateTriggerName_TriggerNamingConsistency verifies that the naming +// follows a consistent pattern across multiple subscriptions +func TestGenerateTriggerName_TriggerNamingConsistency(t *testing.T) { + functionName := "order-processor" + + // Simulate subscriptions from func.yaml + subscriptions := []struct { + source string + filters map[string]string + }{ + { + source: "default", + filters: map[string]string{"type": "com.example.order.created"}, + }, + { + source: "default", + filters: map[string]string{"type": "com.example.order.paid"}, + }, + { + source: "default", + filters: map[string]string{"type": "com.example.order.shipped"}, + }, + } + + triggerNames := make(map[string]bool) + + for _, sub := range subscriptions { + name := generateTriggerName(functionName, sub.source, sub.filters) + + // Each trigger should have a unique name + if triggerNames[name] { + t.Errorf("Duplicate trigger name generated: %s", name) + } + triggerNames[name] = true + + // Verify name format + if len(name) < len(functionName)+17 { // functionName + "-trigger-" + 8 hex chars + t.Errorf("Trigger name too short: %s", name) + } + } + + // Verify we generated 3 unique names + if len(triggerNames) != 3 { + t.Errorf("Expected 3 unique trigger names, got %d", len(triggerNames)) + } +} + +// TestGenerateTriggerName_EmptyFilters verifies behavior with empty filters +func TestGenerateTriggerName_EmptyFilters(t *testing.T) { + name := generateTriggerName("test-func", "default", map[string]string{}) + + // Should still generate a valid name based on broker alone + expectedPrefix := "test-func-trigger-" + if len(name) <= len(expectedPrefix) { + t.Errorf("Expected name to have hash suffix, got: %s", name) + } + + if name[:len(expectedPrefix)] != expectedPrefix { + t.Errorf("Expected prefix %s, got: %s", expectedPrefix, name) + } +} + +// TestGenerateTriggerName_SpecialCharacters verifies handling of special chars in filters +func TestGenerateTriggerName_SpecialCharacters(t *testing.T) { + // Filters with special characters + filters := map[string]string{ + "type": "com.example.order/created", + "source": "https://api.example.com", + } + + name := generateTriggerName("test-func", "default", filters) + + // Name should be valid despite special chars in filters + expectedPrefix := "test-func-trigger-" + if name[:len(expectedPrefix)] != expectedPrefix { + t.Errorf("Expected prefix %s, got: %s", expectedPrefix, name) + } + + // Hash should be 8 hex characters + hash := name[len(expectedPrefix):] + if len(hash) != 8 { + t.Errorf("Expected 8-char hash, got %d chars: %s", len(hash), hash) + } +} + +// TestGenerateTriggerName_DifferentBrokers verifies different brokers produce different names +func TestGenerateTriggerName_DifferentBrokers(t *testing.T) { + filters := map[string]string{"type": "test.event"} + + name1 := generateTriggerName("test-func", "default", filters) + name2 := generateTriggerName("test-func", "production", filters) + name3 := generateTriggerName("test-func", "staging", filters) + + if name1 == name2 || name2 == name3 || name1 == name3 { + t.Errorf("Different brokers should produce different names: %s, %s, %s", name1, name2, name3) + } +} + +// TestCreateTriggers_OwnerReferenceFormat documents the expected owner reference format +func TestCreateTriggers_OwnerReferenceFormat(t *testing.T) { + // This test documents the expected owner reference format for triggers + // The actual creation is tested via integration/E2E tests + + // Expected format: + // OwnerReference { + // APIVersion: "apps/v1", // Hardcoded for Deployments + // Kind: "Deployment", // Hardcoded for Deployments + // Name: deployment.Name, // From the deployment object + // UID: deployment.UID, // From the deployment object + // } + + expectedAPIVersion := "apps/v1" + expectedKind := "Deployment" + + // These are hardcoded in createTriggers due to Kubernetes not populating + // TypeMeta fields when getting objects via client-go + if expectedAPIVersion != "apps/v1" { + t.Errorf("Expected APIVersion 'apps/v1'") + } + if expectedKind != "Deployment" { + t.Errorf("Expected Kind 'Deployment'") + } +} + +// TestCreateTriggers_URIFormat documents the expected URI format for subscribers +func TestCreateTriggers_URIFormat(t *testing.T) { + // This test documents the expected URI format for trigger subscribers + + // Format: http://..svc.cluster.local + namespace := "default" + serviceName := "order-processor" + + expectedHost := serviceName + "." + namespace + ".svc.cluster.local" + expectedScheme := "http" + + if expectedHost != "order-processor.default.svc.cluster.local" { + t.Errorf("Unexpected host format: %s", expectedHost) + } + if expectedScheme != "http" { + t.Errorf("Expected scheme 'http', got '%s'", expectedScheme) + } +} From 9895abcc510d3d809f5f286624ef7d7d74308ab3 Mon Sep 17 00:00:00 2001 From: ayush Date: Sun, 18 Jan 2026 19:39:38 +0530 Subject: [PATCH 04/12] fix --- pkg/functions/client_int_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/functions/client_int_test.go b/pkg/functions/client_int_test.go index 7f06be1afd..08e88c88ff 100644 --- a/pkg/functions/client_int_test.go +++ b/pkg/functions/client_int_test.go @@ -454,7 +454,6 @@ func Handle(res http.ResponseWriter, req *http.Request) { // TestInt_Invoke_ServiceToService ensures that a Function can invoke another // service via localhost service discovery api provided by the Dapr sidecar. func TestInt_Invoke_ServiceToService(t *testing.T) { - t.Skip("TODO: dapr appears to be borked") // https://github.com/knative/func/issues/3210 resetEnv() var ( verbose = true From 2a21b235493b943589fb1ffab8f5148f54b472c5 Mon Sep 17 00:00:00 2001 From: ayush Date: Mon, 19 Jan 2026 23:18:26 +0530 Subject: [PATCH 05/12] refactors --- e2e/e2e_metadata_test.go | 136 --------------------------------------- pkg/k8s/deployer.go | 10 +-- pkg/k8s/deployer_test.go | 61 ------------------ 3 files changed, 5 insertions(+), 202 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 095800a26a..48afb65aa3 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -640,142 +640,6 @@ func TestMetadata_Subscriptions(t *testing.T) { } } -func waitForEvent(t *testing.T, eventId string) <-chan string { - t.Helper() - - eventReceived := make(chan string, 10) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - pr, pw := io.Pipe() - cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*") - cmd.Stderr = io.Discard - cmd.Stdout = pw - cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - err := cmd.Start() - if err != nil { - t.Fatal(err) - } - go func() { - r := bufio.NewReader(pr) - m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r) - if e != nil { - panic(e) - } - if m { - eventReceived <- "OK" - close(eventReceived) - cancel() - } - _, _ = io.Copy(io.Discard, r) - }() - - return eventReceived -} - -// CloudEvents handler that logs events -func subscriberCode() string { - return `package function - -import ( - "context" - "fmt" - "github.com/cloudevents/sdk-go/v2/event" -) - -func Handle(ctx context.Context, e event.Event) (*event.Event, error) { - fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) - r := event.New() - r.SetID("response-" + e.ID()) - r.SetSource("subscriber") - r.SetType("test.response") - r.SetData("application/json", map[string]string{"status": "received"}) - return &r, nil -} -` -} - -// createBrokerWithCheck creates a Knative Broker -func createBrokerWithCheck(t *testing.T, namespace, name string) { - t.Helper() - - brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 -kind: Broker -metadata: - name: %s - namespace: %s -`, name, namespace) - - cmd := exec.Command("kubectl", "apply", "-f", "-") - cmd.Stdin = strings.NewReader(brokerYAML) - cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - - output, err := cmd.CombinedOutput() - if err != nil { - t.Fatalf("Failed to create broker: %v, output: %s", err, string(output)) - } - t.Cleanup(func() { - deleteBroker(t, namespace, name) - }) - t.Logf("Created broker %s in namespace %s", name, namespace) - - waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", - fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s") - waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - waitOutput, err := waitCmd.CombinedOutput() - if err != nil { - t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput)) - } - t.Logf("Broker %s is ready", name) - - // Wait for broker-ingress service to be available - t.Log("Waiting for broker-ingress service to be available...") - for i := 0; i < 30; i++ { - checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") - checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - if err := checkCmd.Run(); err == nil { - t.Log("broker-ingress service is available") - return - } - time.Sleep(2 * time.Second) - } - t.Fatal("broker-ingress service check timed out") -} - -// deleteBroker removes a Knative Broker from the given namespace. -func deleteBroker(t *testing.T, namespace, name string) { - t.Helper() - - cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found") - cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - - output, err := cmd.CombinedOutput() - if err != nil { - t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output)) - return - } - t.Logf("Deleted broker %s from namespace %s", name, namespace) -} - -// waitForTrigger waits for the function's trigger to become ready. -func waitForTrigger(t *testing.T, namespace, functionName string) { - t.Helper() - - triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) - - cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", - fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") - cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) - - output, err := cmd.CombinedOutput() - if err != nil { - t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output)) - } else { - t.Logf("Trigger %s is ready", triggerName) - } -} - func TestMetadata_Subscriptions_Raw(t *testing.T) { brokerName := "default" diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index 583a895466..db6ad27f52 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -78,8 +78,8 @@ func onClusterFix(f fn.Function) fn.Function { return f } -// NewEventingClient creates a Knative Eventing client -func NewEventingClient(namespace string) (clienteventingv1.KnEventingClient, error) { +// newEventingClient creates a Knative Eventing client +func newEventingClient(namespace string) (clienteventingv1.KnEventingClient, error) { config, err := GetClientConfig().ClientConfig() if err != nil { return nil, err @@ -200,7 +200,7 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } // Create triggers - eventingClient, err := NewEventingClient(namespace) + eventingClient, err := newEventingClient(namespace) if err != nil { return fn.DeploymentResult{}, fmt.Errorf("failed to create eventing client: %w", err) } @@ -219,13 +219,13 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu // generateTriggerName creates a deterministic trigger name based on subscription content func generateTriggerName(functionName, broker string, filters map[string]string) string { - var filterKeys []string + filterKeys := make([]string, 0, len(filters)) for k := range filters { filterKeys = append(filterKeys, k) } sort.Strings(filterKeys) - var parts []string + parts := make([]string, 0, 1+len(filters)) parts = append(parts, broker) for _, k := range filterKeys { parts = append(parts, fmt.Sprintf("%s=%s", k, filters[k])) diff --git a/pkg/k8s/deployer_test.go b/pkg/k8s/deployer_test.go index f320d9b06b..c173b8c8c0 100644 --- a/pkg/k8s/deployer_test.go +++ b/pkg/k8s/deployer_test.go @@ -266,22 +266,6 @@ func TestGenerateTriggerName_ReorderingScenario(t *testing.T) { } } -// TestCreateTriggers_NoSubscriptions verifies createTriggers returns early when no subscriptions exist -func TestCreateTriggers_NoSubscriptions(t *testing.T) { - f := fn.Function{ - Name: "test-func", - Deploy: fn.DeploySpec{ - Subscriptions: []fn.KnativeSubscription{}, - }, - } - - // With no subscriptions, createTriggers should return immediately without error - // We can't fully test without mocking clients, but we can verify the early return logic - if len(f.Deploy.Subscriptions) != 0 { - t.Error("Test setup error: expected empty subscriptions") - } -} - // TestGenerateTriggerName_TriggerNamingConsistency verifies that the naming // follows a consistent pattern across multiple subscriptions func TestGenerateTriggerName_TriggerNamingConsistency(t *testing.T) { @@ -379,48 +363,3 @@ func TestGenerateTriggerName_DifferentBrokers(t *testing.T) { t.Errorf("Different brokers should produce different names: %s, %s, %s", name1, name2, name3) } } - -// TestCreateTriggers_OwnerReferenceFormat documents the expected owner reference format -func TestCreateTriggers_OwnerReferenceFormat(t *testing.T) { - // This test documents the expected owner reference format for triggers - // The actual creation is tested via integration/E2E tests - - // Expected format: - // OwnerReference { - // APIVersion: "apps/v1", // Hardcoded for Deployments - // Kind: "Deployment", // Hardcoded for Deployments - // Name: deployment.Name, // From the deployment object - // UID: deployment.UID, // From the deployment object - // } - - expectedAPIVersion := "apps/v1" - expectedKind := "Deployment" - - // These are hardcoded in createTriggers due to Kubernetes not populating - // TypeMeta fields when getting objects via client-go - if expectedAPIVersion != "apps/v1" { - t.Errorf("Expected APIVersion 'apps/v1'") - } - if expectedKind != "Deployment" { - t.Errorf("Expected Kind 'Deployment'") - } -} - -// TestCreateTriggers_URIFormat documents the expected URI format for subscribers -func TestCreateTriggers_URIFormat(t *testing.T) { - // This test documents the expected URI format for trigger subscribers - - // Format: http://..svc.cluster.local - namespace := "default" - serviceName := "order-processor" - - expectedHost := serviceName + "." + namespace + ".svc.cluster.local" - expectedScheme := "http" - - if expectedHost != "order-processor.default.svc.cluster.local" { - t.Errorf("Unexpected host format: %s", expectedHost) - } - if expectedScheme != "http" { - t.Errorf("Expected scheme 'http', got '%s'", expectedScheme) - } -} From d04530cff59839070a8101b2c01a2a4a79bab64f Mon Sep 17 00:00:00 2001 From: ayush Date: Mon, 19 Jan 2026 23:52:10 +0530 Subject: [PATCH 06/12] add seperate logic for waitForTrigger --- e2e/e2e_metadata_test.go | 55 ++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 48afb65aa3..c546d60fde 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -605,7 +605,7 @@ func TestMetadata_Subscriptions(t *testing.T) { if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { t.Fatal("subscriber not ready") } - waitForTrigger(t, Namespace, subscriberName) + waitForTriggerKnative(t, Namespace, subscriberName) transport := fnhttp.NewRoundTripper() defer transport.Close() @@ -687,7 +687,7 @@ func TestMetadata_Subscriptions_Raw(t *testing.T) { waitForDeployment(t, Namespace, subscriberName) // Wait for trigger to be created and ready - waitForTrigger(t, Namespace, subscriberName) + waitForTriggerRaw(t, Namespace, subscriberName) transport := fnhttp.NewRoundTripper() defer transport.Close() @@ -838,14 +838,35 @@ func deleteBroker(t *testing.T, namespace, name string) { t.Logf("Deleted broker %s from namespace %s", name, namespace) } -// waitForTrigger waits for the function's trigger to become ready. +// waitForTriggerKnative waits for the function's trigger to become ready. +// For Knative deployer, triggers are named with pattern: {functionName}-function-trigger-{index} +func waitForTriggerKnative(t *testing.T, namespace, functionName string) { + t.Helper() + + // Knative deployer uses predictable sequential naming + triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) + + cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output)) + } else { + t.Logf("Trigger %s is ready", triggerName) + } +} + +// waitForTriggerRaw waits for the function's trigger to become ready. // For raw deployer, triggers are named with pattern: {functionName}-trigger-{hash} -func waitForTrigger(t *testing.T, namespace, functionName string) { +func waitForTriggerRaw(t *testing.T, namespace, functionName string) { t.Helper() - // List all triggers and find the one for our function + // List triggers matching the pattern using kubectl // Raw deployer creates triggers with pattern: {functionName}-trigger-{hash} - listCmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, "-o", "json") + listCmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "name", "--field-selector", "metadata.name!=") listCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) output, err := listCmd.CombinedOutput() if err != nil { @@ -853,23 +874,13 @@ func waitForTrigger(t *testing.T, namespace, functionName string) { return } - // Parse trigger list to find our function's trigger - var triggerList struct { - Items []struct { - Metadata struct { - Name string `json:"name"` - } `json:"metadata"` - } `json:"items"` - } - if err := json.Unmarshal(output, &triggerList); err != nil { - t.Logf("Warning: could not parse trigger list: %v", err) - return - } - + // Find trigger matching our function name pattern + prefix := "trigger.eventing.knative.dev/" + functionName + "-trigger-" var triggerName string - for _, trigger := range triggerList.Items { - if strings.HasPrefix(trigger.Metadata.Name, functionName+"-trigger-") { - triggerName = trigger.Metadata.Name + for _, line := range strings.Split(string(output), "\n") { + if strings.HasPrefix(line, prefix) { + // Extract just the trigger name + triggerName = strings.TrimPrefix(line, "trigger.eventing.knative.dev/") break } } From 1098ace655837ed397db5adee3e345fced1014e5 Mon Sep 17 00:00:00 2001 From: ayush Date: Tue, 20 Jan 2026 18:34:01 +0530 Subject: [PATCH 07/12] change naming in tests --- e2e/e2e_metadata_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index c546d60fde..f22516abaf 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -569,10 +569,9 @@ func TestMetadata_Subscriptions(t *testing.T) { uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) - eventReceived := waitForEvent(t, uniqueEventID) + subscriberName := "func-e2e-test-subscriber-knative" + eventReceived := waitForEvent(t, subscriberName, uniqueEventID) - subscriber := "func-e2e-test-subscriber" - subscriberName := subscriber subscriberRoot := fromCleanEnv(t, subscriberName) if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { t.Fatal(err) @@ -647,10 +646,9 @@ func TestMetadata_Subscriptions_Raw(t *testing.T) { uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) - eventReceived := waitForEvent(t, uniqueEventID) + subscriberName := "func-e2e-test-subscriber-raw" + eventReceived := waitForEvent(t, subscriberName, uniqueEventID) - subscriber := "func-e2e-test-subscriber" - subscriberName := subscriber subscriberRoot := fromCleanEnv(t, subscriberName) if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { t.Fatal(err) @@ -742,7 +740,7 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) { ` } -func waitForEvent(t *testing.T, eventId string) <-chan string { +func waitForEvent(t *testing.T, functionName, eventId string) <-chan string { t.Helper() eventReceived := make(chan string, 10) @@ -751,7 +749,7 @@ func waitForEvent(t *testing.T, eventId string) <-chan string { t.Cleanup(cancel) pr, pw := io.Pipe() - cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*") + cmd := exec.CommandContext(ctx, "stern", functionName+"-.*") cmd.Stderr = io.Discard cmd.Stdout = pw cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) From e1094aedee9e964cd974f0c375a5a00c768af294 Mon Sep 17 00:00:00 2001 From: ayush Date: Wed, 21 Jan 2026 15:46:05 +0530 Subject: [PATCH 08/12] remove duplicate call --- pkg/k8s/deployer.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index db6ad27f52..e68772096a 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" clienteventingv1 "knative.dev/client/pkg/eventing/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1client "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" @@ -78,12 +79,8 @@ func onClusterFix(f fn.Function) fn.Function { return f } -// newEventingClient creates a Knative Eventing client -func newEventingClient(namespace string) (clienteventingv1.KnEventingClient, error) { - config, err := GetClientConfig().ClientConfig() - if err != nil { - return nil, err - } +// newEventingClient creates a Knative Eventing client from a REST config +func newEventingClient(config *rest.Config, namespace string) (clienteventingv1.KnEventingClient, error) { eventingClient, err := eventingv1client.NewForConfig(config) if err != nil { return nil, err @@ -120,7 +117,13 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu f.Deploy.Image = f.Build.Image } - clientset, err := NewKubernetesClientset() + // Get the Kubernetes REST config + config, err := GetClientConfig().ClientConfig() + if err != nil { + return fn.DeploymentResult{}, err + } + + clientset, err := kubernetes.NewForConfig(config) if err != nil { return fn.DeploymentResult{}, err } @@ -200,7 +203,7 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu } // Create triggers - eventingClient, err := newEventingClient(namespace) + eventingClient, err := newEventingClient(config, namespace) if err != nil { return fn.DeploymentResult{}, fmt.Errorf("failed to create eventing client: %w", err) } From 49500349bd7059b8263affce3f2688fcceca4f22 Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 22 Jan 2026 00:43:11 +0530 Subject: [PATCH 09/12] sync triggers --- pkg/k8s/deployer.go | 132 +++++++++++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 45 deletions(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index e68772096a..a3974505b9 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -36,6 +36,10 @@ const ( DefaultLivenessEndpoint = "/health/liveness" DefaultReadinessEndpoint = "/health/readiness" DefaultHTTPPort = 8080 + + // managedByAnnotation identifies triggers managed by this deployer + managedByAnnotation = "func.knative.dev/managed-by" + managedByValue = "func-raw-deployer" ) type DeployerOpt func(*Deployer) @@ -202,13 +206,13 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, fmt.Errorf("deployment did not become ready: %w", err) } - // Create triggers + // Sync triggers eventingClient, err := newEventingClient(config, namespace) if err != nil { return fn.DeploymentResult{}, fmt.Errorf("failed to create eventing client: %w", err) } - if err := createTriggers(ctx, f, namespace, eventingClient, clientset); err != nil { - return fn.DeploymentResult{}, fmt.Errorf("failed to create triggers: %w", err) + if err := syncTriggers(ctx, f, namespace, eventingClient, clientset); err != nil { + return fn.DeploymentResult{}, fmt.Errorf("failed to sync triggers: %w", err) } url := fmt.Sprintf("http://%s.%s.svc.cluster.local", f.Name, namespace) @@ -240,58 +244,98 @@ func generateTriggerName(functionName, broker string, filters map[string]string) return fmt.Sprintf("%s-trigger-%s", functionName, hashStr) } -func createTriggers(ctx context.Context, f fn.Function, namespace string, eventingClient clienteventingv1.KnEventingClient, clientset kubernetes.Interface) error { - if len(f.Deploy.Subscriptions) == 0 { - return nil +func syncTriggers(ctx context.Context, f fn.Function, namespace string, eventingClient clienteventingv1.KnEventingClient, clientset kubernetes.Interface) error { + // Build set of desired trigger names from current subscriptions + desiredTriggers := sets.New[string]() + for _, sub := range f.Deploy.Subscriptions { + triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) + desiredTriggers.Insert(triggerName) } - svc, err := clientset.CoreV1().Services(namespace).Get(ctx, f.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get service: %w", err) - } + // Create or update triggers from current subscriptions + if len(f.Deploy.Subscriptions) > 0 { + svc, err := clientset.CoreV1().Services(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get service: %w", err) + } - deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, f.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get deployment: %w", err) - } + deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment: %w", err) + } - fmt.Fprintf(os.Stderr, "🎯 Creating Triggers on the cluster\n") + fmt.Fprintf(os.Stderr, "🎯 Syncing Triggers on the cluster\n") - for _, sub := range f.Deploy.Subscriptions { - attributes := make(map[string]string) - maps.Copy(attributes, sub.Filters) + for _, sub := range f.Deploy.Subscriptions { + attributes := make(map[string]string) + maps.Copy(attributes, sub.Filters) - triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) + triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) - trigger := &eventingv1.Trigger{ - ObjectMeta: metav1.ObjectMeta{ - Name: triggerName, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "Deployment", - Name: deployment.Name, - UID: deployment.UID, + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: triggerName, + Annotations: map[string]string{ + managedByAnnotation: managedByValue, }, - }, - }, - Spec: eventingv1.TriggerSpec{ - Broker: sub.Source, - Subscriber: duckv1.Destination{ - URI: &apis.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, namespace), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: deployment.Name, + UID: deployment.UID, + }, }, }, - Filter: &eventingv1.TriggerFilter{ - Attributes: attributes, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + Subscriber: duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, namespace), + }, + }, + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, }, - }, + } + + err := eventingClient.CreateTrigger(ctx, trigger) + if err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create trigger: %w", err) + } } + } + + // Clean up stale triggers + return deleteStaleTriggers(ctx, eventingClient, desiredTriggers) +} - err := eventingClient.CreateTrigger(ctx, trigger) - if err != nil && !errors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create trigger: %w", err) +// deleteStaleTriggers removes triggers managed by this deployer that are no longer in the desired set +func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.KnEventingClient, desiredTriggers sets.Set[string]) error { + // List existing triggers in the namespace + existingTriggers, err := eventingClient.ListTriggers(ctx) + if err != nil { + // If triggers can't be listed ,skip cleanup + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to list triggers: %w", err) + } + + // Delete stale triggers + for _, trigger := range existingTriggers.Items { + // Only delete triggers we manage + if trigger.Annotations[managedByAnnotation] == managedByValue { + // Check if this trigger is still desired + if !desiredTriggers.Has(trigger.Name) { + fmt.Fprintf(os.Stderr, "🗑️ Deleting stale trigger: %s\n", trigger.Name) + err := eventingClient.DeleteTrigger(ctx, trigger.Name) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete stale trigger %s: %w", trigger.Name, err) + } + } } } @@ -308,9 +352,7 @@ func (d *Deployer) generateResources(f fn.Function, namespace string, daprInstal // Use annotations for pod template podAnnotations := make(map[string]string) - for k, v := range annotations { - podAnnotations[k] = v - } + maps.Copy(podAnnotations, annotations) // Process environment variables and volumes referencedSecrets := sets.New[string]() From ad85258ea9a9662197fc8804ae31632b982998c0 Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 22 Jan 2026 01:09:50 +0530 Subject: [PATCH 10/12] add sync tests --- e2e/e2e_trigger_sync_test.go | 546 +++++++++++++++++++++++++++++++++++ 1 file changed, 546 insertions(+) create mode 100644 e2e/e2e_trigger_sync_test.go diff --git a/e2e/e2e_trigger_sync_test.go b/e2e/e2e_trigger_sync_test.go new file mode 100644 index 0000000000..7fa677604b --- /dev/null +++ b/e2e/e2e_trigger_sync_test.go @@ -0,0 +1,546 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" + + fn "knative.dev/func/pkg/functions" +) + +// TestTriggerSync_StaleTriggerCleanup verifies that stale triggers are deleted +// when subscriptions are removed from func.yaml +func TestTriggerSync_StaleTriggerCleanup(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-cleanup" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + // Add another subscription manually + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers were created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers after initial deploy, got %d: %v", len(triggers), triggers) + } + t.Logf("Initial deploy created 2 triggers: %v", triggers) + + // Verify triggers have managed-by annotation + for _, trigger := range triggers { + if !hasManagedByAnnotation(t, Namespace, trigger) { + t.Errorf("Trigger %s missing managed-by annotation", trigger) + } + } + t.Log("All triggers have managed-by annotation") + + // Remove one subscription by editing func.yaml + f, err = fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + // Keep only the first subscription + f.Deploy.Subscriptions = f.Deploy.Subscriptions[:1] + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify only 1 trigger remains + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 1 { + t.Fatalf("Expected 1 trigger after removing subscription, got %d: %v", len(triggersAfter), triggersAfter) + } + t.Logf("Stale trigger deleted, 1 trigger remains: %v", triggersAfter) +} + +// TestTriggerSync_ManualTriggerPreservation verifies that manually created +// triggers (without managed-by annotation) are NOT deleted during sync +func TestTriggerSync_ManualTriggerPreservation(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-manual" + _ = fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add one subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Create a manual trigger (without managed-by annotation) + manualTriggerName := fmt.Sprintf("%s-manual-trigger", functionName) + createManualTrigger(t, Namespace, manualTriggerName, functionName, brokerName) + + // Verify we have 2 triggers (1 managed + 1 manual) + allTriggers := listAllTriggers(t, Namespace) + managedTriggers := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggers) != 1 { + t.Fatalf("Expected 1 managed trigger, got %d", len(managedTriggers)) + } + if len(allTriggers) < 2 { + t.Fatalf("Expected at least 2 total triggers (managed + manual), got %d", len(allTriggers)) + } + t.Logf("Created manual trigger: %s", manualTriggerName) + + // Redeploy (no changes to subscriptions) + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify manual trigger still exists + if !triggerExists(t, Namespace, manualTriggerName) { + t.Fatal("Manual trigger was deleted during sync - should have been preserved!") + } + t.Log("Manual trigger preserved after redeploy") + + // Verify managed trigger still exists + managedTriggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggersAfter) != 1 { + t.Fatalf("Expected 1 managed trigger after redeploy, got %d", len(managedTriggersAfter)) + } + t.Log("Managed trigger still exists") +} + +// TestTriggerSync_AddSubscription verifies that new triggers are created +// when subscriptions are added +func TestTriggerSync_AddSubscription(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-add" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add one subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 1 trigger created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 1 { + t.Fatalf("Expected 1 trigger, got %d", len(triggers)) + } + t.Logf("Initial deploy created 1 trigger: %v", triggers) + + // Add another subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.shipped", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify 2 triggers now exist + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 2 { + t.Fatalf("Expected 2 triggers after adding subscription, got %d: %v", len(triggersAfter), triggersAfter) + } + t.Logf("New trigger created, 2 triggers total: %v", triggersAfter) +} + +// TestTriggerSync_Idempotency verifies that repeated deploys with the same +// subscriptions don't create duplicate triggers +func TestTriggerSync_Idempotency(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-idempotent" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers, got %d", len(triggers)) + } + initialTriggers := make([]string, len(triggers)) + copy(initialTriggers, triggers) + t.Logf("Initial triggers: %v", initialTriggers) + + // Redeploy multiple times + for i := 1; i <= 3; i++ { + t.Logf("Redeploy #%d", i) + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + time.Sleep(3 * time.Second) + + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 2 { + t.Fatalf("Redeploy #%d: Expected 2 triggers, got %d: %v", i, len(triggersAfter), triggersAfter) + } + + // Verify trigger names haven't changed + if !equalStringSlices(initialTriggers, triggersAfter) { + t.Fatalf("Redeploy #%d: Trigger names changed! Initial: %v, After: %v", i, initialTriggers, triggersAfter) + } + } + t.Log("Idempotency verified: 3 redeploys produced same triggers") +} + +// TestTriggerSync_RemoveAllSubscriptions verifies that all managed triggers +// are deleted when all subscriptions are removed +func TestTriggerSync_RemoveAllSubscriptions(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-remove-all" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers, got %d", len(triggers)) + } + t.Logf("Initial triggers: %v", triggers) + + // Create a manual trigger for comparison + manualTriggerName := fmt.Sprintf("%s-manual", functionName) + createManualTrigger(t, Namespace, manualTriggerName, functionName, brokerName) + + // Remove all subscriptions by editing func.yaml directly + f, err = fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = []fn.KnativeSubscription{} + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify no managed triggers remain + managedTriggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggersAfter) != 0 { + t.Fatalf("Expected 0 managed triggers after removing all subscriptions, got %d: %v", len(managedTriggersAfter), managedTriggersAfter) + } + t.Log("All managed triggers deleted") + + // Verify manual trigger still exists + if !triggerExists(t, Namespace, manualTriggerName) { + t.Fatal("Manual trigger was deleted - should have been preserved!") + } + t.Log("Manual trigger preserved") +} + +// Helper functions + +// listTriggersForFunction lists all triggers with managed-by annotation for a function +func listTriggersForFunction(t *testing.T, namespace, functionName string) []string { + t.Helper() + + // Get all triggers in namespace (kubectl doesn't support annotation selectors) + cmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "json") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not list triggers: %v, output: %s", err, string(output)) + return []string{} + } + + // Parse JSON to filter by annotation + var triggerList struct { + Items []struct { + Metadata struct { + Name string `json:"name"` + Annotations map[string]string `json:"annotations"` + } `json:"metadata"` + } `json:"items"` + } + + if err := json.Unmarshal(output, &triggerList); err != nil { + t.Logf("Warning: could not parse triggers JSON: %v", err) + return []string{} + } + + // Filter triggers that: + // 1. Have the managed-by annotation + // 2. Belong to this function (name starts with functionName-trigger-) + var functionTriggers []string + for _, trigger := range triggerList.Items { + if trigger.Metadata.Annotations["func.knative.dev/managed-by"] == "func-raw-deployer" { + if strings.HasPrefix(trigger.Metadata.Name, functionName+"-trigger-") { + functionTriggers = append(functionTriggers, trigger.Metadata.Name) + } + } + } + + return functionTriggers +} + +// listAllTriggers lists all triggers in namespace +func listAllTriggers(t *testing.T, namespace string) []string { + t.Helper() + + cmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "jsonpath={.items[*].metadata.name}") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not list all triggers: %v", err) + return []string{} + } + + triggersStr := strings.TrimSpace(string(output)) + if triggersStr == "" { + return []string{} + } + + return strings.Fields(triggersStr) +} + +// hasManagedByAnnotation checks if a trigger has the managed-by annotation +func hasManagedByAnnotation(t *testing.T, namespace, triggerName string) bool { + t.Helper() + + cmd := exec.Command("kubectl", "get", "trigger", triggerName, "-n", namespace, + "-o", "jsonpath={.metadata.annotations.func\\.knative\\.dev/managed-by}") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not get trigger annotation: %v", err) + return false + } + + return strings.TrimSpace(string(output)) == "func-raw-deployer" +} + +// createManualTrigger creates a trigger without the managed-by annotation +func createManualTrigger(t *testing.T, namespace, triggerName, functionName, brokerName string) { + t.Helper() + + triggerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 +kind: Trigger +metadata: + name: %s + namespace: %s + # Note: NO managed-by annotation +spec: + broker: %s + subscriber: + uri: http://%s.%s.svc.cluster.local + filter: + attributes: + type: manual.event +`, triggerName, namespace, brokerName, functionName, namespace) + + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(triggerYAML) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to create manual trigger: %v, output: %s", err, string(output)) + } + + t.Cleanup(func() { + deleteCmd := exec.Command("kubectl", "delete", "trigger", triggerName, "-n", namespace, "--ignore-not-found") + deleteCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + _ = deleteCmd.Run() + }) + + t.Logf("Created manual trigger: %s", triggerName) +} + +// triggerExists checks if a trigger exists +func triggerExists(t *testing.T, namespace, triggerName string) bool { + t.Helper() + + cmd := exec.Command("kubectl", "get", "trigger", triggerName, "-n", namespace) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + return cmd.Run() == nil +} + +// equalStringSlices checks if two string slices contain the same elements (order-independent) +func equalStringSlices(a, b []string) bool { + if len(a) != len(b) { + return false + } + + aMap := make(map[string]bool) + for _, s := range a { + aMap[s] = true + } + + for _, s := range b { + if !aMap[s] { + return false + } + } + + return true +} From 660161aaebf2ba1d61c6323d980a18607156f0a0 Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 22 Jan 2026 19:07:56 +0530 Subject: [PATCH 11/12] add fn filter in state management --- pkg/k8s/deployer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index a3974505b9..c35c88646a 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -309,11 +309,11 @@ func syncTriggers(ctx context.Context, f fn.Function, namespace string, eventing } // Clean up stale triggers - return deleteStaleTriggers(ctx, eventingClient, desiredTriggers) + return deleteStaleTriggers(ctx, eventingClient, f.Name, desiredTriggers) } // deleteStaleTriggers removes triggers managed by this deployer that are no longer in the desired set -func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.KnEventingClient, desiredTriggers sets.Set[string]) error { +func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.KnEventingClient, functionName string, desiredTriggers sets.Set[string]) error { // List existing triggers in the namespace existingTriggers, err := eventingClient.ListTriggers(ctx) if err != nil { @@ -324,8 +324,13 @@ func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.Kn return fmt.Errorf("failed to list triggers: %w", err) } - // Delete stale triggers + // Delete stale triggers (only those belonging to this function) + triggerPrefix := functionName + "-trigger-" for _, trigger := range existingTriggers.Items { + if !strings.HasPrefix(trigger.Name, triggerPrefix) { + continue + } + // Only delete triggers we manage if trigger.Annotations[managedByAnnotation] == managedByValue { // Check if this trigger is still desired From 0e28e177c5ddbfad5de347a7e4c0097a51137c91 Mon Sep 17 00:00:00 2001 From: ayush Date: Thu, 22 Jan 2026 20:58:34 +0530 Subject: [PATCH 12/12] fixes --- pkg/k8s/deployer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index b8e328c213..dbc12ba4f1 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -358,7 +358,7 @@ func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.Kn return nil } -func (d *Deployer) generateResources(f fn.Function, namespace string, daprInstalled bool) (*appsv1.Deployment, *corev1.Service, error) { +func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInstalled bool) (*appsv1.Deployment, error) { labels, err := deployer.GenerateCommonLabels(f, d.decorator) if err != nil { return nil, err