Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/bundle-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ jobs:
username: ${{ secrets.DOCKER_USER }}
password: ${{ secrets.DOCKER_PASSWORD }}

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down Expand Up @@ -180,10 +180,10 @@ jobs:
username: ${{ secrets.DOCKER_USER }}
password: ${{ secrets.DOCKER_PASSWORD }}

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/olm-verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ jobs:
- name: checkout
uses: actions/checkout@v2

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.25.8]
go-version: [1.25.9]
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test-helm-charts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ jobs:
run: hack/kind-cluster-build.sh --name chart-testing -c 1 -v 10 --k8sVersion v1.23.17
if: steps.list-changed.outputs.changed == 'true'

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
if: steps.list-changed.outputs.changed == 'true'
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: setup kubebuilder 3.6.0
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/trivy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ jobs:
repository: ${{github.event.pull_request.head.repo.full_name}}
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/trivy_scheduled_master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ jobs:
repository: ${{github.event.pull_request.head.repo.full_name}}
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up GO 1.25.8
- name: Set up GO 1.25.9
uses: actions/setup-go@v5
with:
go-version: 1.25.8
go-version: 1.25.9
id: go

- name: InstallKubebuilder
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM --platform=$BUILDPLATFORM golang:1.25.8-trixie AS builder
FROM --platform=$BUILDPLATFORM golang:1.25.9-trixie AS builder

ARG TARGETOS
ARG TARGETARCH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ spec:
- --global-backend-config-namespace={{ .Values.controllerManager.globalBackendConfigNamespace }}
- --namespaced-backend-config={{ .Values.controllerManager.namespacedBackendConfig }}
- --add-default-affinity={{ .Values.controllerManager.addDefaultAffinity }}
- --pause-rollout={{ .Values.controllerManager.pauseRollout }}
env:
- name: NAMESPACE
valueFrom:
Expand Down
1 change: 1 addition & 0 deletions charts/function-mesh-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ controllerManager:
globalBackendConfigNamespace: default
namespacedBackendConfig: backend-config
addDefaultAffinity: true
pauseRollout: false

admissionWebhook:
enabled: true
1 change: 1 addition & 0 deletions config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spec:
- "--namespaced-backend-config=backend-config"
- "--global-backend-config=global-backend-config"
- "--global-backend-config-namespace=sn-system"
- "--pause-rollout=false"
ports:
- containerPort: 8443
name: metrics
Expand Down
1 change: 1 addition & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ spec:
- --namespaced-backend-config=backend-config
- --global-backend-config=global-backend-config
- --global-backend-config-namespace=sn-system
- --pause-rollout=false
image: controller:latest
name: manager
resources:
Expand Down
8 changes: 8 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ const (
CleanUpFinalizerName = "cleanup.subscription.finalizer"
)

func shouldPauseNonGenerationRollout(object metav1.Object, isNewGeneration bool) bool {
return spec.IsPauseRollout(object) && !isNewGeneration && object.GetDeletionTimestamp().IsZero()
}

func shouldApplyPausedResourceAction(action v1alpha1.ReconcileAction) bool {
return action == v1alpha1.Create || action == v1alpha1.Delete
}

func deleteHPAV2Beta2(ctx context.Context, r client.Client, name types.NamespacedName) error {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
err := r.Get(ctx, name, hpa)
Expand Down
70 changes: 70 additions & 0 deletions controllers/common_pause_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package controllers

import (
"testing"
"time"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
"github.com/streamnative/function-mesh/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestShouldPauseNonGenerationRollout(t *testing.T) {
oldPauseRollout := utils.PauseRollout
t.Cleanup(func() {
utils.PauseRollout = oldPauseRollout
})

utils.PauseRollout = true
object := &metav1.ObjectMeta{}
if !shouldPauseNonGenerationRollout(object, false) {
t.Fatal("expected pause rollout for non-generation reconcile")
}

if shouldPauseNonGenerationRollout(object, true) {
t.Fatal("did not expect pause rollout for new generation")
}

now := metav1.NewTime(time.Now())
object.DeletionTimestamp = &now
if shouldPauseNonGenerationRollout(object, false) {
t.Fatal("did not expect pause rollout while object is deleting")
}
}

func TestShouldApplyPausedResourceAction(t *testing.T) {
if !shouldApplyPausedResourceAction(v1alpha1.Create) {
t.Fatal("expected create action to be allowed while paused")
}
if !shouldApplyPausedResourceAction(v1alpha1.Delete) {
t.Fatal("expected delete action to be allowed while paused")
}

blockedActions := []v1alpha1.ReconcileAction{
v1alpha1.Update,
v1alpha1.Wait,
v1alpha1.NoAction,
}
for _, action := range blockedActions {
if shouldApplyPausedResourceAction(action) {
t.Fatalf("did not expect %s action to be allowed while paused", action)
}
}
}
52 changes: 41 additions & 11 deletions controllers/function_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,13 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)
pauseRollout := shouldPauseNonGenerationRollout(function, isNewGeneration)
function.Status.PendingChange = ""

err = r.ObserveFunctionStatefulSet(ctx, function)
if err != nil {
return reconcile.Result{}, err
}
// skip reconcile if pauseRollout is set to true and the generation is not increased
if spec.IsPauseRollout(function) && !isNewGeneration {
err = r.Status().Update(ctx, function)
if err != nil {
r.Log.Error(err, "failed to update function status after observing statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
function.Status.PendingChange = ""
}

err = r.ObserveFunctionService(ctx, function)
if err != nil {
Expand Down Expand Up @@ -144,6 +135,45 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

if pauseRollout {
if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.StatefulSet].Action) {
err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.Service].Action) {
err = r.ApplyFunctionService(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
if condition, ok := function.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 {
err = r.ApplyFunctionHPAV2Beta2(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
} else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 {
err = r.ApplyFunctionHPA(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
}
if condition, ok := function.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
err = r.ApplyFunctionVPA(ctx, function)
if err != nil {
return reconcile.Result{}, err
}
}
err = r.ApplyFunctionCleanUpJob(ctx, function)
if err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{}, nil
}

err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
Expand Down
52 changes: 41 additions & 11 deletions controllers/sink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,13 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)
pauseRollout := shouldPauseNonGenerationRollout(sink, isNewGeneration)
sink.Status.PendingChange = ""

err = r.ObserveSinkStatefulSet(ctx, sink)
if err != nil {
return reconcile.Result{}, err
}
// skip reconcile if pauseRollout is set to true and the generation is not increased
if spec.IsPauseRollout(sink) && !isNewGeneration {
err = r.Status().Update(ctx, sink)
if err != nil {
r.Log.Error(err, "failed to update sink status after observing statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
sink.Status.PendingChange = ""
}

err = r.ObserveSinkService(ctx, sink)
if err != nil {
Expand Down Expand Up @@ -143,6 +134,45 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

if pauseRollout {
if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.StatefulSet].Action) {
err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.Service].Action) {
err = r.ApplySinkService(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
if condition, ok := sink.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 {
err = r.ApplySinkHPAV2Beta2(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
} else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 {
err = r.ApplySinkHPA(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
}
}
}
if condition, ok := sink.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
err = r.ApplySinkVPA(ctx, sink)
if err != nil {
return reconcile.Result{}, err
}
}
err = r.ApplySinkCleanUpJob(ctx, sink)
if err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{}, nil
}

err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
Expand Down
Loading
Loading