Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions controllers/etcdmember_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,9 +898,66 @@ func restoreInitContainer(member *lll.EtcdMember, peerAddr, operatorImage string
}, vols
}

// dataLossRestartThreshold is how many times the etcd container must have
// restarted before we treat a non-bootstrap PVC member as unrecoverable and
// replace it. High enough to ride out transient join churn and slow restores;
// CrashLoopBackOff caps its backoff at 5m, so this many restarts means a
// member that has been unable to start for several minutes.
const dataLossRestartThreshold = 5

// etcdContainerStuck reports whether the pod's etcd container is persistently
// failing to start: not ready and restarted at least dataLossRestartThreshold
// times, excluding OOMKilled (a resource problem that re-creating the member
// would not fix). This is the signature of an unrecoverable member — most
// commonly a data dir lost while the cluster membership moved on, so the
// member's frozen --initial-cluster no longer matches the live cluster and
// etcd refuses to boot ("error validating peerURLs ... member count is
// unequal"). etcd ignores --initial-cluster for an initialised data dir, so a
// healthy member that merely restarts is unaffected.
func etcdContainerStuck(pod *corev1.Pod) bool {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != "etcd" {
continue
}
if cs.Ready || cs.RestartCount < dataLossRestartThreshold {
return false
}
if t := cs.LastTerminationState.Terminated; t != nil && t.Reason == "OOMKilled" {
return false
}
return true
}
return false
}
Comment on lines +917 to +931

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue 1: Missing check for Pod deletion state

If a Pod is currently being deleted (e.g., due to a manual restart, node drain, or eviction), its DeletionTimestamp is set, and its containers will eventually terminate. If we evaluate it as "stuck" during this graceful termination window, we might prematurely delete the EtcdMember CR and trigger an expensive member replacement instead of letting the Pod naturally restart or reschedule.

Issue 2: Missing check for current OOMKilled state

The current implementation only checks cs.LastTerminationState.Terminated for the "OOMKilled" reason. However, if the container has just been OOMKilled and is currently in the Terminated state (but has not yet transitioned to Waiting / CrashLoopBackOff), the "OOMKilled" reason will be in cs.State.Terminated instead. Checking both states ensures we don't falsely trigger self-healing for resource-limit issues.

func etcdContainerStuck(pod *corev1.Pod) bool {
	if pod.DeletionTimestamp != nil {
		return false
	}
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name != "etcd" {
			continue
		}
		if cs.Ready || cs.RestartCount < dataLossRestartThreshold {
			return false
		}
		if t := cs.State.Terminated; t != nil && t.Reason == "OOMKilled" {
			return false
		}
		if t := cs.LastTerminationState.Terminated; t != nil && t.Reason == "OOMKilled" {
			return false
		}
		return true
	}
	return false
}


// clusterHasQuorumWithout reports whether the member's cluster still has quorum
// among its OTHER members — i.e. losing this one is a minority failure, so
// replacing it cannot break the cluster. ReadyMembers already excludes this
// (not-ready) member, so it is the live healthy count. Used to gate self-heal:
// we never delete a member during a cluster-wide outage (where many members
// crash at once), only an isolated stuck member backed by a healthy majority.
func (r *EtcdMemberReconciler) clusterHasQuorumWithout(ctx context.Context, member *lll.EtcdMember) bool {
cluster, err := r.clusterFor(ctx, member)
if err != nil {
return false
}
desired := 0
if cluster.Status.Observed != nil {
desired = int(cluster.Status.Observed.Replicas)
}
if desired == 0 && cluster.Spec.Replicas != nil {
desired = int(*cluster.Spec.Replicas)
}
if desired == 0 {
return false
}
return int(cluster.Status.ReadyMembers) >= desired/2+1
}
Comment on lines +939 to +955

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Potential race condition with stale ReadyMembers count

cluster.Status.ReadyMembers is updated asynchronously by the cluster controller. If this member was previously healthy (and thus counted as ready in ReadyMembers), but has recently started crash-looping, cluster.Status.ReadyMembers might still include it if the cluster controller hasn't reconciled yet.

If another member is also down, ReadyMembers might incorrectly indicate that we still have quorum to perform a deletion, when in reality we do not. To prevent cascading outages during multi-member failures, we should decrement the ready count if this member's CR status still lists it as Ready.

func (r *EtcdMemberReconciler) clusterHasQuorumWithout(ctx context.Context, member *lll.EtcdMember) bool {
	cluster, err := r.clusterFor(ctx, member)
	if err != nil {
		return false
	}
	desired := 0
	if cluster.Status.Observed != nil {
		desired = int(cluster.Status.Observed.Replicas)
	}
	if desired == 0 && cluster.Spec.Replicas != nil {
		desired = int(*cluster.Spec.Replicas)
	}
	if desired == 0 {
		return false
	}
	readyMembers := int(cluster.Status.ReadyMembers)
	for _, cond := range member.Status.Conditions {
		if cond.Type == lll.MemberReady && cond.Status == metav1.ConditionTrue {
			readyMembers--
			break
		}
	}
	return readyMembers >= desired/2+1
}


// ── Status ───────────────────────────────────────────────────────────────

func (r *EtcdMemberReconciler) updateStatus(ctx context.Context, member *lll.EtcdMember) (ctrl.Result, error) {
log := log.FromContext(ctx)
pod := &corev1.Pod{}
if err := r.Get(ctx, types.NamespacedName{Namespace: member.Namespace, Name: member.Name}, pod); err != nil {
if errors.IsNotFound(err) {
Expand Down Expand Up @@ -950,6 +1007,26 @@ func (r *EtcdMemberReconciler) updateStatus(ctx context.Context, member *lll.Etc

switch {
case !podReady:
// Self-heal an unrecoverable member. A non-bootstrap PVC member whose
// etcd cannot start — classically because its data dir was lost while
// the cluster membership moved on, leaving its frozen --initial-cluster
// stale (etcd: "member count is unequal") — crash-loops forever on its
// own. Replace it: delete the CR so the finalizer does a clean
// MemberRemove and the cluster controller gap-fills a fresh member with
// a current --initial-cluster. Gate on the rest of the cluster having
// quorum so a cluster-wide outage never cascades into mass deletion
// (the finalizer's MemberRemove is quorum-gated too — belt and braces).
if !member.Spec.Bootstrap &&
member.Spec.Storage.Medium != lll.StorageMediumMemory &&
etcdContainerStuck(pod) &&
r.clusterHasQuorumWithout(ctx, member) {
log.Info("etcd member is persistently crash-looping while the rest of the cluster is healthy; deleting it for replacement",
"restartThreshold", dataLossRestartThreshold)
if err := r.Delete(ctx, member); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if setMemberCondition(member, lll.MemberReady, metav1.ConditionFalse, "PodNotReady",
fmt.Sprintf("pod phase: %s", pod.Status.Phase)) {
changed = true
Expand Down
143 changes: 143 additions & 0 deletions controllers/etcdmember_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

lll "github.com/cozystack/etcd-operator/api/v1alpha2"
)
Expand Down Expand Up @@ -691,6 +692,148 @@ func TestUpdateStatus_PopulatesMemberIDAndFlipsReady(t *testing.T) {
}
}

// TestEtcdContainerStuck pins the self-heal detection: an etcd container is
// "stuck" only when it is not ready, has restarted at least the threshold, and
// was not OOMKilled.
func TestEtcdContainerStuck(t *testing.T) {
mk := func(name string, ready bool, restarts int32, lastReason string) *corev1.Pod {
cs := corev1.ContainerStatus{Name: name, Ready: ready, RestartCount: restarts}
if lastReason != "" {
cs.LastTerminationState.Terminated = &corev1.ContainerStateTerminated{Reason: lastReason, ExitCode: 1}
}
return &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{cs}}}
}
cases := []struct {
name string
pod *corev1.Pod
want bool
}{
{"stuck: not ready, at threshold, Error exit", mk("etcd", false, dataLossRestartThreshold, "Error"), true},
{"stuck: no last-termination recorded yet", mk("etcd", false, dataLossRestartThreshold+1, ""), true},
{"ready", mk("etcd", true, dataLossRestartThreshold+4, "Error"), false},
{"below restart threshold", mk("etcd", false, dataLossRestartThreshold-1, "Error"), false},
{"OOMKilled is excluded", mk("etcd", false, dataLossRestartThreshold+4, "OOMKilled"), false},
{"no etcd container", mk("other", false, dataLossRestartThreshold+4, "Error"), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := etcdContainerStuck(tc.pod); got != tc.want {
t.Fatalf("etcdContainerStuck = %v, want %v", got, tc.want)
}
})
}
}

// crashLoopPod builds a Pod whose etcd container is persistently crash-looping
// (not ready, restarted past the threshold with an Error exit) — the data-loss
// signature.
func crashLoopPod(name, ns string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionFalse}},
ContainerStatuses: []corev1.ContainerStatus{{
Name: "etcd",
Ready: false,
RestartCount: dataLossRestartThreshold + 2,
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{Reason: "Error", ExitCode: 1},
},
}},
},
}
}

// clusterWithReady builds a 3-replica EtcdCluster and persists ready as its
// status.readyMembers (status is a subresource on the fake client).
func clusterWithReady(t *testing.T, c client.Client, name, ns string, ready int32) {
t.Helper()
got := mustGet(t, c, name, ns, &lll.EtcdCluster{})
got.Status.ReadyMembers = ready
if err := c.Status().Update(context.Background(), got); err != nil {
t.Fatalf("seed cluster status: %v", err)
}
}

// TestUpdateStatus_ReplacesStuckMember: a persistently crash-looping
// non-bootstrap PVC member is deleted for replacement when the rest of the
// cluster still has quorum.
func TestUpdateStatus_ReplacesStuckMember(t *testing.T) {
ctx := context.Background()
cluster := &lll.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "ns"},
Spec: lll.EtcdClusterSpec{Replicas: ptrInt32(3)},
}
member := &lll.EtcdMember{
ObjectMeta: metav1.ObjectMeta{Name: "test-1", Namespace: "ns", Labels: memberLabels("test", "test-1")},
Spec: lll.EtcdMemberSpec{ClusterName: "test", Version: "3.5.17", Storage: lll.StorageSpec{Size: quickQty(t, "1Gi")}, InitialCluster: "x", ClusterToken: "test"},
}
c, _ := newTestClient(t, cluster, member, crashLoopPod("test-1", "ns"))
clusterWithReady(t, c, "test", "ns", 2) // 2/3 ready → quorum without test-1

r := &EtcdMemberReconciler{Client: c, Scheme: testScheme(t)}
if _, err := r.updateStatus(ctx, member); err != nil {
t.Fatalf("updateStatus: %v", err)
}

err := c.Get(ctx, types.NamespacedName{Name: "test-1", Namespace: "ns"}, &lll.EtcdMember{})
if !apierrors.IsNotFound(err) {
t.Fatalf("expected member deleted for replacement; Get err = %v", err)
}
}

// TestUpdateStatus_KeepsStuckMemberWithoutQuorum: the same crash-looping member
// is NOT deleted when the rest of the cluster lacks quorum — self-heal must
// never cascade a cluster-wide outage into mass deletion.
func TestUpdateStatus_KeepsStuckMemberWithoutQuorum(t *testing.T) {
ctx := context.Background()
cluster := &lll.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "ns"},
Spec: lll.EtcdClusterSpec{Replicas: ptrInt32(3)},
}
member := &lll.EtcdMember{
ObjectMeta: metav1.ObjectMeta{Name: "test-1", Namespace: "ns", Labels: memberLabels("test", "test-1")},
Spec: lll.EtcdMemberSpec{ClusterName: "test", Version: "3.5.17", Storage: lll.StorageSpec{Size: quickQty(t, "1Gi")}, InitialCluster: "x", ClusterToken: "test"},
}
c, _ := newTestClient(t, cluster, member, crashLoopPod("test-1", "ns"))
clusterWithReady(t, c, "test", "ns", 1) // only 1/3 ready → no quorum

r := &EtcdMemberReconciler{Client: c, Scheme: testScheme(t)}
if _, err := r.updateStatus(ctx, member); err != nil {
t.Fatalf("updateStatus: %v", err)
}

if err := c.Get(ctx, types.NamespacedName{Name: "test-1", Namespace: "ns"}, &lll.EtcdMember{}); err != nil {
t.Fatalf("member must NOT be deleted without quorum; Get err = %v", err)
}
}

// TestUpdateStatus_KeepsStuckBootstrapMember: the bootstrap seed is never
// self-healed by deletion — there is nothing to replace it from yet.
func TestUpdateStatus_KeepsStuckBootstrapMember(t *testing.T) {
ctx := context.Background()
cluster := &lll.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "ns"},
Spec: lll.EtcdClusterSpec{Replicas: ptrInt32(3)},
}
member := &lll.EtcdMember{
ObjectMeta: metav1.ObjectMeta{Name: "test-0", Namespace: "ns", Labels: memberLabels("test", "test-0")},
Spec: lll.EtcdMemberSpec{ClusterName: "test", Bootstrap: true, Version: "3.5.17", Storage: lll.StorageSpec{Size: quickQty(t, "1Gi")}, InitialCluster: "x", ClusterToken: "test"},
}
c, _ := newTestClient(t, cluster, member, crashLoopPod("test-0", "ns"))
clusterWithReady(t, c, "test", "ns", 2)

r := &EtcdMemberReconciler{Client: c, Scheme: testScheme(t)}
if _, err := r.updateStatus(ctx, member); err != nil {
t.Fatalf("updateStatus: %v", err)
}

if err := c.Get(ctx, types.NamespacedName{Name: "test-0", Namespace: "ns"}, &lll.EtcdMember{}); err != nil {
t.Fatalf("bootstrap member must NOT be self-deleted; Get err = %v", err)
}
}

// TestRemoveMemberFromEtcd_LastMemberIsNoOp: if no other members exist (the
// cluster is being torn down or this is genuinely the last member), the
// finalizer can't reach a peer to call MemberRemove. Don't block — return
Expand Down
106 changes: 106 additions & 0 deletions test/e2e/kamaji_datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -134,6 +135,94 @@ func TestKamajiDataStore(t *testing.T) {
}
t.Logf("found %q among etcd keys", proofName)

// ── Member churn: replace EVERY original member one at a time and prove
// Kamaji keeps working through the new, GenerateName-hashed members.
// Native members are named via GenerateName ("<cluster>-<hash>"), so the
// DataStore can only address the cluster through the stable
// <cluster>-client Service (its sole endpoint) and the operator's server
// cert SAN is a wildcard (*.<cluster>.<ns>.svc). Member names therefore
// never reach Kamaji — this churns the entire member set out from under a
// live tenant control plane and guards that contract end to end.
//
// Gate on a fully-formed cluster first: Available latches on quorum, not on
// the full replica count, so without this wait we might delete a member
// while the third is still a freshly-promoted/learner member — collapsing
// the cluster into the fragile 2-node window mid-bootstrap.
waitFor(ctx, t, 5*time.Minute, "all 3 members Ready before churn", func(ctx context.Context) error {
ec := &etcdv1alpha2.EtcdCluster{}
if err := kube.Get(ctx, client.ObjectKey{Namespace: e2eNamespace, Name: clusterName}, ec); err != nil {
return err
}
if ec.Status.ReadyMembers != 3 {
return fmt.Errorf("readyMembers=%d, want 3", ec.Status.ReadyMembers)
}
return nil
})
original := memberNames(ctx, t)
if len(original) != 3 {
t.Fatalf("expected 3 members before churn, got %d: %v", len(original), original)
}
t.Logf("original members: %v", original)
for _, victim := range original {
t.Logf("deleting EtcdMember %q (operator does MemberRemove + a GenerateName replacement)", victim)
m := &etcdv1alpha2.EtcdMember{ObjectMeta: metav1.ObjectMeta{Namespace: e2eNamespace, Name: victim}}
if err := kube.Delete(ctx, m); err != nil && !apierrors.IsNotFound(err) {
t.Fatalf("delete member %s: %v", victim, err)
}
waitFor(ctx, t, 5*time.Minute, fmt.Sprintf("%q removed and the cluster back to 3 ready members", victim),
func(ctx context.Context) error {
err := kube.Get(ctx, client.ObjectKey{Namespace: e2eNamespace, Name: victim}, &etcdv1alpha2.EtcdMember{})
if err == nil {
return fmt.Errorf("victim %s still present (MemberRemove in flight)", victim)
}
if !apierrors.IsNotFound(err) {
return err
}
if names := memberNames(ctx, t); len(names) != 3 {
return fmt.Errorf("have %d members, want 3: %v", len(names), names)
}
Comment on lines +181 to +183

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

memberNames calls t.Fatalf inside waitFor callback, bypassing retry logic.

The memberNames helper (line 406) calls t.Fatalf on List errors. When invoked inside the waitFor callback, a transient API error will immediately fail the test instead of allowing waitFor to retry. Consider using a local error-returning variant or inline the list call with error return.

Proposed inline fix
-			if names := memberNames(ctx, t); len(names) != 3 {
-				return fmt.Errorf("have %d members, want 3: %v", len(names), names)
-			}
+			list := &etcdv1alpha2.EtcdMemberList{}
+			if err := kube.List(ctx, list, client.InNamespace(e2eNamespace),
+				client.MatchingLabels{"etcd-operator.cozystack.io/cluster": clusterName}); err != nil {
+				return err
+			}
+			if len(list.Items) != 3 {
+				return fmt.Errorf("have %d members, want 3", len(list.Items))
+			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if names := memberNames(ctx, t); len(names) != 3 {
return fmt.Errorf("have %d members, want 3: %v", len(names), names)
}
list := &etcdv1alpha2.EtcdMemberList{}
if err := kube.List(ctx, list, client.InNamespace(e2eNamespace),
client.MatchingLabels{"etcd-operator.cozystack.io/cluster": clusterName}); err != nil {
return err
}
if len(list.Items) != 3 {
return fmt.Errorf("have %d members, want 3", len(list.Items))
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@test/e2e/kamaji_datastore_test.go` around lines 181 - 183, The memberNames
helper function calls t.Fatalf internally when encountering List errors, which
causes the test to fail immediately when invoked inside the waitFor callback,
bypassing the retry logic. To fix this, create a variant of memberNames that
returns an error value instead of calling t.Fatalf, or inline the list call
directly within the waitFor callback body to handle errors gracefully and allow
waitFor to retry on transient API failures.

ec := &etcdv1alpha2.EtcdCluster{}
if err := kube.Get(ctx, client.ObjectKey{Namespace: e2eNamespace, Name: clusterName}, ec); err != nil {
return err
}
if ec.Status.ReadyMembers != 3 {
return fmt.Errorf("readyMembers=%d, want 3", ec.Status.ReadyMembers)
}
return nil
})
}

// The cluster is now a wholly fresh member set — no original name remains.
final := memberNames(ctx, t)
for _, o := range original {
for _, f := range final {
if o == f {
t.Fatalf("original member %q still present after full churn: %v", o, final)
}
}
}
t.Logf("fully churned member set: %v -> %v", original, final)

// Kamaji must still work through the new members: the original proof key
// survived all three replacements, and a fresh write still lands in etcd —
// all without any change to the DataStore (it still points at the same
// <cluster>-client Service).
if keys := etcdKeys(ctx, t); !strings.Contains(keys, proofName) {
t.Fatalf("proof key %q lost after member churn", proofName)
}
const churnProof = "e2e-proof-postchurn"
cm2 := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: churnProof, Namespace: "default"},
Data: map[string]string{"written-by": "etcd-operator-e2e-postchurn"},
}
if _, err := tenantSet.CoreV1().ConfigMaps("default").Create(ctx, cm2, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
t.Fatalf("create post-churn ConfigMap via tenant API: %v", err)
}
if keys := etcdKeys(ctx, t); !strings.Contains(keys, churnProof) {
t.Fatalf("post-churn write %q did not reach etcd through the fresh members", churnProof)
}
t.Log("Kamaji tenant API still roundtrips through the fully churned (hash-named) member set")

// Teardown — reverse order, waiting where deletion is asynchronous.
deleteAndWait(ctx, t, "kamaji.clastix.io/v1alpha1", "TenantControlPlane", e2eNamespace, tcpName, 5*time.Minute)
deleteAndWait(ctx, t, "kamaji.clastix.io/v1alpha1", "DataStore", "", "kamaji-e2e", 2*time.Minute)
Expand Down Expand Up @@ -307,6 +396,23 @@ func etcdKeys(ctx context.Context, t *testing.T) string {
return stdout
}

// memberNames returns the sorted names of the cluster's EtcdMembers, selected
// by the cluster label (member pods/CRs carry GenerateName-hashed names).
func memberNames(ctx context.Context, t *testing.T) []string {
t.Helper()
list := &etcdv1alpha2.EtcdMemberList{}
if err := kube.List(ctx, list, client.InNamespace(e2eNamespace),
client.MatchingLabels{"etcd-operator.cozystack.io/cluster": clusterName}); err != nil {
t.Fatalf("list etcd members: %v", err)
}
names := make([]string, 0, len(list.Items))
for i := range list.Items {
names = append(names, list.Items[i].Name)
}
sort.Strings(names)
return names
}

func podExec(ctx context.Context, namespace, pod, container string, command []string) (string, string, error) {
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").Namespace(namespace).Name(pod).SubResource("exec").
Expand Down
Loading