Skip to content
202 changes: 172 additions & 30 deletions e2e/e2e_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,9 @@ func TestMetadata_Subscriptions(t *testing.T) {

uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())

eventReceived := waitForEvent(t, uniqueEventID)
subscriberName := "func-e2e-test-subscriber-knative"
eventReceived := waitForEvent(t, subscriberName, uniqueEventID)

subscriber := "func-e2e-test-subscriber"
subscriberName := subscriber
subscriberRoot := fromCleanEnv(t, subscriberName)
if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -605,7 +604,7 @@ func TestMetadata_Subscriptions(t *testing.T) {
if !waitFor(t, subscriberURL, withTemplate("cloudevents")) {
t.Fatal("subscriber not ready")
}
waitForTrigger(t, Namespace, subscriberName)
waitForTriggerKnative(t, Namespace, subscriberName)

transport := fnhttp.NewRoundTripper()
defer transport.Close()
Expand Down Expand Up @@ -640,7 +639,108 @@ func TestMetadata_Subscriptions(t *testing.T) {
}
}

func waitForEvent(t *testing.T, eventId string) <-chan string {
func TestMetadata_Subscriptions_Raw(t *testing.T) {
brokerName := "default"

createBrokerWithCheck(t, Namespace, brokerName)

uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())

subscriberName := "func-e2e-test-subscriber-raw"
eventReceived := waitForEvent(t, subscriberName, uniqueEventID)

subscriberRoot := fromCleanEnv(t, subscriberName)
if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"),
[]byte(subscriberCode()), 0644); err != nil {
t.Fatal(err)
}

subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event")
subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr
if err := subscribeCmd.Run(); err != nil {
t.Fatal(err)
}

f, err := fn.NewFunction(subscriberRoot)
if err != nil {
t.Fatal(err)
}
if len(f.Deploy.Subscriptions) != 1 {
t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions))
}

// Deploy with raw deployer to test trigger creation
if err := newCmd(t, "deploy", "--deployer", "raw").Run(); err != nil {
t.Fatal(err)
}
defer clean(t, subscriberName, Namespace)

// Note: Raw deployer creates cluster-internal services without external routes,
// so we can't use waitFor with domain-based URLs. Instead, we verify the
// deployment is ready and then test event delivery directly.
t.Log("Waiting for deployment to be ready...")
waitForDeployment(t, Namespace, subscriberName)

// Wait for trigger to be created and ready
waitForTriggerRaw(t, Namespace, subscriberName)

transport := fnhttp.NewRoundTripper()
defer transport.Close()
client := http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName)
req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ce-specversion", "1.0")
req.Header.Set("ce-type", "test.event")
req.Header.Set("ce-source", "producer")
req.Header.Set("ce-id", uniqueEventID)

resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to invoke producer: %v", err)
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode != 202 {
t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body)
}
t.Logf("Broker accepted event %s", uniqueEventID)

select {
case receivedID := <-eventReceived:
t.Logf("Event flow verified (received: %s)", receivedID)
case <-time.After(60 * time.Second):
t.Fatal("Timeout: No callback from subscriber")
}
}

// CloudEvents handler that logs events
func subscriberCode() string {
return `package function
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)
func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
r := event.New()
r.SetID("response-" + e.ID())
r.SetSource("subscriber")
r.SetType("test.response")
r.SetData("application/json", map[string]string{"status": "received"})
return &r, nil
}
`
}

func waitForEvent(t *testing.T, functionName, eventId string) <-chan string {
t.Helper()

eventReceived := make(chan string, 10)
Expand All @@ -649,7 +749,7 @@ func waitForEvent(t *testing.T, eventId string) <-chan string {
t.Cleanup(cancel)

pr, pw := io.Pipe()
cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
cmd := exec.CommandContext(ctx, "stern", functionName+"-.*")
cmd.Stderr = io.Discard
cmd.Stdout = pw
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
Expand All @@ -674,28 +774,6 @@ func waitForEvent(t *testing.T, eventId string) <-chan string {
return eventReceived
}

// CloudEvents handler that logs events
func subscriberCode() string {
return `package function

import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)

func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
r := event.New()
r.SetID("response-" + e.ID())
r.SetSource("subscriber")
r.SetType("test.response")
r.SetData("application/json", map[string]string{"status": "received"})
return &r, nil
}
`
}

// createBrokerWithCheck creates a Knative Broker
func createBrokerWithCheck(t *testing.T, namespace, name string) {
t.Helper()
Expand Down Expand Up @@ -758,10 +836,12 @@ func deleteBroker(t *testing.T, namespace, name string) {
t.Logf("Deleted broker %s from namespace %s", name, namespace)
}

// waitForTrigger waits for the function's trigger to become ready.
func waitForTrigger(t *testing.T, namespace, functionName string) {
// waitForTriggerKnative waits for the function's trigger to become ready.
// For Knative deployer, triggers are named with pattern: {functionName}-function-trigger-{index}
func waitForTriggerKnative(t *testing.T, namespace, functionName string) {
t.Helper()

// Knative deployer uses predictable sequential naming
triggerName := fmt.Sprintf("%s-function-trigger-0", functionName)

cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
Expand All @@ -775,3 +855,65 @@ func waitForTrigger(t *testing.T, namespace, functionName string) {
t.Logf("Trigger %s is ready", triggerName)
}
}

// waitForTriggerRaw waits for the function's trigger to become ready.
// For raw deployer, triggers are named with pattern: {functionName}-trigger-{hash}
func waitForTriggerRaw(t *testing.T, namespace, functionName string) {
t.Helper()

// List triggers matching the pattern using kubectl
// Raw deployer creates triggers with pattern: {functionName}-trigger-{hash}
listCmd := exec.Command("kubectl", "get", "triggers", "-n", namespace,
"-o", "name", "--field-selector", "metadata.name!=")
listCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
output, err := listCmd.CombinedOutput()
if err != nil {
t.Logf("Warning: could not list triggers: %v", err)
return
}

// Find trigger matching our function name pattern
prefix := "trigger.eventing.knative.dev/" + functionName + "-trigger-"
var triggerName string
for _, line := range strings.Split(string(output), "\n") {
if strings.HasPrefix(line, prefix) {
// Extract just the trigger name
triggerName = strings.TrimPrefix(line, "trigger.eventing.knative.dev/")
break
}
}

if triggerName == "" {
t.Logf("Warning: no trigger found for function %s", functionName)
return
}

t.Logf("Found trigger: %s", triggerName)

cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

waitOutput, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(waitOutput))
} else {
t.Logf("Trigger %s is ready", triggerName)
}
}

// waitForDeployment waits for a Kubernetes Deployment to become ready.
// This is used for raw deployer which creates Deployments instead of Knative Services.
func waitForDeployment(t *testing.T, namespace, name string) {
t.Helper()

cmd := exec.Command("kubectl", "wait", "--for=condition=Available",
fmt.Sprintf("deployment/%s", name), "-n", namespace, "--timeout=120s")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("Deployment %s not ready: %v, output: %s", name, err, string(output))
}
t.Logf("Deployment %s is ready", name)
}
Loading
Loading