diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index d211a9810c..db056fd8f8 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -569,10 +569,9 @@ func TestMetadata_Subscriptions(t *testing.T) { uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) - eventReceived := waitForEvent(t, uniqueEventID) + subscriberName := "func-e2e-test-subscriber-knative" + eventReceived := waitForEvent(t, subscriberName, uniqueEventID) - subscriber := "func-e2e-test-subscriber" - subscriberName := subscriber subscriberRoot := fromCleanEnv(t, subscriberName) if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { t.Fatal(err) @@ -605,7 +604,7 @@ func TestMetadata_Subscriptions(t *testing.T) { if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { t.Fatal("subscriber not ready") } - waitForTrigger(t, Namespace, subscriberName) + waitForTriggerKnative(t, Namespace, subscriberName) transport := fnhttp.NewRoundTripper() defer transport.Close() @@ -640,7 +639,108 @@ func TestMetadata_Subscriptions(t *testing.T) { } } -func waitForEvent(t *testing.T, eventId string) <-chan string { +func TestMetadata_Subscriptions_Raw(t *testing.T) { + brokerName := "default" + + createBrokerWithCheck(t, Namespace, brokerName) + + uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) + + subscriberName := "func-e2e-test-subscriber-raw" + eventReceived := waitForEvent(t, subscriberName, uniqueEventID) + + subscriberRoot := fromCleanEnv(t, subscriberName) + if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), + []byte(subscriberCode()), 0644); err != nil { + t.Fatal(err) + } + + subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event") + subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr + if err := subscribeCmd.Run(); err != nil { + t.Fatal(err) + } + + f, err := fn.NewFunction(subscriberRoot) + if err != nil { + t.Fatal(err) + } + if len(f.Deploy.Subscriptions) != 1 { + t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions)) + } + + // Deploy with raw deployer to test trigger creation + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, subscriberName, Namespace) + + // Note: Raw deployer creates cluster-internal services without external routes, + // so we can't use waitFor with domain-based URLs. Instead, we verify the + // deployment is ready and then test event delivery directly. + t.Log("Waiting for deployment to be ready...") + waitForDeployment(t, Namespace, subscriberName) + + // Wait for trigger to be created and ready + waitForTriggerRaw(t, Namespace, subscriberName) + + transport := fnhttp.NewRoundTripper() + defer transport.Close() + client := http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } + url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName) + req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "test.event") + req.Header.Set("ce-source", "producer") + req.Header.Set("ce-id", uniqueEventID) + + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Failed to invoke producer: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 202 { + t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body) + } + t.Logf("Broker accepted event %s", uniqueEventID) + + select { + case receivedID := <-eventReceived: + t.Logf("Event flow verified (received: %s)", receivedID) + case <-time.After(60 * time.Second): + t.Fatal("Timeout: No callback from subscriber") + } +} + +// CloudEvents handler that logs events +func subscriberCode() string { + return `package function +import ( + "context" + "fmt" + "github.com/cloudevents/sdk-go/v2/event" +) +func Handle(ctx context.Context, e event.Event) (*event.Event, error) { + fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) + r := event.New() + r.SetID("response-" + e.ID()) + r.SetSource("subscriber") + r.SetType("test.response") + r.SetData("application/json", map[string]string{"status": "received"}) + return &r, nil +} +` +} + +func waitForEvent(t *testing.T, functionName, eventId string) <-chan string { t.Helper() eventReceived := make(chan string, 10) @@ -649,7 +749,7 @@ func waitForEvent(t *testing.T, eventId string) <-chan string { t.Cleanup(cancel) pr, pw := io.Pipe() - cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*") + cmd := exec.CommandContext(ctx, "stern", functionName+"-.*") cmd.Stderr = io.Discard cmd.Stdout = pw cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) @@ -674,28 +774,6 @@ func waitForEvent(t *testing.T, eventId string) <-chan string { return eventReceived } -// CloudEvents handler that logs events -func subscriberCode() string { - return `package function - -import ( - "context" - "fmt" - "github.com/cloudevents/sdk-go/v2/event" -) - -func Handle(ctx context.Context, e event.Event) (*event.Event, error) { - fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) - r := event.New() - r.SetID("response-" + e.ID()) - r.SetSource("subscriber") - r.SetType("test.response") - r.SetData("application/json", map[string]string{"status": "received"}) - return &r, nil -} -` -} - // createBrokerWithCheck creates a Knative Broker func createBrokerWithCheck(t *testing.T, namespace, name string) { t.Helper() @@ -758,10 +836,12 @@ func deleteBroker(t *testing.T, namespace, name string) { t.Logf("Deleted broker %s from namespace %s", name, namespace) } -// waitForTrigger waits for the function's trigger to become ready. -func waitForTrigger(t *testing.T, namespace, functionName string) { +// waitForTriggerKnative waits for the function's trigger to become ready. +// For Knative deployer, triggers are named with pattern: {functionName}-function-trigger-{index} +func waitForTriggerKnative(t *testing.T, namespace, functionName string) { t.Helper() + // Knative deployer uses predictable sequential naming triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", @@ -775,3 +855,65 @@ func waitForTrigger(t *testing.T, namespace, functionName string) { t.Logf("Trigger %s is ready", triggerName) } } + +// waitForTriggerRaw waits for the function's trigger to become ready. +// For raw deployer, triggers are named with pattern: {functionName}-trigger-{hash} +func waitForTriggerRaw(t *testing.T, namespace, functionName string) { + t.Helper() + + // List triggers matching the pattern using kubectl + // Raw deployer creates triggers with pattern: {functionName}-trigger-{hash} + listCmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "name", "--field-selector", "metadata.name!=") + listCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + output, err := listCmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not list triggers: %v", err) + return + } + + // Find trigger matching our function name pattern + prefix := "trigger.eventing.knative.dev/" + functionName + "-trigger-" + var triggerName string + for _, line := range strings.Split(string(output), "\n") { + if strings.HasPrefix(line, prefix) { + // Extract just the trigger name + triggerName = strings.TrimPrefix(line, "trigger.eventing.knative.dev/") + break + } + } + + if triggerName == "" { + t.Logf("Warning: no trigger found for function %s", functionName) + return + } + + t.Logf("Found trigger: %s", triggerName) + + cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + waitOutput, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(waitOutput)) + } else { + t.Logf("Trigger %s is ready", triggerName) + } +} + +// waitForDeployment waits for a Kubernetes Deployment to become ready. +// This is used for raw deployer which creates Deployments instead of Knative Services. +func waitForDeployment(t *testing.T, namespace, name string) { + t.Helper() + + cmd := exec.Command("kubectl", "wait", "--for=condition=Available", + fmt.Sprintf("deployment/%s", name), "-n", namespace, "--timeout=120s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Deployment %s not ready: %v, output: %s", name, err, string(output)) + } + t.Logf("Deployment %s is ready", name) +} diff --git a/e2e/e2e_trigger_sync_test.go b/e2e/e2e_trigger_sync_test.go new file mode 100644 index 0000000000..7fa677604b --- /dev/null +++ b/e2e/e2e_trigger_sync_test.go @@ -0,0 +1,546 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" + + fn "knative.dev/func/pkg/functions" +) + +// TestTriggerSync_StaleTriggerCleanup verifies that stale triggers are deleted +// when subscriptions are removed from func.yaml +func TestTriggerSync_StaleTriggerCleanup(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-cleanup" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + // Add another subscription manually + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers were created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers after initial deploy, got %d: %v", len(triggers), triggers) + } + t.Logf("Initial deploy created 2 triggers: %v", triggers) + + // Verify triggers have managed-by annotation + for _, trigger := range triggers { + if !hasManagedByAnnotation(t, Namespace, trigger) { + t.Errorf("Trigger %s missing managed-by annotation", trigger) + } + } + t.Log("All triggers have managed-by annotation") + + // Remove one subscription by editing func.yaml + f, err = fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + // Keep only the first subscription + f.Deploy.Subscriptions = f.Deploy.Subscriptions[:1] + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify only 1 trigger remains + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 1 { + t.Fatalf("Expected 1 trigger after removing subscription, got %d: %v", len(triggersAfter), triggersAfter) + } + t.Logf("Stale trigger deleted, 1 trigger remains: %v", triggersAfter) +} + +// TestTriggerSync_ManualTriggerPreservation verifies that manually created +// triggers (without managed-by annotation) are NOT deleted during sync +func TestTriggerSync_ManualTriggerPreservation(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-manual" + _ = fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add one subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Create a manual trigger (without managed-by annotation) + manualTriggerName := fmt.Sprintf("%s-manual-trigger", functionName) + createManualTrigger(t, Namespace, manualTriggerName, functionName, brokerName) + + // Verify we have 2 triggers (1 managed + 1 manual) + allTriggers := listAllTriggers(t, Namespace) + managedTriggers := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggers) != 1 { + t.Fatalf("Expected 1 managed trigger, got %d", len(managedTriggers)) + } + if len(allTriggers) < 2 { + t.Fatalf("Expected at least 2 total triggers (managed + manual), got %d", len(allTriggers)) + } + t.Logf("Created manual trigger: %s", manualTriggerName) + + // Redeploy (no changes to subscriptions) + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify manual trigger still exists + if !triggerExists(t, Namespace, manualTriggerName) { + t.Fatal("Manual trigger was deleted during sync - should have been preserved!") + } + t.Log("Manual trigger preserved after redeploy") + + // Verify managed trigger still exists + managedTriggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggersAfter) != 1 { + t.Fatalf("Expected 1 managed trigger after redeploy, got %d", len(managedTriggersAfter)) + } + t.Log("Managed trigger still exists") +} + +// TestTriggerSync_AddSubscription verifies that new triggers are created +// when subscriptions are added +func TestTriggerSync_AddSubscription(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-add" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add one subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 1 trigger created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 1 { + t.Fatalf("Expected 1 trigger, got %d", len(triggers)) + } + t.Logf("Initial deploy created 1 trigger: %v", triggers) + + // Add another subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.shipped", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify 2 triggers now exist + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 2 { + t.Fatalf("Expected 2 triggers after adding subscription, got %d: %v", len(triggersAfter), triggersAfter) + } + t.Logf("New trigger created, 2 triggers total: %v", triggersAfter) +} + +// TestTriggerSync_Idempotency verifies that repeated deploys with the same +// subscriptions don't create duplicate triggers +func TestTriggerSync_Idempotency(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-idempotent" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers, got %d", len(triggers)) + } + initialTriggers := make([]string, len(triggers)) + copy(initialTriggers, triggers) + t.Logf("Initial triggers: %v", initialTriggers) + + // Redeploy multiple times + for i := 1; i <= 3; i++ { + t.Logf("Redeploy #%d", i) + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + time.Sleep(3 * time.Second) + + triggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(triggersAfter) != 2 { + t.Fatalf("Redeploy #%d: Expected 2 triggers, got %d: %v", i, len(triggersAfter), triggersAfter) + } + + // Verify trigger names haven't changed + if !equalStringSlices(initialTriggers, triggersAfter) { + t.Fatalf("Redeploy #%d: Trigger names changed! Initial: %v, After: %v", i, initialTriggers, triggersAfter) + } + } + t.Log("Idempotency verified: 3 redeploys produced same triggers") +} + +// TestTriggerSync_RemoveAllSubscriptions verifies that all managed triggers +// are deleted when all subscriptions are removed +func TestTriggerSync_RemoveAllSubscriptions(t *testing.T) { + brokerName := "default" + createBrokerWithCheck(t, Namespace, brokerName) + + functionName := "func-e2e-test-trigger-sync-remove-all" + root := fromCleanEnv(t, functionName) + + // Create function + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + + // Add first subscription + if err := newCmd(t, "subscribe", + "--source", "default", + "--filter", "type=order.created").Run(); err != nil { + t.Fatal(err) + } + + // Add second subscription by editing func.yaml + f, err := fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = append(f.Deploy.Subscriptions, fn.KnativeSubscription{ + Source: "default", + Filters: map[string]string{ + "type": "order.updated", + }, + }) + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Deploy with raw deployer + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, functionName, Namespace) + + waitForDeployment(t, Namespace, functionName) + + // Verify 2 triggers created + triggers := listTriggersForFunction(t, Namespace, functionName) + if len(triggers) != 2 { + t.Fatalf("Expected 2 triggers, got %d", len(triggers)) + } + t.Logf("Initial triggers: %v", triggers) + + // Create a manual trigger for comparison + manualTriggerName := fmt.Sprintf("%s-manual", functionName) + createManualTrigger(t, Namespace, manualTriggerName, functionName, brokerName) + + // Remove all subscriptions by editing func.yaml directly + f, err = fn.NewFunction(root) + if err != nil { + t.Fatal(err) + } + f.Deploy.Subscriptions = []fn.KnativeSubscription{} + if err := f.Write(); err != nil { + t.Fatal(err) + } + + // Redeploy + if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Second) + + // Verify no managed triggers remain + managedTriggersAfter := listTriggersForFunction(t, Namespace, functionName) + if len(managedTriggersAfter) != 0 { + t.Fatalf("Expected 0 managed triggers after removing all subscriptions, got %d: %v", len(managedTriggersAfter), managedTriggersAfter) + } + t.Log("All managed triggers deleted") + + // Verify manual trigger still exists + if !triggerExists(t, Namespace, manualTriggerName) { + t.Fatal("Manual trigger was deleted - should have been preserved!") + } + t.Log("Manual trigger preserved") +} + +// Helper functions + +// listTriggersForFunction lists all triggers with managed-by annotation for a function +func listTriggersForFunction(t *testing.T, namespace, functionName string) []string { + t.Helper() + + // Get all triggers in namespace (kubectl doesn't support annotation selectors) + cmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "json") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not list triggers: %v, output: %s", err, string(output)) + return []string{} + } + + // Parse JSON to filter by annotation + var triggerList struct { + Items []struct { + Metadata struct { + Name string `json:"name"` + Annotations map[string]string `json:"annotations"` + } `json:"metadata"` + } `json:"items"` + } + + if err := json.Unmarshal(output, &triggerList); err != nil { + t.Logf("Warning: could not parse triggers JSON: %v", err) + return []string{} + } + + // Filter triggers that: + // 1. Have the managed-by annotation + // 2. Belong to this function (name starts with functionName-trigger-) + var functionTriggers []string + for _, trigger := range triggerList.Items { + if trigger.Metadata.Annotations["func.knative.dev/managed-by"] == "func-raw-deployer" { + if strings.HasPrefix(trigger.Metadata.Name, functionName+"-trigger-") { + functionTriggers = append(functionTriggers, trigger.Metadata.Name) + } + } + } + + return functionTriggers +} + +// listAllTriggers lists all triggers in namespace +func listAllTriggers(t *testing.T, namespace string) []string { + t.Helper() + + cmd := exec.Command("kubectl", "get", "triggers", "-n", namespace, + "-o", "jsonpath={.items[*].metadata.name}") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not list all triggers: %v", err) + return []string{} + } + + triggersStr := strings.TrimSpace(string(output)) + if triggersStr == "" { + return []string{} + } + + return strings.Fields(triggersStr) +} + +// hasManagedByAnnotation checks if a trigger has the managed-by annotation +func hasManagedByAnnotation(t *testing.T, namespace, triggerName string) bool { + t.Helper() + + cmd := exec.Command("kubectl", "get", "trigger", triggerName, "-n", namespace, + "-o", "jsonpath={.metadata.annotations.func\\.knative\\.dev/managed-by}") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not get trigger annotation: %v", err) + return false + } + + return strings.TrimSpace(string(output)) == "func-raw-deployer" +} + +// createManualTrigger creates a trigger without the managed-by annotation +func createManualTrigger(t *testing.T, namespace, triggerName, functionName, brokerName string) { + t.Helper() + + triggerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 +kind: Trigger +metadata: + name: %s + namespace: %s + # Note: NO managed-by annotation +spec: + broker: %s + subscriber: + uri: http://%s.%s.svc.cluster.local + filter: + attributes: + type: manual.event +`, triggerName, namespace, brokerName, functionName, namespace) + + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(triggerYAML) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to create manual trigger: %v, output: %s", err, string(output)) + } + + t.Cleanup(func() { + deleteCmd := exec.Command("kubectl", "delete", "trigger", triggerName, "-n", namespace, "--ignore-not-found") + deleteCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + _ = deleteCmd.Run() + }) + + t.Logf("Created manual trigger: %s", triggerName) +} + +// triggerExists checks if a trigger exists +func triggerExists(t *testing.T, namespace, triggerName string) bool { + t.Helper() + + cmd := exec.Command("kubectl", "get", "trigger", triggerName, "-n", namespace) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + return cmd.Run() == nil +} + +// equalStringSlices checks if two string slices contain the same elements (order-independent) +func equalStringSlices(a, b []string) bool { + if len(a) != len(b) { + return false + } + + aMap := make(map[string]bool) + for _, s := range a { + aMap[s] = true + } + + for _, s := range b { + if !aMap[s] { + return false + } + } + + return true +} diff --git a/pkg/k8s/deployer.go b/pkg/k8s/deployer.go index de048034cb..dbc12ba4f1 100644 --- a/pkg/k8s/deployer.go +++ b/pkg/k8s/deployer.go @@ -2,9 +2,12 @@ package k8s import ( "context" + "crypto/sha256" "fmt" + "maps" "os" "regexp" + "sort" "strings" "time" @@ -16,8 +19,15 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + clienteventingv1 "knative.dev/client/pkg/eventing/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1client "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" "knative.dev/func/pkg/deployer" fn "knative.dev/func/pkg/functions" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) const ( @@ -26,6 +36,10 @@ const ( DefaultLivenessEndpoint = "/health/liveness" DefaultReadinessEndpoint = "/health/readiness" DefaultHTTPPort = 8080 + + // managedByAnnotation identifies triggers managed by this deployer + managedByAnnotation = "func.knative.dev/managed-by" + managedByValue = "func-raw-deployer" ) type DeployerOpt func(*Deployer) @@ -69,6 +83,15 @@ func onClusterFix(f fn.Function) fn.Function { return f } +// newEventingClient creates a Knative Eventing client from a REST config +func newEventingClient(config *rest.Config, namespace string) (clienteventingv1.KnEventingClient, error) { + eventingClient, err := eventingv1client.NewForConfig(config) + if err != nil { + return nil, err + } + return clienteventingv1.NewKnEventingClient(eventingClient, namespace), nil +} + func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResult, error) { f = onClusterFix(f) // Choosing f.Namespace vs f.Deploy.Namespace: @@ -98,7 +121,13 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu f.Deploy.Image = f.Build.Image } - clientset, err := NewKubernetesClientset() + // Get the Kubernetes REST config + config, err := GetClientConfig().ClientConfig() + if err != nil { + return fn.DeploymentResult{}, err + } + + clientset, err := kubernetes.NewForConfig(config) if err != nil { return fn.DeploymentResult{}, err } @@ -188,6 +217,15 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu return fn.DeploymentResult{}, fmt.Errorf("deployment did not become ready: %w", err) } + // Sync triggers + eventingClient, err := newEventingClient(config, namespace) + if err != nil { + return fn.DeploymentResult{}, fmt.Errorf("failed to create eventing client: %w", err) + } + if err := syncTriggers(ctx, f, namespace, eventingClient, clientset); err != nil { + return fn.DeploymentResult{}, fmt.Errorf("failed to sync triggers: %w", err) + } + url := fmt.Sprintf("http://%s.%s.svc.cluster.local", f.Name, namespace) return fn.DeploymentResult{ @@ -197,6 +235,129 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu }, nil } +// generateTriggerName creates a deterministic trigger name based on subscription content +func generateTriggerName(functionName, broker string, filters map[string]string) string { + filterKeys := make([]string, 0, len(filters)) + for k := range filters { + filterKeys = append(filterKeys, k) + } + sort.Strings(filterKeys) + + parts := make([]string, 0, 1+len(filters)) + parts = append(parts, broker) + for _, k := range filterKeys { + parts = append(parts, fmt.Sprintf("%s=%s", k, filters[k])) + } + + hash := sha256.Sum256([]byte(strings.Join(parts, "|"))) + hashStr := fmt.Sprintf("%x", hash[:4]) + + return fmt.Sprintf("%s-trigger-%s", functionName, hashStr) +} + +func syncTriggers(ctx context.Context, f fn.Function, namespace string, eventingClient clienteventingv1.KnEventingClient, clientset kubernetes.Interface) error { + // Build set of desired trigger names from current subscriptions + desiredTriggers := sets.New[string]() + for _, sub := range f.Deploy.Subscriptions { + triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) + desiredTriggers.Insert(triggerName) + } + + // Create or update triggers from current subscriptions + if len(f.Deploy.Subscriptions) > 0 { + svc, err := clientset.CoreV1().Services(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get service: %w", err) + } + + deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, f.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment: %w", err) + } + + fmt.Fprintf(os.Stderr, "🎯 Syncing Triggers on the cluster\n") + + for _, sub := range f.Deploy.Subscriptions { + attributes := make(map[string]string) + maps.Copy(attributes, sub.Filters) + + triggerName := generateTriggerName(f.Name, sub.Source, sub.Filters) + + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: triggerName, + Annotations: map[string]string{ + managedByAnnotation: managedByValue, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: deployment.Name, + UID: deployment.UID, + }, + }, + }, + Spec: eventingv1.TriggerSpec{ + Broker: sub.Source, + Subscriber: duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, namespace), + }, + }, + Filter: &eventingv1.TriggerFilter{ + Attributes: attributes, + }, + }, + } + + err := eventingClient.CreateTrigger(ctx, trigger) + if err != nil && !errors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create trigger: %w", err) + } + } + } + + // Clean up stale triggers + return deleteStaleTriggers(ctx, eventingClient, f.Name, desiredTriggers) +} + +// deleteStaleTriggers removes triggers managed by this deployer that are no longer in the desired set +func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.KnEventingClient, functionName string, desiredTriggers sets.Set[string]) error { + // List existing triggers in the namespace + existingTriggers, err := eventingClient.ListTriggers(ctx) + if err != nil { + // If triggers can't be listed ,skip cleanup + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to list triggers: %w", err) + } + + // Delete stale triggers (only those belonging to this function) + triggerPrefix := functionName + "-trigger-" + for _, trigger := range existingTriggers.Items { + if !strings.HasPrefix(trigger.Name, triggerPrefix) { + continue + } + + // Only delete triggers we manage + if trigger.Annotations[managedByAnnotation] == managedByValue { + // Check if this trigger is still desired + if !desiredTriggers.Has(trigger.Name) { + fmt.Fprintf(os.Stderr, "🗑️ Deleting stale trigger: %s\n", trigger.Name) + err := eventingClient.DeleteTrigger(ctx, trigger.Name) + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete stale trigger %s: %w", trigger.Name, err) + } + } + } + } + + return nil +} + func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInstalled bool) (*appsv1.Deployment, error) { labels, err := deployer.GenerateCommonLabels(f, d.decorator) if err != nil { @@ -207,9 +368,7 @@ func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInsta // Use annotations for pod template podAnnotations := make(map[string]string) - for k, v := range annotations { - podAnnotations[k] = v - } + maps.Copy(podAnnotations, annotations) // Process environment variables and volumes referencedSecrets := sets.New[string]() diff --git a/pkg/k8s/deployer_test.go b/pkg/k8s/deployer_test.go index b515c6b575..c173b8c8c0 100644 --- a/pkg/k8s/deployer_test.go +++ b/pkg/k8s/deployer_test.go @@ -89,3 +89,277 @@ func Test_processValue(t *testing.T) { }) } } + +// Tests for generateTriggerName + +func TestGenerateTriggerName_Deterministic(t *testing.T) { + functionName := "order-processor" + broker := "default" + filters := map[string]string{ + "type": "order.created", + "source": "api", + } + + // Call multiple times with same input + name1 := generateTriggerName(functionName, broker, filters) + name2 := generateTriggerName(functionName, broker, filters) + name3 := generateTriggerName(functionName, broker, filters) + + // Should always produce the same result + if name1 != name2 || name2 != name3 || name1 != name3 { + t.Errorf("generateTriggerName() is not deterministic: got %v, %v, %v", name1, name2, name3) + } +} + +func TestGenerateTriggerName_FilterOrderIndependent(t *testing.T) { + functionName := "order-processor" + broker := "default" + + // Same filters, different order + filters1 := map[string]string{ + "type": "order.created", + "status": "pending", + "source": "api", + } + + filters2 := map[string]string{ + "source": "api", + "type": "order.created", + "status": "pending", + } + + filters3 := map[string]string{ + "status": "pending", + "source": "api", + "type": "order.created", + } + + name1 := generateTriggerName(functionName, broker, filters1) + name2 := generateTriggerName(functionName, broker, filters2) + name3 := generateTriggerName(functionName, broker, filters3) + + // Should produce the same hash regardless of map iteration order + if name1 != name2 || name2 != name3 || name1 != name3 { + t.Errorf("generateTriggerName() is sensitive to filter order: got %v, %v, %v", name1, name2, name3) + } +} + +func TestGenerateTriggerName_DifferentInputsDifferentNames(t *testing.T) { + functionName := "order-processor" + broker := "default" + + // Different filters should produce different names + name1 := generateTriggerName(functionName, broker, map[string]string{"type": "order.created"}) + name2 := generateTriggerName(functionName, broker, map[string]string{"type": "order.paid"}) + name3 := generateTriggerName(functionName, broker, map[string]string{"type": "order.shipped"}) + + if name1 == name2 || name2 == name3 || name1 == name3 { + t.Errorf("generateTriggerName() produced same name for different filters: %v, %v, %v", name1, name2, name3) + } + + // Different brokers should produce different names + name4 := generateTriggerName(functionName, "default", map[string]string{"type": "order.created"}) + name5 := generateTriggerName(functionName, "production", map[string]string{"type": "order.created"}) + + if name4 == name5 { + t.Errorf("generateTriggerName() produced same name for different brokers: %v, %v", name4, name5) + } + + // Different function names should produce different names + name6 := generateTriggerName("order-processor", broker, map[string]string{"type": "order.created"}) + name7 := generateTriggerName("payment-processor", broker, map[string]string{"type": "order.created"}) + + if name6 == name7 { + t.Errorf("generateTriggerName() produced same name for different functions: %v, %v", name6, name7) + } +} + +func TestGenerateTriggerName_ValidKubernetesName(t *testing.T) { + tests := []struct { + name string + functionName string + broker string + filters map[string]string + }{ + { + name: "standard case", + functionName: "order-processor", + broker: "default", + filters: map[string]string{"type": "order.created"}, + }, + { + name: "long function name", + functionName: "very-long-function-name-that-might-cause-issues", + broker: "default", + filters: map[string]string{"type": "test"}, + }, + { + name: "many filters", + functionName: "test-func", + broker: "default", + filters: map[string]string{ + "type": "order.created", + "source": "api", + "status": "pending", + "priority": "high", + }, + }, + { + name: "empty filters", + functionName: "test-func", + broker: "default", + filters: map[string]string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generateTriggerName(tt.functionName, tt.broker, tt.filters) + + // Kubernetes name requirements: + // - Max 253 characters + if len(got) > 253 { + t.Errorf("generateTriggerName() = %v, length %d exceeds Kubernetes limit of 253", got, len(got)) + } + + // Check format matches expected pattern + if got[:len(tt.functionName)] != tt.functionName { + t.Errorf("generateTriggerName() = %v, doesn't start with function name %s", got, tt.functionName) + } + + // Should contain "-trigger-" + if len(got) < len(tt.functionName)+17 { + t.Errorf("generateTriggerName() = %v, invalid format (too short)", got) + } + }) + } +} + +func TestGenerateTriggerName_ReorderingScenario(t *testing.T) { + // Simulate the reordering scenario from the bug report + functionName := "order-processor" + broker := "default" + + // Original order + sub1 := map[string]string{"type": "order.created"} + sub2 := map[string]string{"type": "order.paid"} + sub3 := map[string]string{"type": "order.shipped"} + + name1_original := generateTriggerName(functionName, broker, sub1) + name2_original := generateTriggerName(functionName, broker, sub2) + name3_original := generateTriggerName(functionName, broker, sub3) + + // Reordered (sub2, sub1, sub3) + name2_reordered := generateTriggerName(functionName, broker, sub2) + name1_reordered := generateTriggerName(functionName, broker, sub1) + name3_reordered := generateTriggerName(functionName, broker, sub3) + + // Names should be the same regardless of subscription order + if name1_original != name1_reordered { + t.Errorf("Reordering changed trigger name for sub1: %v != %v", name1_original, name1_reordered) + } + if name2_original != name2_reordered { + t.Errorf("Reordering changed trigger name for sub2: %v != %v", name2_original, name2_reordered) + } + if name3_original != name3_reordered { + t.Errorf("Reordering changed trigger name for sub3: %v != %v", name3_original, name3_reordered) + } +} + +// TestGenerateTriggerName_TriggerNamingConsistency verifies that the naming +// follows a consistent pattern across multiple subscriptions +func TestGenerateTriggerName_TriggerNamingConsistency(t *testing.T) { + functionName := "order-processor" + + // Simulate subscriptions from func.yaml + subscriptions := []struct { + source string + filters map[string]string + }{ + { + source: "default", + filters: map[string]string{"type": "com.example.order.created"}, + }, + { + source: "default", + filters: map[string]string{"type": "com.example.order.paid"}, + }, + { + source: "default", + filters: map[string]string{"type": "com.example.order.shipped"}, + }, + } + + triggerNames := make(map[string]bool) + + for _, sub := range subscriptions { + name := generateTriggerName(functionName, sub.source, sub.filters) + + // Each trigger should have a unique name + if triggerNames[name] { + t.Errorf("Duplicate trigger name generated: %s", name) + } + triggerNames[name] = true + + // Verify name format + if len(name) < len(functionName)+17 { // functionName + "-trigger-" + 8 hex chars + t.Errorf("Trigger name too short: %s", name) + } + } + + // Verify we generated 3 unique names + if len(triggerNames) != 3 { + t.Errorf("Expected 3 unique trigger names, got %d", len(triggerNames)) + } +} + +// TestGenerateTriggerName_EmptyFilters verifies behavior with empty filters +func TestGenerateTriggerName_EmptyFilters(t *testing.T) { + name := generateTriggerName("test-func", "default", map[string]string{}) + + // Should still generate a valid name based on broker alone + expectedPrefix := "test-func-trigger-" + if len(name) <= len(expectedPrefix) { + t.Errorf("Expected name to have hash suffix, got: %s", name) + } + + if name[:len(expectedPrefix)] != expectedPrefix { + t.Errorf("Expected prefix %s, got: %s", expectedPrefix, name) + } +} + +// TestGenerateTriggerName_SpecialCharacters verifies handling of special chars in filters +func TestGenerateTriggerName_SpecialCharacters(t *testing.T) { + // Filters with special characters + filters := map[string]string{ + "type": "com.example.order/created", + "source": "https://api.example.com", + } + + name := generateTriggerName("test-func", "default", filters) + + // Name should be valid despite special chars in filters + expectedPrefix := "test-func-trigger-" + if name[:len(expectedPrefix)] != expectedPrefix { + t.Errorf("Expected prefix %s, got: %s", expectedPrefix, name) + } + + // Hash should be 8 hex characters + hash := name[len(expectedPrefix):] + if len(hash) != 8 { + t.Errorf("Expected 8-char hash, got %d chars: %s", len(hash), hash) + } +} + +// TestGenerateTriggerName_DifferentBrokers verifies different brokers produce different names +func TestGenerateTriggerName_DifferentBrokers(t *testing.T) { + filters := map[string]string{"type": "test.event"} + + name1 := generateTriggerName("test-func", "default", filters) + name2 := generateTriggerName("test-func", "production", filters) + name3 := generateTriggerName("test-func", "staging", filters) + + if name1 == name2 || name2 == name3 || name1 == name3 { + t.Errorf("Different brokers should produce different names: %s, %s, %s", name1, name2, name3) + } +}