diff --git a/cmd/ateapi/internal/controlapi/syncer.go b/cmd/ateapi/internal/controlapi/syncer.go index d557808..33b016d 100644 --- a/cmd/ateapi/internal/controlapi/syncer.go +++ b/cmd/ateapi/internal/controlapi/syncer.go @@ -68,6 +68,9 @@ func (s *WorkerPoolSyncer) Start(ctx context.Context) { return } slog.InfoContext(ctx, "Syncer: removing worker from store", slog.String("worker", pod.Namespace+"/"+pod.Name)) + if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil { + slog.ErrorContext(ctx, "Failed to release actor bound to deleted worker", slog.Any("err", err)) + } err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name) if err != nil { slog.ErrorContext(ctx, "Failed to delete worker from store during delete event", slog.Any("err", err)) @@ -97,6 +100,9 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po if pod.DeletionTimestamp != nil { slog.InfoContext(ctx, "Syncer: removing worker from store (pod deleting)", slog.String("worker", pod.Namespace+"/"+pod.Name)) + if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil { + slog.ErrorContext(ctx, "Failed to release actor bound to soft-deleting worker", slog.Any("err", err)) + } err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name) if err != nil { slog.ErrorContext(ctx, "Failed to delete worker from store during update event (deleting)", slog.Any("err", err)) @@ -136,3 +142,41 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po func isWorkerEligible(pod *corev1.Pod) bool { return pod.Status.PodIP != "" } + +// releaseActorOnDeadWorker resets the actor bound to a vanishing worker pod +// back to STATUS_SUSPENDED so the next request reassigns it. Optimistic +// version on UpdateActor protects against a concurrent SuspendActor / +// ResumeActor; on contention we drop this attempt and rely on the next +// informer event (resync, late delete) to retry. +func (s *WorkerPoolSyncer) releaseActorOnDeadWorker(ctx context.Context, namespace, pool, podName string) error { + worker, err := s.persistence.GetWorker(ctx, namespace, pool, podName) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil + } + return err + } + if worker.GetActorId() == "" { + return nil + } + actor, err := s.persistence.GetActor(ctx, worker.GetActorId()) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil + } + return err + } + // Skip if a concurrent SuspendActor already cleared the pointer. + if actor.GetAteomPodNamespace() != namespace || actor.GetAteomPodName() != podName { + return nil + } + actor.Status = ateapipb.Actor_STATUS_SUSPENDED + actor.AteomPodNamespace = "" + actor.AteomPodName = "" + actor.AteomPodIp = "" + actor.InProgressSnapshot = "" + if err := s.persistence.UpdateActor(ctx, actor, actor.GetVersion()); err != nil && !errors.Is(err, store.ErrPersistenceRetry) { + return err + } + return nil +} diff --git a/cmd/ateapi/internal/controlapi/syncer_test.go b/cmd/ateapi/internal/controlapi/syncer_test.go index d514779..6276c53 100644 --- a/cmd/ateapi/internal/controlapi/syncer_test.go +++ b/cmd/ateapi/internal/controlapi/syncer_test.go @@ -23,6 +23,7 @@ import ( "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" "github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -154,3 +155,62 @@ func TestSyncer_Lifecycle(t *testing.T) { t.Fatalf("Worker still found in Redis after deletion: %v", err) } } + +func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + persistence, fakeK8s, cleanup := setupSyncerTest(t, ctx) + defer cleanup() + + ns, pool, pod, ip := "ns-orphan", "pool1", "worker-orphan", "10.0.0.1" + if _, err := fakeK8s.CoreV1().Pods(ns).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: pod, Namespace: ns, + Labels: map[string]string{workerPodLabel: pool}}, + Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: ip, + PodIPs: []corev1.PodIP{{IP: ip}}}, + }, metav1.CreateOptions{}); err != nil { + t.Fatalf("create pod: %v", err) + } + if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) { + _, gerr := persistence.GetWorker(c, ns, pool, pod) + return gerr == nil, nil + }); err != nil { + t.Fatalf("worker row not materialised: %v", err) + } + actorID := "actor-orphan" + if err := persistence.CreateActor(ctx, &ateapipb.Actor{ + ActorId: actorID, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl", + Status: ateapipb.Actor_STATUS_RUNNING, + AteomPodNamespace: ns, AteomPodName: pod, AteomPodIp: ip, + LastSnapshot: "gs://snapshots/last", InProgressSnapshot: "gs://snapshots/partial", + }); err != nil { + t.Fatalf("create actor: %v", err) + } + w, _ := persistence.GetWorker(ctx, ns, pool, pod) + w.ActorId, w.ActorNamespace, w.ActorTemplate = actorID, ns, "tmpl" + if err := persistence.UpdateWorker(ctx, w, w.Version); err != nil { + t.Fatalf("update worker: %v", err) + } + + if err := fakeK8s.CoreV1().Pods(ns).Delete(ctx, pod, metav1.DeleteOptions{}); err != nil { + t.Fatalf("delete pod: %v", err) + } + var got *ateapipb.Actor + if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) { + a, gerr := persistence.GetActor(c, actorID) + if gerr != nil { + return false, gerr + } + got = a + return a.GetStatus() == ateapipb.Actor_STATUS_SUSPENDED, nil + }); err != nil { + t.Fatalf("actor not reset to SUSPENDED: %v", err) + } + if got.AteomPodName != "" || got.AteomPodNamespace != "" || got.AteomPodIp != "" || got.InProgressSnapshot != "" { + t.Errorf("bind fields not cleared: %+v", got) + } + if got.LastSnapshot == "" { + t.Errorf("LastSnapshot must be preserved") + } +}