From 558f0843d504b2e94735134709b899c7355092b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Depriester?= Date: Sun, 30 Nov 2025 14:46:42 +0100 Subject: [PATCH 1/2] feat(controller): use custom requeue interval for reconciliation --- cmd/main.go | 7 ++ .../controller/postgresdatabase_controller.go | 58 +++++++++++------ .../controller/postgresrole_controller.go | 65 ++++++++++++------- .../controller/postgresschema_controller.go | 58 +++++++++++------ 4 files changed, 124 insertions(+), 64 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index d0be9e7..0166d79 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,6 +6,7 @@ import ( "flag" "os" "path/filepath" + "time" "go.uber.org/zap/zapcore" _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -50,6 +51,7 @@ func main() { var enableHTTP2 bool var tlsOpts []func(*tls.Config) var operatorInstanceName string + var reconciliationRequeueInterval time.Duration flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") @@ -66,6 +68,8 @@ func main() { flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") flag.StringVar(&operatorInstanceName, "operator-instance-name", "", "The name of this operator instance.") + flag.DurationVar(&reconciliationRequeueInterval, "reconciliation-requeue-interval", 5*time.Minute, + "Default interval between resource reconciliation") opts := zap.Options{ Development: true, @@ -180,6 +184,7 @@ func main() { if err = (&controller.PostgresDatabaseReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + RequeueInterval: reconciliationRequeueInterval, PGPools: pgpools, OperatorInstanceName: operatorInstanceName, }).SetupWithManager(mgr); err != nil { @@ -190,6 +195,7 @@ func main() { if err = (&controller.PostgresRoleReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + RequeueInterval: reconciliationRequeueInterval, PGPools: pgpools, OperatorInstanceName: operatorInstanceName, CacheRolePasswords: cacheRolePasswords, @@ -200,6 +206,7 @@ func main() { if err = (&controller.PostgresSchemaReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + RequeueInterval: reconciliationRequeueInterval, PGPools: pgpools, OperatorInstanceName: operatorInstanceName, }).SetupWithManager(mgr); err != nil { diff --git a/internal/controller/postgresdatabase_controller.go b/internal/controller/postgresdatabase_controller.go index c3903b0..c6d2769 100644 --- a/internal/controller/postgresdatabase_controller.go +++ b/internal/controller/postgresdatabase_controller.go @@ -7,12 +7,16 @@ import ( "time" "github.com/go-logr/logr" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" managedpostgresoperatorhoppscalecomv1alpha1 "github.com/hoppscale/managed-postgres-operator/api/v1alpha1" "github.com/hoppscale/managed-postgres-operator/internal/postgresql" @@ -27,6 +31,8 @@ type PostgresDatabaseReconciler struct { Scheme *runtime.Scheme logging logr.Logger + RequeueInterval time.Duration + PGPools *postgresql.PGPools OperatorInstanceName string } @@ -37,23 +43,20 @@ type PostgresDatabaseReconciler struct { func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.logging = log.FromContext(ctx) - ctrlSuccessResult := ctrl.Result{RequeueAfter: time.Minute} - ctrlFailResult := ctrl.Result{} - resource := &managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase{} if err := r.Client.Get(ctx, req.NamespacedName, resource); err != nil { - return ctrlFailResult, client.IgnoreNotFound(err) + return r.Result(client.IgnoreNotFound(err)) } // Skip reconcile if the resource is not managed by this operator if !utils.IsManagedByOperatorInstance(resource.ObjectMeta.Annotations, r.OperatorInstanceName) { - return ctrlSuccessResult, nil + return r.Result(nil) } existingDatabase, err := postgresql.GetDatabase(r.PGPools.Default, resource.Spec.Name) if err != nil { - return ctrlFailResult, fmt.Errorf("failed to retrieve database: %s", err) + return r.Result(fmt.Errorf("failed to retrieve database: %s", err)) } desiredDatabase := postgresql.Database{ @@ -62,36 +65,37 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req Extensions: resource.Spec.Extensions, } - // - // Deletion logic - // - if resource.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(resource, PostgresDatabaseFinalizer) { controllerutil.AddFinalizer(resource, PostgresDatabaseFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } } } else { + + // + // Deletion logic + // + // If there is no finalizer, delete the resource immediately if !controllerutil.ContainsFinalizer(resource, PostgresDatabaseFinalizer) { - return ctrlSuccessResult, nil + return r.Result(nil) } err = r.reconcileOnDeletion(resource, existingDatabase) if err != nil { - return ctrlFailResult, err + return r.Result(err) } // Remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(resource, PostgresDatabaseFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } // Stop reconciliation as the item is being deleted - return ctrlSuccessResult, nil + return r.Result(nil) } // @@ -100,12 +104,12 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req err = r.reconcileOnCreation(existingDatabase, &desiredDatabase) if err != nil { - return ctrlFailResult, err + return r.Result(err) } err = r.reconcileExtensions(&desiredDatabase) if err != nil { - return ctrlFailResult, err + return r.Result(err) } for roleName, rolePrivileges := range resource.Spec.PrivilegesByRole { @@ -115,18 +119,18 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req r.convertPrivilegesSpecToList(rolePrivileges), ) if err != nil { - return ctrlFailResult, err + return r.Result(err) } } if !resource.Status.Succeeded { resource.Status.Succeeded = true if err = r.Client.Status().Update(context.Background(), resource); err != nil { - return ctrlFailResult, fmt.Errorf("failed to update object: %s", err) + return r.Result(fmt.Errorf("failed to update object: %s", err)) } } - return ctrlSuccessResult, nil + return r.Result(nil) } // SetupWithManager sets up the controller with the Manager. @@ -134,9 +138,23 @@ func (r *PostgresDatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase{}). Named("postgresdatabase"). + WithOptions(controller.Options{ + RateLimiter: workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, r.RequeueInterval), + &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + }). Complete(r) } +// Result builds reconciler result depending on error +func (r *PostgresDatabaseReconciler) Result(err error) (ctrl.Result, error) { + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: r.RequeueInterval}, nil +} + // reconcileOnDeletion performs all actions related to deleting the resource func (r *PostgresDatabaseReconciler) reconcileOnDeletion(resource *managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase, existingDatabase *postgresql.Database) (err error) { if existingDatabase == nil { diff --git a/internal/controller/postgresrole_controller.go b/internal/controller/postgresrole_controller.go index 93bbe5d..5e051d3 100644 --- a/internal/controller/postgresrole_controller.go +++ b/internal/controller/postgresrole_controller.go @@ -24,15 +24,19 @@ import ( "text/template" "time" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" managedpostgresoperatorhoppscalecomv1alpha1 "github.com/hoppscale/managed-postgres-operator/api/v1alpha1" @@ -49,6 +53,8 @@ type PostgresRoleReconciler struct { Scheme *runtime.Scheme logging logr.Logger + RequeueInterval time.Duration + PGPools *postgresql.PGPools OperatorInstanceName string @@ -61,18 +67,15 @@ type PostgresRoleReconciler struct { func (r *PostgresRoleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.logging = log.FromContext(ctx) - ctrlSuccessResult := ctrl.Result{RequeueAfter: time.Minute} - ctrlFailResult := ctrl.Result{RequeueAfter: time.Second} - resource := &managedpostgresoperatorhoppscalecomv1alpha1.PostgresRole{} if err := r.Client.Get(ctx, req.NamespacedName, resource); err != nil { - return ctrlFailResult, client.IgnoreNotFound(err) + return r.Result(client.IgnoreNotFound(err)) } // Skip reconcile if the resource is not managed by this operator if !utils.IsManagedByOperatorInstance(resource.ObjectMeta.Annotations, r.OperatorInstanceName) { - return ctrlSuccessResult, nil + return r.Result(nil) } rolePassword := "" @@ -88,7 +91,7 @@ func (r *PostgresRoleReconciler) Reconcile(ctx context.Context, req ctrl.Request err := r.Client.Get(ctx, secretNamespacedName, resourceSecret) if err != nil { - return ctrlFailResult, fmt.Errorf("failed to retrieve password from secret: %s", err) + return r.Result(fmt.Errorf("failed to retrieve password from secret: %s", err)) } rolePassword = string(resourceSecret.Data[resource.Spec.PasswordFromSecret.Key]) @@ -115,44 +118,45 @@ func (r *PostgresRoleReconciler) Reconcile(ctx context.Context, req ctrl.Request existingRole, err := postgresql.GetRole(r.PGPools.Default, resource.Spec.Name) if err != nil { - return ctrlFailResult, fmt.Errorf("failed to get role: %s", err) + return r.Result(fmt.Errorf("failed to get role: %s", err)) } operatorRole, err := postgresql.GetRole(r.PGPools.Default, r.PGPools.Default.Config().ConnConfig.User) if err != nil { - return ctrlFailResult, fmt.Errorf("failed to get operator's role: %s", err) + return r.Result(fmt.Errorf("failed to get operator's role: %s", err)) } - // - // Deletion logic - // - if resource.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(resource, PostgresRoleFinalizer) { controllerutil.AddFinalizer(resource, PostgresRoleFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } } } else { + + // + // Deletion logic + // + // If there is no finalizer, delete the resource immediately if !controllerutil.ContainsFinalizer(resource, PostgresRoleFinalizer) { - return ctrlSuccessResult, nil + return r.Result(nil) } err = r.reconcileOnDeletion(existingRole, resource.Spec.KeepOnDelete) if err != nil { - return ctrlFailResult, err + return r.Result(err) } // Remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(resource, PostgresRoleFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } // Stop reconciliation as the item is being deleted - return ctrlSuccessResult, nil + return r.Result(nil) } // @@ -161,12 +165,12 @@ func (r *PostgresRoleReconciler) Reconcile(ctx context.Context, req ctrl.Request err = r.reconcileOnCreation(operatorRole, existingRole, &desiredRole) if err != nil { - return ctrlFailResult, err + return r.Result(err) } err = r.reconcileRoleMembership(desiredRole.Name, resource.Spec.MemberOfRoles) if err != nil { - return ctrlFailResult, err + return r.Result(err) } err = r.reconcileRoleSecret( @@ -177,17 +181,17 @@ func (r *PostgresRoleReconciler) Reconcile(ctx context.Context, req ctrl.Request r.PGPools.Default.Config().ConnConfig, ) if err != nil { - return ctrlFailResult, err + return r.Result(err) } if !resource.Status.Succeeded { resource.Status.Succeeded = true if err = r.Client.Status().Update(context.Background(), resource); err != nil { - return ctrlFailResult, fmt.Errorf("failed to update object: %s", err) + return r.Result(fmt.Errorf("failed to update object: %s", err)) } } - return ctrlSuccessResult, nil + return r.Result(nil) } // SetupWithManager sets up the controller with the Manager. @@ -195,9 +199,23 @@ func (r *PostgresRoleReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&managedpostgresoperatorhoppscalecomv1alpha1.PostgresRole{}). Named("postgresrole"). + WithOptions(controller.Options{ + RateLimiter: workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, r.RequeueInterval), + &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + }). Complete(r) } +// Result builds reconciler result depending on error +func (r *PostgresRoleReconciler) Result(err error) (ctrl.Result, error) { + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: r.RequeueInterval}, nil +} + // reconcileOnDeletion performs all actions related to deleting the resource func (r *PostgresRoleReconciler) reconcileOnDeletion(existingRole *postgresql.Role, keepOnDelete bool) (err error) { if existingRole == nil { @@ -213,8 +231,7 @@ func (r *PostgresRoleReconciler) reconcileOnDeletion(existingRole *postgresql.Ro err = postgresql.DropRole(r.PGPools.Default, existingRole.Name) if err != nil { - r.logging.Error(err, "failed to delete role") - return + return fmt.Errorf("failed to delete role: %s", err) } r.logging.Info("Role has been deleted") diff --git a/internal/controller/postgresschema_controller.go b/internal/controller/postgresschema_controller.go index 9839fd7..3e9335c 100644 --- a/internal/controller/postgresschema_controller.go +++ b/internal/controller/postgresschema_controller.go @@ -22,11 +22,15 @@ import ( "slices" "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" managedpostgresoperatorhoppscalecomv1alpha1 "github.com/hoppscale/managed-postgres-operator/api/v1alpha1" @@ -42,6 +46,8 @@ type PostgresSchemaReconciler struct { Scheme *runtime.Scheme logging logr.Logger + RequeueInterval time.Duration + PGPools *postgresql.PGPools OperatorInstanceName string } @@ -52,29 +58,26 @@ type PostgresSchemaReconciler struct { func (r *PostgresSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.logging = log.FromContext(ctx) - ctrlSuccessResult := ctrl.Result{RequeueAfter: time.Minute} - ctrlFailResult := ctrl.Result{} - resource := &managedpostgresoperatorhoppscalecomv1alpha1.PostgresSchema{} if err := r.Client.Get(ctx, req.NamespacedName, resource); err != nil { - return ctrlFailResult, client.IgnoreNotFound(err) + return r.Result(client.IgnoreNotFound(err)) } // Skip reconcile if the resource is not managed by this operator if !utils.IsManagedByOperatorInstance(resource.ObjectMeta.Annotations, r.OperatorInstanceName) { - return ctrlSuccessResult, nil + return r.Result(nil) } err := postgresql.EnsurePGPoolExists(r.PGPools, resource.Spec.Database) if err != nil { r.logging.Error(err, "failed to open pg pool") - return ctrlFailResult, err + return r.Result(err) } existingSchema, err := postgresql.GetSchema(r.PGPools.Databases[resource.Spec.Database], resource.Spec.Name) if err != nil { - return ctrlFailResult, fmt.Errorf("failed to retrieve schema: %s", err) + return r.Result(fmt.Errorf("failed to retrieve schema: %s", err)) } if existingSchema != nil { @@ -87,36 +90,37 @@ func (r *PostgresSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Reque Owner: resource.Spec.Owner, } - // - // Deletion logic - // - if resource.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(resource, PostgresSchemaFinalizer) { controllerutil.AddFinalizer(resource, PostgresSchemaFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } } } else { + + // + // Deletion logic + // + // If there is no finalizer, delete the resource immediately if !controllerutil.ContainsFinalizer(resource, PostgresSchemaFinalizer) { - return ctrlSuccessResult, nil + return r.Result(nil) } err = r.reconcileOnDeletion(existingSchema, resource.Spec.KeepOnDelete) if err != nil { - return ctrlFailResult, err + return r.Result(err) } // Remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(resource, PostgresSchemaFinalizer) if err := r.Update(ctx, resource); err != nil { - return ctrlFailResult, err + return r.Result(err) } // Stop reconciliation as the item is being deleted - return ctrlSuccessResult, nil + return r.Result(nil) } // @@ -125,13 +129,13 @@ func (r *PostgresSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Reque err = r.reconcileOnCreation(existingSchema, &desiredSchema) if err != nil { - return ctrlFailResult, err + return r.Result(err) } if !resource.Status.Succeeded { resource.Status.Succeeded = true if err = r.Client.Status().Update(context.Background(), resource); err != nil { - return ctrlFailResult, fmt.Errorf("failed to update object: %s", err) + return r.Result(fmt.Errorf("failed to update object: %s", err)) } } @@ -143,11 +147,11 @@ func (r *PostgresSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.convertPrivilegesSpecToList(rolePrivileges), ) if err != nil { - return ctrlFailResult, err + return r.Result(err) } } - return ctrlSuccessResult, nil + return r.Result(nil) } @@ -156,9 +160,23 @@ func (r *PostgresSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&managedpostgresoperatorhoppscalecomv1alpha1.PostgresSchema{}). Named("postgresschema"). + WithOptions(controller.Options{ + RateLimiter: workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, r.RequeueInterval), + &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + }). Complete(r) } +// Result builds reconciler result depending on error +func (r *PostgresSchemaReconciler) Result(err error) (ctrl.Result, error) { + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: r.RequeueInterval}, nil +} + // reconcileOnDeletion performs all actions related to deleting the resource func (r *PostgresSchemaReconciler) reconcileOnDeletion(schema *postgresql.Schema, keepOnDelete bool) (err error) { if schema == nil { From 00bf373d7da5e170ea141e1191f73c904049bfa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Depriester?= Date: Sun, 30 Nov 2025 16:17:34 +0100 Subject: [PATCH 2/2] feat(helm): allow setting custom requeue interval for reconciliation --- .../charts/managed-postgres-operator/templates/deployment.yaml | 3 +++ deploy/charts/managed-postgres-operator/values.yaml | 1 + 2 files changed, 4 insertions(+) diff --git a/deploy/charts/managed-postgres-operator/templates/deployment.yaml b/deploy/charts/managed-postgres-operator/templates/deployment.yaml index c4e7632..94c9170 100644 --- a/deploy/charts/managed-postgres-operator/templates/deployment.yaml +++ b/deploy/charts/managed-postgres-operator/templates/deployment.yaml @@ -49,6 +49,9 @@ spec: {{- if .Values.operatorInstanceName }} - --operator-instance-name={{ .Values.operatorInstanceName }} {{- end }} + {{- if .Values.reconciliationRequeueInterval }} + - --reconciliation-requeue-interval={{ .Values.reconciliationRequeueInterval }} + {{- end }} {{- with .Values.extraEnv }} env: {{- toYaml . | nindent 12 }} diff --git a/deploy/charts/managed-postgres-operator/values.yaml b/deploy/charts/managed-postgres-operator/values.yaml index 238da16..1be00a1 100644 --- a/deploy/charts/managed-postgres-operator/values.yaml +++ b/deploy/charts/managed-postgres-operator/values.yaml @@ -20,6 +20,7 @@ serviceAccount: name: "" operatorInstanceName: "" +reconciliationRequeueInterval: "" extraEnv: [] envFrom: []