From 7ed40b99fcd33cb2055fc9b9680aefc7c1467fcb Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 14 Apr 2026 15:36:19 +0800 Subject: [PATCH 1/5] feat: add a global flag to enable/disable PauseRollout --- .../templates/controller-manager-deployment.yaml | 1 + charts/function-mesh-operator/values.yaml | 1 + config/default/manager_auth_proxy_patch.yaml | 1 + config/manager/manager.yaml | 1 + controllers/spec/common.go | 5 +++++ main.go | 3 +++ utils/configs.go | 1 + 7 files changed, 13 insertions(+) diff --git a/charts/function-mesh-operator/templates/controller-manager-deployment.yaml b/charts/function-mesh-operator/templates/controller-manager-deployment.yaml index 7699b81b6..09f57f692 100644 --- a/charts/function-mesh-operator/templates/controller-manager-deployment.yaml +++ b/charts/function-mesh-operator/templates/controller-manager-deployment.yaml @@ -67,6 +67,7 @@ spec: - --global-backend-config-namespace={{ .Values.controllerManager.globalBackendConfigNamespace }} - --namespaced-backend-config={{ .Values.controllerManager.namespacedBackendConfig }} - --add-default-affinity={{ .Values.controllerManager.addDefaultAffinity }} + - --pause-rollout={{ .Values.controllerManager.pauseRollout }} env: - name: NAMESPACE valueFrom: diff --git a/charts/function-mesh-operator/values.yaml b/charts/function-mesh-operator/values.yaml index 4fcd9dc6a..b874712d2 100644 --- a/charts/function-mesh-operator/values.yaml +++ b/charts/function-mesh-operator/values.yaml @@ -75,6 +75,7 @@ controllerManager: globalBackendConfigNamespace: default namespacedBackendConfig: backend-config addDefaultAffinity: true + pauseRollout: false admissionWebhook: enabled: true diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index d13dda2fe..78a5322ae 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -16,6 +16,7 @@ spec: - "--namespaced-backend-config=backend-config" - "--global-backend-config=global-backend-config" - "--global-backend-config-namespace=sn-system" + - "--pause-rollout=false" ports: - containerPort: 8443 name: metrics diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index dc0ac8ec9..6ef7f48b2 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -36,6 +36,7 @@ spec: - --namespaced-backend-config=backend-config - --global-backend-config=global-backend-config - --global-backend-config-namespace=sn-system + - --pause-rollout=false image: controller:latest name: manager resources: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 1060637ee..09d7a5130 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -310,6 +310,11 @@ func IsManaged(object metav1.Object) bool { } func IsPauseRollout(object metav1.Object) bool { + // the global flag + if utils.PauseRollout { + return true + } + // the annotation flag per object pauseRollout, exists := object.GetAnnotations()[AnnotationPauseRollout] return exists && pauseRollout == "true" } diff --git a/main.go b/main.go index cede58afa..b99d829c1 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,7 @@ func main() { var namespacedBackendConfig string var secureMetrics bool var addDefaultAffinity bool + var pauseRollout bool flag.StringVar(&metricsAddr, "metrics-addr", lookupEnvOrString("METRICS_ADDR", ":8443"), "The address the metric endpoint binds to.") flag.StringVar(&leaderElectionID, "leader-election-id", @@ -115,6 +116,7 @@ func main() { " Use --metrics-secure=false to use HTTP instead.") flag.BoolVar(&addDefaultAffinity, "add-default-affinity", lookupEnvOrBool("ADD_DEFAULT_AFFINITY", true), "If set, the generated pod will add one default podAntiAffinity:"+ " make pods prefer not be scheduled on the same node (soft rule).") + flag.BoolVar(&pauseRollout, "pause-rollout", lookupEnvOrBool("PAUSE_ROLLOUT", false), "If set, the controller will not rollout the function/sink/source when its spec is not updated.") flag.Parse() ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -123,6 +125,7 @@ func main() { utils.GlobalBackendConfigNamespace = globalBackendConfigNamespace utils.NamespacedBackendConfig = namespacedBackendConfig utils.AddDefaultAffinity = addDefaultAffinity + utils.PauseRollout = pauseRollout // enable pprof if enablePprof { diff --git a/utils/configs.go b/utils/configs.go index 14a8066dd..f5a316592 100644 --- a/utils/configs.go +++ b/utils/configs.go @@ -23,3 +23,4 @@ var GlobalBackendConfig = "" var GlobalBackendConfigNamespace = "default" var NamespacedBackendConfig = "" var AddDefaultAffinity = true +var PauseRollout = false From b4d59e243ceee5ac61aeedeebd291b1f02747b66 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 14 Apr 2026 17:02:27 +0800 Subject: [PATCH 2/5] fix vulner --- .github/workflows/bundle-release.yml | 8 ++++---- .github/workflows/olm-verify.yml | 4 ++-- .github/workflows/project.yml | 2 +- .github/workflows/release.yml | 4 ++-- .github/workflows/test-helm-charts.yml | 4 ++-- .github/workflows/trivy.yml | 4 ++-- .github/workflows/trivy_scheduled_master.yml | 4 ++-- Dockerfile | 2 +- go.mod | 12 ++++++------ go.sum | 5 +++++ images/samples/go-function-samples/Dockerfile | 2 +- images/samples/go-function-samples/func/go.mod | 2 +- operator.Dockerfile | 2 +- redhat.Dockerfile | 2 +- 14 files changed, 31 insertions(+), 26 deletions(-) diff --git a/.github/workflows/bundle-release.yml b/.github/workflows/bundle-release.yml index 5c17babe7..68543d91b 100644 --- a/.github/workflows/bundle-release.yml +++ b/.github/workflows/bundle-release.yml @@ -49,10 +49,10 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASSWORD }} - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder @@ -180,10 +180,10 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASSWORD }} - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder diff --git a/.github/workflows/olm-verify.yml b/.github/workflows/olm-verify.yml index 46674218d..50c734bbd 100644 --- a/.github/workflows/olm-verify.yml +++ b/.github/workflows/olm-verify.yml @@ -34,10 +34,10 @@ jobs: - name: checkout uses: actions/checkout@v2 - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder diff --git a/.github/workflows/project.yml b/.github/workflows/project.yml index 4bcfed199..5eff1082d 100644 --- a/.github/workflows/project.yml +++ b/.github/workflows/project.yml @@ -18,7 +18,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.25.8] + go-version: [1.25.9] steps: - name: Free Disk Space (Ubuntu) uses: jlumbroso/free-disk-space@v1.3.0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 02a1221f1..513b52de4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -40,10 +40,10 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder diff --git a/.github/workflows/test-helm-charts.yml b/.github/workflows/test-helm-charts.yml index 9cdc0d0f8..66a0a992b 100644 --- a/.github/workflows/test-helm-charts.yml +++ b/.github/workflows/test-helm-charts.yml @@ -83,11 +83,11 @@ jobs: run: hack/kind-cluster-build.sh --name chart-testing -c 1 -v 10 --k8sVersion v1.23.17 if: steps.list-changed.outputs.changed == 'true' - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 if: steps.list-changed.outputs.changed == 'true' uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: setup kubebuilder 3.6.0 diff --git a/.github/workflows/trivy.yml b/.github/workflows/trivy.yml index 3c4e8c586..74faf5df4 100644 --- a/.github/workflows/trivy.yml +++ b/.github/workflows/trivy.yml @@ -57,10 +57,10 @@ jobs: repository: ${{github.event.pull_request.head.repo.full_name}} ref: ${{ github.event.pull_request.head.sha }} - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder diff --git a/.github/workflows/trivy_scheduled_master.yml b/.github/workflows/trivy_scheduled_master.yml index 42549ffed..f3a453327 100644 --- a/.github/workflows/trivy_scheduled_master.yml +++ b/.github/workflows/trivy_scheduled_master.yml @@ -67,10 +67,10 @@ jobs: repository: ${{github.event.pull_request.head.repo.full_name}} ref: ${{ github.event.pull_request.head.sha }} - - name: Set up GO 1.25.8 + - name: Set up GO 1.25.9 uses: actions/setup-go@v5 with: - go-version: 1.25.8 + go-version: 1.25.9 id: go - name: InstallKubebuilder diff --git a/Dockerfile b/Dockerfile index f37d54273..69c0f2c4a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM --platform=$BUILDPLATFORM golang:1.25.8-trixie AS builder +FROM --platform=$BUILDPLATFORM golang:1.25.9-trixie AS builder ARG TARGETOS ARG TARGETARCH diff --git a/go.mod b/go.mod index f2ca44bf7..bf94f8ed4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/streamnative/function-mesh -go 1.25 +go 1.25.0 require ( github.com/apache/pulsar-client-go v0.17.0 @@ -85,12 +85,12 @@ require ( github.com/stoewer/go-strcase v1.2.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.40.0 // indirect + go.opentelemetry.io/otel v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect - go.opentelemetry.io/otel/metric v1.40.0 // indirect - go.opentelemetry.io/otel/sdk v1.40.0 // indirect - go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/otel/sdk v1.43.0 // indirect + go.opentelemetry.io/otel/trace v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect @@ -98,7 +98,7 @@ require ( golang.org/x/net v0.48.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.40.0 // indirect + golang.org/x/sys v0.42.0 // indirect golang.org/x/term v0.38.0 // indirect golang.org/x/text v0.32.0 // indirect golang.org/x/time v0.10.0 // indirect diff --git a/go.sum b/go.sum index cae75f652..a4ebf9f30 100644 --- a/go.sum +++ b/go.sum @@ -231,18 +231,22 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSG go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ= go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -291,6 +295,7 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= golang.org/x/term v0.38.0/go.mod h1:bSEAKrOT1W+VSu9TSCMtoGEOUcKxOKgl3LE5QEF/xVg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/images/samples/go-function-samples/Dockerfile b/images/samples/go-function-samples/Dockerfile index 8b469ce71..f9341b39e 100644 --- a/images/samples/go-function-samples/Dockerfile +++ b/images/samples/go-function-samples/Dockerfile @@ -1,5 +1,5 @@ ARG PULSAR_IMAGE_TAG -FROM --platform=$BUILDPLATFORM golang:1.25.8-trixie as builder +FROM --platform=$BUILDPLATFORM golang:1.25.9-trixie as builder ARG TARGETOS ARG TARGETARCH diff --git a/images/samples/go-function-samples/func/go.mod b/images/samples/go-function-samples/func/go.mod index 1f6e83f99..f212a5a31 100644 --- a/images/samples/go-function-samples/func/go.mod +++ b/images/samples/go-function-samples/func/go.mod @@ -1,6 +1,6 @@ module github.com/apache/pulsar/pulsar-function-go/examples -go 1.25.8 +go 1.25.9 require github.com/apache/pulsar/pulsar-function-go v0.0.0-20250430085326-611dc3f360b5 diff --git a/operator.Dockerfile b/operator.Dockerfile index 5f3864e62..68fb852c3 100644 --- a/operator.Dockerfile +++ b/operator.Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.25.8-trixie AS builder +FROM --platform=$BUILDPLATFORM golang:1.25.9-trixie AS builder ARG TARGETOS ARG TARGETARCH diff --git a/redhat.Dockerfile b/redhat.Dockerfile index 9bf8d33a2..680c2d34c 100644 --- a/redhat.Dockerfile +++ b/redhat.Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM --platform=$BUILDPLATFORM golang:1.25.8-trixie as builder +FROM --platform=$BUILDPLATFORM golang:1.25.9-trixie as builder ARG TARGETOS ARG TARGETARCH From dc2728a973f678f1ea9ed05721a77eabe15adfd7 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 14 Apr 2026 17:12:54 +0800 Subject: [PATCH 3/5] update --- controllers/common.go | 8 +++++ controllers/common_pause_test.go | 53 ++++++++++++++++++++++++++++++ controllers/function_controller.go | 52 ++++++++++++++++++++++------- controllers/sink_controller.go | 52 ++++++++++++++++++++++------- controllers/source_controller.go | 52 ++++++++++++++++++++++------- 5 files changed, 184 insertions(+), 33 deletions(-) create mode 100644 controllers/common_pause_test.go diff --git a/controllers/common.go b/controllers/common.go index d2321ffd9..b798fa657 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -46,6 +46,14 @@ const ( CleanUpFinalizerName = "cleanup.subscription.finalizer" ) +func shouldPauseNonGenerationRollout(object metav1.Object, isNewGeneration bool) bool { + return spec.IsPauseRollout(object) && !isNewGeneration && object.GetDeletionTimestamp().IsZero() +} + +func shouldApplyPausedResourceAction(action v1alpha1.ReconcileAction) bool { + return action == v1alpha1.Create || action == v1alpha1.Delete +} + func deleteHPAV2Beta2(ctx context.Context, r client.Client, name types.NamespacedName) error { hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} err := r.Get(ctx, name, hpa) diff --git a/controllers/common_pause_test.go b/controllers/common_pause_test.go new file mode 100644 index 000000000..01a8f9ea7 --- /dev/null +++ b/controllers/common_pause_test.go @@ -0,0 +1,53 @@ +package controllers + +import ( + "testing" + "time" + + "github.com/streamnative/function-mesh/api/compute/v1alpha1" + "github.com/streamnative/function-mesh/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestShouldPauseNonGenerationRollout(t *testing.T) { + oldPauseRollout := utils.PauseRollout + t.Cleanup(func() { + utils.PauseRollout = oldPauseRollout + }) + + utils.PauseRollout = true + object := &metav1.ObjectMeta{} + if !shouldPauseNonGenerationRollout(object, false) { + t.Fatal("expected pause rollout for non-generation reconcile") + } + + if shouldPauseNonGenerationRollout(object, true) { + t.Fatal("did not expect pause rollout for new generation") + } + + now := metav1.NewTime(time.Now()) + object.DeletionTimestamp = &now + if shouldPauseNonGenerationRollout(object, false) { + t.Fatal("did not expect pause rollout while object is deleting") + } +} + +func TestShouldApplyPausedResourceAction(t *testing.T) { + if !shouldApplyPausedResourceAction(v1alpha1.Create) { + t.Fatal("expected create action to be allowed while paused") + } + if !shouldApplyPausedResourceAction(v1alpha1.Delete) { + t.Fatal("expected delete action to be allowed while paused") + } + + blockedActions := []v1alpha1.ReconcileAction{ + v1alpha1.Update, + v1alpha1.Wait, + v1alpha1.NoAction, + } + for _, action := range blockedActions { + if shouldApplyPausedResourceAction(action) { + t.Fatalf("did not expect %s action to be allowed while paused", action) + } + } +} diff --git a/controllers/function_controller.go b/controllers/function_controller.go index 4719034f0..d8a3e2be1 100644 --- a/controllers/function_controller.go +++ b/controllers/function_controller.go @@ -100,22 +100,13 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) + pauseRollout := shouldPauseNonGenerationRollout(function, isNewGeneration) + function.Status.PendingChange = "" err = r.ObserveFunctionStatefulSet(ctx, function) if err != nil { return reconcile.Result{}, err } - // skip reconcile if pauseRollout is set to true and the generation is not increased - if spec.IsPauseRollout(function) && !isNewGeneration { - err = r.Status().Update(ctx, function) - if err != nil { - r.Log.Error(err, "failed to update function status after observing statefulset") - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } else { - function.Status.PendingChange = "" - } err = r.ObserveFunctionService(ctx, function) if err != nil { @@ -144,6 +135,45 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } + if pauseRollout { + if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.StatefulSet].Action) { + err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.Service].Action) { + err = r.ApplyFunctionService(ctx, function, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if condition, ok := function.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 { + err = r.ApplyFunctionHPAV2Beta2(ctx, function, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 { + err = r.ApplyFunctionHPA(ctx, function, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + } + if condition, ok := function.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + err = r.ApplyFunctionVPA(ctx, function) + if err != nil { + return reconcile.Result{}, err + } + } + err = r.ApplyFunctionCleanUpJob(ctx, function) + if err != nil { + return reconcile.Result{}, err + } + return ctrl.Result{}, nil + } + err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/sink_controller.go b/controllers/sink_controller.go index 039c1611d..6de7ddeb0 100644 --- a/controllers/sink_controller.go +++ b/controllers/sink_controller.go @@ -99,22 +99,13 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. } isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) + pauseRollout := shouldPauseNonGenerationRollout(sink, isNewGeneration) + sink.Status.PendingChange = "" err = r.ObserveSinkStatefulSet(ctx, sink) if err != nil { return reconcile.Result{}, err } - // skip reconcile if pauseRollout is set to true and the generation is not increased - if spec.IsPauseRollout(sink) && !isNewGeneration { - err = r.Status().Update(ctx, sink) - if err != nil { - r.Log.Error(err, "failed to update sink status after observing statefulset") - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } else { - sink.Status.PendingChange = "" - } err = r.ObserveSinkService(ctx, sink) if err != nil { @@ -143,6 +134,45 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } + if pauseRollout { + if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.StatefulSet].Action) { + err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.Service].Action) { + err = r.ApplySinkService(ctx, sink, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if condition, ok := sink.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 { + err = r.ApplySinkHPAV2Beta2(ctx, sink, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 { + err = r.ApplySinkHPA(ctx, sink, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + } + if condition, ok := sink.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + err = r.ApplySinkVPA(ctx, sink) + if err != nil { + return reconcile.Result{}, err + } + } + err = r.ApplySinkCleanUpJob(ctx, sink) + if err != nil { + return reconcile.Result{}, err + } + return ctrl.Result{}, nil + } + err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/source_controller.go b/controllers/source_controller.go index b6185667a..14af20ad1 100644 --- a/controllers/source_controller.go +++ b/controllers/source_controller.go @@ -99,22 +99,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) + pauseRollout := shouldPauseNonGenerationRollout(source, isNewGeneration) + source.Status.PendingChange = "" err = r.ObserveSourceStatefulSet(ctx, source) if err != nil { return reconcile.Result{}, err } - // skip reconcile if pauseRollout is set to true and the generation is not increased - if spec.IsPauseRollout(source) && !isNewGeneration { - err = r.Status().Update(ctx, source) - if err != nil { - r.Log.Error(err, "failed to update source status after observing statefulset") - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } else { - source.Status.PendingChange = "" - } err = r.ObserveSourceService(ctx, source) if err != nil { @@ -143,6 +134,45 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } + if pauseRollout { + if shouldApplyPausedResourceAction(source.Status.Conditions[v1alpha1.StatefulSet].Action) { + err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if shouldApplyPausedResourceAction(source.Status.Conditions[v1alpha1.Service].Action) { + err = r.ApplySourceService(ctx, source, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + if condition, ok := source.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 { + err = r.ApplySourceHPAV2Beta2(ctx, source, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 { + err = r.ApplySourceHPA(ctx, source, isNewGeneration) + if err != nil { + return reconcile.Result{}, err + } + } + } + if condition, ok := source.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) { + err = r.ApplySourceVPA(ctx, source) + if err != nil { + return reconcile.Result{}, err + } + } + err = r.ApplySourceCleanUpJob(ctx, source) + if err != nil { + return reconcile.Result{}, err + } + return ctrl.Result{}, nil + } + err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err From 480b9f1c1feb9924420553e86bcdfb697637d22b Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 14 Apr 2026 19:28:40 +0800 Subject: [PATCH 4/5] fix error --- go.sum | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/go.sum b/go.sum index a4ebf9f30..d3a268434 100644 --- a/go.sum +++ b/go.sum @@ -229,23 +229,19 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= -go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= -go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ= -go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= -go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= -go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= -go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw= +go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= @@ -293,8 +289,7 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= golang.org/x/term v0.38.0/go.mod h1:bSEAKrOT1W+VSu9TSCMtoGEOUcKxOKgl3LE5QEF/xVg= From 6fa83cfb3f6a8142b1dc17f0e564a3b94770bab3 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Wed, 15 Apr 2026 09:10:14 +0800 Subject: [PATCH 5/5] fix license --- controllers/common_pause_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/controllers/common_pause_test.go b/controllers/common_pause_test.go index 01a8f9ea7..6eef58865 100644 --- a/controllers/common_pause_test.go +++ b/controllers/common_pause_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package controllers import (