Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"os"
"path/filepath"
"time"

"go.uber.org/zap/zapcore"
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -50,6 +51,7 @@ func main() {
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)
var operatorInstanceName string
var reconciliationRequeueInterval time.Duration

flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
Expand All @@ -66,6 +68,8 @@ func main() {
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&operatorInstanceName, "operator-instance-name", "", "The name of this operator instance.")
flag.DurationVar(&reconciliationRequeueInterval, "reconciliation-requeue-interval", 5*time.Minute,
"Default interval between resource reconciliation")

opts := zap.Options{
Development: true,
Expand Down Expand Up @@ -180,6 +184,7 @@ func main() {
if err = (&controller.PostgresDatabaseReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RequeueInterval: reconciliationRequeueInterval,
PGPools: pgpools,
OperatorInstanceName: operatorInstanceName,
}).SetupWithManager(mgr); err != nil {
Expand All @@ -190,6 +195,7 @@ func main() {
if err = (&controller.PostgresRoleReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RequeueInterval: reconciliationRequeueInterval,
PGPools: pgpools,
OperatorInstanceName: operatorInstanceName,
CacheRolePasswords: cacheRolePasswords,
Expand All @@ -200,6 +206,7 @@ func main() {
if err = (&controller.PostgresSchemaReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RequeueInterval: reconciliationRequeueInterval,
PGPools: pgpools,
OperatorInstanceName: operatorInstanceName,
}).SetupWithManager(mgr); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
{{- if .Values.operatorInstanceName }}
- --operator-instance-name={{ .Values.operatorInstanceName }}
{{- end }}
{{- if .Values.reconciliationRequeueInterval }}
- --reconciliation-requeue-interval={{ .Values.reconciliationRequeueInterval }}
{{- end }}
{{- with .Values.extraEnv }}
env:
{{- toYaml . | nindent 12 }}
Expand Down
1 change: 1 addition & 0 deletions deploy/charts/managed-postgres-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serviceAccount:
name: ""

operatorInstanceName: ""
reconciliationRequeueInterval: ""

extraEnv: []
envFrom: []
Expand Down
58 changes: 38 additions & 20 deletions internal/controller/postgresdatabase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

managedpostgresoperatorhoppscalecomv1alpha1 "github.com/hoppscale/managed-postgres-operator/api/v1alpha1"
"github.com/hoppscale/managed-postgres-operator/internal/postgresql"
Expand All @@ -27,6 +31,8 @@ type PostgresDatabaseReconciler struct {
Scheme *runtime.Scheme
logging logr.Logger

RequeueInterval time.Duration

PGPools *postgresql.PGPools
OperatorInstanceName string
}
Expand All @@ -37,23 +43,20 @@ type PostgresDatabaseReconciler struct {
func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.logging = log.FromContext(ctx)

ctrlSuccessResult := ctrl.Result{RequeueAfter: time.Minute}
ctrlFailResult := ctrl.Result{}

resource := &managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase{}

if err := r.Client.Get(ctx, req.NamespacedName, resource); err != nil {
return ctrlFailResult, client.IgnoreNotFound(err)
return r.Result(client.IgnoreNotFound(err))
}

// Skip reconcile if the resource is not managed by this operator
if !utils.IsManagedByOperatorInstance(resource.ObjectMeta.Annotations, r.OperatorInstanceName) {
return ctrlSuccessResult, nil
return r.Result(nil)
}

existingDatabase, err := postgresql.GetDatabase(r.PGPools.Default, resource.Spec.Name)
if err != nil {
return ctrlFailResult, fmt.Errorf("failed to retrieve database: %s", err)
return r.Result(fmt.Errorf("failed to retrieve database: %s", err))
}

desiredDatabase := postgresql.Database{
Expand All @@ -62,36 +65,37 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req
Extensions: resource.Spec.Extensions,
}

//
// Deletion logic
//

if resource.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(resource, PostgresDatabaseFinalizer) {
controllerutil.AddFinalizer(resource, PostgresDatabaseFinalizer)
if err := r.Update(ctx, resource); err != nil {
return ctrlFailResult, err
return r.Result(err)
}
}
} else {

//
// Deletion logic
//

// If there is no finalizer, delete the resource immediately
if !controllerutil.ContainsFinalizer(resource, PostgresDatabaseFinalizer) {
return ctrlSuccessResult, nil
return r.Result(nil)
}

err = r.reconcileOnDeletion(resource, existingDatabase)
if err != nil {
return ctrlFailResult, err
return r.Result(err)
}

// Remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(resource, PostgresDatabaseFinalizer)
if err := r.Update(ctx, resource); err != nil {
return ctrlFailResult, err
return r.Result(err)
}

// Stop reconciliation as the item is being deleted
return ctrlSuccessResult, nil
return r.Result(nil)
}

//
Expand All @@ -100,12 +104,12 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req

err = r.reconcileOnCreation(existingDatabase, &desiredDatabase)
if err != nil {
return ctrlFailResult, err
return r.Result(err)
}

err = r.reconcileExtensions(&desiredDatabase)
if err != nil {
return ctrlFailResult, err
return r.Result(err)
}

for roleName, rolePrivileges := range resource.Spec.PrivilegesByRole {
Expand All @@ -115,28 +119,42 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req
r.convertPrivilegesSpecToList(rolePrivileges),
)
if err != nil {
return ctrlFailResult, err
return r.Result(err)
}
}

if !resource.Status.Succeeded {
resource.Status.Succeeded = true
if err = r.Client.Status().Update(context.Background(), resource); err != nil {
return ctrlFailResult, fmt.Errorf("failed to update object: %s", err)
return r.Result(fmt.Errorf("failed to update object: %s", err))
}
}

return ctrlSuccessResult, nil
return r.Result(nil)
}

// SetupWithManager sets up the controller with the Manager.
func (r *PostgresDatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase{}).
Named("postgresdatabase").
WithOptions(controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, r.RequeueInterval),
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
}).
Complete(r)
}

// Result builds reconciler result depending on error
func (r *PostgresDatabaseReconciler) Result(err error) (ctrl.Result, error) {
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: r.RequeueInterval}, nil
}

// reconcileOnDeletion performs all actions related to deleting the resource
func (r *PostgresDatabaseReconciler) reconcileOnDeletion(resource *managedpostgresoperatorhoppscalecomv1alpha1.PostgresDatabase, existingDatabase *postgresql.Database) (err error) {
if existingDatabase == nil {
Expand Down
Loading
Loading