Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cmd/ateapi/internal/controlapi/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (s *WorkerPoolSyncer) Start(ctx context.Context) {
return
}
slog.InfoContext(ctx, "Syncer: removing worker from store", slog.String("worker", pod.Namespace+"/"+pod.Name))
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
slog.ErrorContext(ctx, "Failed to release actor bound to deleted worker", slog.Any("err", err))
}
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
if err != nil {
slog.ErrorContext(ctx, "Failed to delete worker from store during delete event", slog.Any("err", err))
Expand Down Expand Up @@ -97,6 +100,9 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po

if pod.DeletionTimestamp != nil {
slog.InfoContext(ctx, "Syncer: removing worker from store (pod deleting)", slog.String("worker", pod.Namespace+"/"+pod.Name))
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
slog.ErrorContext(ctx, "Failed to release actor bound to soft-deleting worker", slog.Any("err", err))
}
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
if err != nil {
slog.ErrorContext(ctx, "Failed to delete worker from store during update event (deleting)", slog.Any("err", err))
Expand Down Expand Up @@ -136,3 +142,41 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po
func isWorkerEligible(pod *corev1.Pod) bool {
	// A worker pod is considered eligible once its status carries a
	// non-empty pod IP; nothing else on the pod is inspected here.
	hasPodIP := len(pod.Status.PodIP) > 0
	return hasPodIP
}

// releaseActorOnDeadWorker resets the actor bound to a vanishing worker pod
// back to STATUS_SUSPENDED and clears its pod binding (namespace, name, IP)
// and in-progress snapshot so the next request reassigns it.
//
// It looks up the worker row for (namespace, pool, podName), follows its
// actor id to the actor record, and only touches the actor when the actor
// still points at this exact pod. A missing worker, a worker with no actor,
// a missing actor, or an actor that no longer references this pod are all
// treated as "nothing to release" and return nil.
//
// The write uses the optimistic version on UpdateActor to protect against a
// concurrent SuspendActor / ResumeActor. On a version conflict
// (store.ErrPersistenceRetry) the actor is re-read and the binding check is
// repeated, up to a small bounded number of attempts. Retrying here matters
// because the syncer's callers remove the worker row immediately after this
// call, so a later informer event would find no worker and could not redo
// the release. If the conflict persists after the last attempt the error is
// returned so the caller can log it instead of silently leaving the actor
// bound to a dead pod.
//
// Any other persistence error is returned unchanged.
func (s *WorkerPoolSyncer) releaseActorOnDeadWorker(ctx context.Context, namespace, pool, podName string) error {
	worker, err := s.persistence.GetWorker(ctx, namespace, pool, podName)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			return nil
		}
		return err
	}
	actorID := worker.GetActorId()
	if actorID == "" {
		return nil
	}

	// Bounded so a hot actor cannot stall the informer event handler.
	const maxAttempts = 3
	for attempt := 1; ; attempt++ {
		actor, err := s.persistence.GetActor(ctx, actorID)
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil
			}
			return err
		}
		// Skip if a concurrent SuspendActor already cleared the pointer, or
		// the actor has since been bound to a different pod.
		if actor.GetAteomPodNamespace() != namespace || actor.GetAteomPodName() != podName {
			return nil
		}
		actor.Status = ateapipb.Actor_STATUS_SUSPENDED
		actor.AteomPodNamespace = ""
		actor.AteomPodName = ""
		actor.AteomPodIp = ""
		actor.InProgressSnapshot = ""

		err = s.persistence.UpdateActor(ctx, actor, actor.GetVersion())
		if err == nil {
			return nil
		}
		// Only a version conflict is worth retrying; everything else, and a
		// conflict on the final attempt, is surfaced to the caller.
		if !errors.Is(err, store.ErrPersistenceRetry) || attempt == maxAttempts {
			return err
		}
	}
}
60 changes: 60 additions & 0 deletions cmd/ateapi/internal/controlapi/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
"github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -154,3 +155,62 @@ func TestSyncer_Lifecycle(t *testing.T) {
t.Fatalf("Worker still found in Redis after deletion: %v", err)
}
}

// TestSyncer_DeleteBoundWorker_ClearsActor checks that deleting a worker pod
// which has an actor bound to it resets that actor: status becomes
// STATUS_SUSPENDED, the pod binding fields and InProgressSnapshot are
// cleared, and LastSnapshot is left intact.
func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	persistence, fakeK8s, cleanup := setupSyncerTest(t, ctx)
	defer cleanup()

	ns, pool, pod, ip := "ns-orphan", "pool1", "worker-orphan", "10.0.0.1"
	lastSnapshot := "gs://snapshots/last"

	// Create a running worker pod and wait for its worker row to appear in
	// the store.
	if _, err := fakeK8s.CoreV1().Pods(ns).Create(ctx, &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: pod, Namespace: ns,
			Labels: map[string]string{workerPodLabel: pool}},
		Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: ip,
			PodIPs: []corev1.PodIP{{IP: ip}}},
	}, metav1.CreateOptions{}); err != nil {
		t.Fatalf("create pod: %v", err)
	}
	if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
		_, gerr := persistence.GetWorker(c, ns, pool, pod)
		return gerr == nil, nil
	}); err != nil {
		t.Fatalf("worker row not materialised: %v", err)
	}

	// Seed an actor that is bound to the pod and has both a completed and an
	// in-progress snapshot recorded.
	actorID := "actor-orphan"
	if err := persistence.CreateActor(ctx, &ateapipb.Actor{
		ActorId: actorID, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl",
		Status:            ateapipb.Actor_STATUS_RUNNING,
		AteomPodNamespace: ns, AteomPodName: pod, AteomPodIp: ip,
		LastSnapshot: lastSnapshot, InProgressSnapshot: "gs://snapshots/partial",
	}); err != nil {
		t.Fatalf("create actor: %v", err)
	}

	// Point the worker row back at the actor. The read error is checked so a
	// failed lookup fails the test cleanly instead of dereferencing nil.
	w, err := persistence.GetWorker(ctx, ns, pool, pod)
	if err != nil {
		t.Fatalf("get worker: %v", err)
	}
	w.ActorId, w.ActorNamespace, w.ActorTemplate = actorID, ns, "tmpl"
	if err := persistence.UpdateWorker(ctx, w, w.Version); err != nil {
		t.Fatalf("update worker: %v", err)
	}

	// Delete the pod and wait for the actor to be reset.
	if err := fakeK8s.CoreV1().Pods(ns).Delete(ctx, pod, metav1.DeleteOptions{}); err != nil {
		t.Fatalf("delete pod: %v", err)
	}
	var got *ateapipb.Actor
	if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
		a, gerr := persistence.GetActor(c, actorID)
		if gerr != nil {
			return false, gerr
		}
		got = a
		return a.GetStatus() == ateapipb.Actor_STATUS_SUSPENDED, nil
	}); err != nil {
		t.Fatalf("actor not reset to SUSPENDED: %v", err)
	}
	if got.AteomPodName != "" || got.AteomPodNamespace != "" || got.AteomPodIp != "" || got.InProgressSnapshot != "" {
		t.Errorf("bind fields not cleared: %+v", got)
	}
	if got.LastSnapshot != lastSnapshot {
		t.Errorf("LastSnapshot must be preserved: got %q, want %q", got.LastSnapshot, lastSnapshot)
	}
}
Loading