diff --git a/cmd/servers/ateapi/controlapi/syncer.go b/cmd/servers/ateapi/controlapi/syncer.go index 1635cc0..369e933 100644 --- a/cmd/servers/ateapi/controlapi/syncer.go +++ b/cmd/servers/ateapi/controlapi/syncer.go @@ -18,10 +18,12 @@ import ( "context" "errors" "log/slog" + "time" "github.com/agent-substrate/substrate/cmd/servers/ateapi/store" "github.com/agent-substrate/substrate/proto/ateapipb" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" ) @@ -88,6 +90,22 @@ func (s *WorkerPoolSyncer) Start(ctx context.Context) { s.syncWorkerToStore(ctx, pod) } }() + + // Start background periodic sweep to reconcile/heal the idle workers set. + go wait.UntilWithContext(ctx, func(ctx context.Context) { + workers, err := s.persistence.ListWorkers(ctx) + if err != nil { + slog.ErrorContext(ctx, "Syncer: failed to list workers for idle reconciliation", slog.Any("err", err)) + return + } + for _, w := range workers { + if w.GetActorId() == "" { + if err := s.persistence.EnsureWorkerIdle(ctx, w.GetWorkerNamespace(), w.GetWorkerPool(), w.GetWorkerPod()); err != nil { + slog.ErrorContext(ctx, "Syncer: failed to ensure worker is idle during reconciliation", slog.String("worker", w.GetWorkerPod()), slog.Any("err", err)) + } + } + } + }, 1*time.Minute) } func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Pod) { @@ -123,6 +141,12 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po return } + if w.GetActorId() == "" { + if err := s.persistence.EnsureWorkerIdle(ctx, w.GetWorkerNamespace(), w.GetWorkerPool(), w.GetWorkerPod()); err != nil { + slog.ErrorContext(ctx, "Syncer: failed to ensure worker is idle", slog.String("worker", w.GetWorkerPod()), slog.Any("err", err)) + } + } + if w.Ip != pod.Status.PodIP { // TODO: I don't think this is possible, but handling this case so we can log it just in case we can reproduce it. slog.InfoContext(ctx, "Syncer: updating worker in store (IP changed)", slog.String("worker", pod.Namespace+"/"+pod.Name)) diff --git a/cmd/servers/ateapi/controlapi/workflow_resume.go b/cmd/servers/ateapi/controlapi/workflow_resume.go index 744422a..eabcb0e 100644 --- a/cmd/servers/ateapi/controlapi/workflow_resume.go +++ b/cmd/servers/ateapi/controlapi/workflow_resume.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "log/slog" - "math/rand" "time" atev1alpha1 "github.com/agent-substrate/substrate/api/v1alpha1" @@ -85,38 +84,34 @@ func (s *AssignWorkerStep) IsComplete(ctx context.Context, input *ResumeInput, s return state.Actor.GetStatus() == ateapipb.Actor_STATUS_RUNNING, nil } func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error { - workers, err := s.store.ListWorkers(ctx) - if err != nil { - return fmt.Errorf("while listing workers: %w", err) - } - var assignedWorker *ateapipb.Worker - // Check if we already have a worker assigned from a previous failed attempt - for _, worker := range workers { - if worker.GetActorId() == input.ActorID && worker.GetWorkerPool() == state.ActorTemplate.Spec.WorkerPoolRef.Name && worker.GetWorkerNamespace() == state.ActorTemplate.Spec.WorkerPoolRef.Namespace { + // Re-use previously assigned worker if available (e.g., on retry of a failed resume). + if state.Actor.GetAteomPodName() != "" { + worker, err := s.store.GetWorker(ctx, state.Actor.GetAteomPodNamespace(), state.ActorTemplate.Spec.WorkerPoolRef.Name, state.Actor.GetAteomPodName()) + if err == nil && worker.GetActorId() == input.ActorID { assignedWorker = worker - break } } - // If not, find a free one using randomized shuffling + // Claim a new idle worker. if assignedWorker == nil { - pickedWorker := s.findFreeWorker(workers, state.ActorTemplate.Spec.WorkerPoolRef.Namespace, state.ActorTemplate.Spec.WorkerPoolRef.Name) - if pickedWorker == nil { - return status.Errorf(codes.FailedPrecondition, "no free workers available") + pickedWorker, err := s.store.ClaimIdleWorker( + ctx, + state.ActorTemplate.Spec.WorkerPoolRef.Namespace, + state.ActorTemplate.Spec.WorkerPoolRef.Name, + input.ActorID, + state.Actor.GetActorTemplateNamespace(), + state.Actor.GetActorTemplateName(), + ) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return status.Errorf(codes.FailedPrecondition, "no free workers available") + } + return fmt.Errorf("while claiming idle worker: %w", err) } - assignedWorker = pickedWorker - slog.InfoContext(ctx, "Picked worker", slog.Any("worker", pickedWorker.String())) - } - - assignedWorker.ActorId = input.ActorID - assignedWorker.ActorNamespace = state.Actor.GetActorTemplateNamespace() - assignedWorker.ActorTemplate = state.Actor.GetActorTemplateName() - - if err := s.store.UpdateWorker(ctx, assignedWorker, assignedWorker.Version); err != nil { - return err + slog.InfoContext(ctx, "Claimed idle worker", slog.Any("worker", pickedWorker.String())) } state.Actor.Status = ateapipb.Actor_STATUS_RESUMING @@ -139,23 +134,6 @@ func (s *AssignWorkerStep) RetryBackoff() *wait.Backoff { } } -func (s *AssignWorkerStep) findFreeWorker(workers []*ateapipb.Worker, workerPoolNamespace, workerPoolName string) *ateapipb.Worker { - var freeWorkers []*ateapipb.Worker - for _, worker := range workers { - if worker.GetActorId() == "" && worker.GetWorkerPool() == workerPoolName && worker.GetWorkerNamespace() == workerPoolNamespace { - freeWorkers = append(freeWorkers, worker) - } - } - - if len(freeWorkers) > 0 { - rand.Shuffle(len(freeWorkers), func(i, j int) { - freeWorkers[i], freeWorkers[j] = freeWorkers[j], freeWorkers[i] - }) - return freeWorkers[0] - } - return nil -} - type CallAteletRestoreStep struct { dialer *AteletDialer } diff --git a/cmd/servers/ateapi/store/ateredis/ateredis.go b/cmd/servers/ateapi/store/ateredis/ateredis.go index ebde322..19ff15d 100644 --- a/cmd/servers/ateapi/store/ateredis/ateredis.go +++ b/cmd/servers/ateapi/store/ateredis/ateredis.go @@ -164,6 +164,13 @@ func (s *Persistence) CreateWorker(ctx context.Context, worker *ateapipb.Worker) return store.ErrAlreadyExists } + // Add to the idle set. + setKey := fmt.Sprintf("pool:%s:%s:idle_workers", worker.GetWorkerNamespace(), worker.GetWorkerPool()) + err = s.rdb.SAdd(ctx, setKey, worker.GetWorkerPod()).Err() + if err != nil { + return fmt.Errorf("while registering worker in idle set: %w", err) + } + return nil } @@ -192,6 +199,7 @@ func (s *Persistence) GetWorker(ctx context.Context, namespace, pool, pod string func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, expectedVersion int64) error { dbKey := workerDBKey(worker.GetWorkerNamespace(), worker.GetWorkerPool(), worker.GetWorkerPod()) + var shouldAddToIdle bool // Clone because we will update the version field, and we don't want to // stomp the caller's copy. @@ -228,6 +236,10 @@ func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, return fmt.Errorf("ip is immutable") } + if currentWorker.GetActorId() != "" && dbWorker.GetActorId() == "" { + shouldAddToIdle = true + } + newVal, err := protojson.Marshal(dbWorker) if err != nil { return fmt.Errorf("in protojson.Marshal: %w", err) @@ -246,15 +258,32 @@ func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, return fmt.Errorf("while executing update worker transaction: %w", err) } + // Run SAdd sequentially outside the transaction to avoid cluster slot restrictions. + if shouldAddToIdle { + setKey := fmt.Sprintf("pool:%s:%s:idle_workers", worker.GetWorkerNamespace(), worker.GetWorkerPool()) + err = s.rdb.SAdd(ctx, setKey, worker.GetWorkerPod()).Err() + if err != nil { + return fmt.Errorf("while returning worker to idle set: %w", err) + } + } + return nil } func (s *Persistence) DeleteWorker(ctx context.Context, namespace, pool, pod string) error { dbKey := workerDBKey(namespace, pool, pod) + setKey := fmt.Sprintf("pool:%s:%s:idle_workers", namespace, pool) + err := s.rdb.Del(ctx, dbKey).Err() if err != nil { return fmt.Errorf("while deleting worker key %q: %w", dbKey, err) } + + err = s.rdb.SRem(ctx, setKey, pod).Err() + if err != nil { + return fmt.Errorf("while removing worker from idle set: %w", err) + } + return nil } @@ -452,3 +481,69 @@ func (s *Persistence) ReleaseLock(ctx context.Context, key string, value string) } return nil } + +func (s *Persistence) ClaimIdleWorker(ctx context.Context, namespace, pool string, actorID string, actorNamespace string, actorTemplate string) (*ateapipb.Worker, error) { + setKey := fmt.Sprintf("pool:%s:%s:idle_workers", namespace, pool) + + for { + // Pop a random idle worker name. + podName, err := s.rdb.SPop(ctx, setKey).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, store.ErrNotFound + } + return nil, fmt.Errorf("while popping idle worker from set: %w", err) + } + + worker, err := s.GetWorker(ctx, namespace, pool, podName) + if err != nil { + // If the worker was deleted, skip and pop the next one. + if errors.Is(err, store.ErrNotFound) { + continue + } + _ = s.rdb.SAdd(ctx, setKey, podName).Err() + return nil, fmt.Errorf("while loading popped worker metadata: %w", err) + } + + if worker.GetActorId() != "" { + // Skip busy workers. + continue + } + + worker.ActorId = actorID + worker.ActorNamespace = actorNamespace + worker.ActorTemplate = actorTemplate + + err = s.UpdateWorker(ctx, worker, worker.Version) + if err != nil { + if errors.Is(err, store.ErrPersistenceRetry) { + // Return to the idle set and retry on locking conflict. + _ = s.rdb.SAdd(ctx, setKey, podName).Err() + continue + } + _ = s.rdb.SAdd(ctx, setKey, podName).Err() + return nil, fmt.Errorf("while claiming popped worker: %w", err) + } + + return worker, nil + } +} + +func (s *Persistence) EnsureWorkerIdle(ctx context.Context, namespace, pool, pod string) error { + worker, err := s.GetWorker(ctx, namespace, pool, pod) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil + } + return fmt.Errorf("while getting worker for idle check: %w", err) + } + + if worker.GetActorId() == "" { + setKey := fmt.Sprintf("pool:%s:%s:idle_workers", namespace, pool) + err = s.rdb.SAdd(ctx, setKey, pod).Err() + if err != nil { + return fmt.Errorf("while ensuring worker in idle set: %w", err) + } + } + return nil +} diff --git a/cmd/servers/ateapi/store/ateredis/ateredis_test.go b/cmd/servers/ateapi/store/ateredis/ateredis_test.go index acc0fda..6dadd3d 100644 --- a/cmd/servers/ateapi/store/ateredis/ateredis_test.go +++ b/cmd/servers/ateapi/store/ateredis/ateredis_test.go @@ -721,3 +721,50 @@ func TestAcquireLock_NonReentry(t *testing.T) { t.Errorf("expected second lock acquisition to fail (non-reentrant)") } } + +func TestEnsureWorkerIdle(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + worker := &ateapipb.Worker{ + WorkerNamespace: "default", + WorkerPool: "pool-1", + WorkerPod: "pod-1", + } + + err := s.CreateWorker(ctx, worker) + if err != nil { + t.Fatalf("CreateWorker failed: %v", err) + } + + // Manually remove from idle set to simulate leak/crash before SAdd. + setKey := "pool:default:pool-1:idle_workers" + _, err = s.rdb.SRem(ctx, setKey, "pod-1").Result() + if err != nil { + t.Fatalf("SRem failed: %v", err) + } + + // Verify it's gone from set + isMember, err := s.rdb.SIsMember(ctx, setKey, "pod-1").Result() + if err != nil { + t.Fatalf("SIsMember failed: %v", err) + } + if isMember { + t.Errorf("expected worker to be removed from idle set") + } + + // Run EnsureWorkerIdle + err = s.EnsureWorkerIdle(ctx, "default", "pool-1", "pod-1") + if err != nil { + t.Fatalf("EnsureWorkerIdle failed: %v", err) + } + + // Verify it's back in the set + isMember, err = s.rdb.SIsMember(ctx, setKey, "pod-1").Result() + if err != nil { + t.Fatalf("SIsMember failed: %v", err) + } + if !isMember { + t.Errorf("expected worker to be restored to idle set") + } +} diff --git a/cmd/servers/ateapi/store/store.go b/cmd/servers/ateapi/store/store.go index ab76aa8..2ebcd51 100644 --- a/cmd/servers/ateapi/store/store.go +++ b/cmd/servers/ateapi/store/store.go @@ -69,6 +69,14 @@ type Interface interface { // Lists all known workers. Returns nil if none found. ListWorkers(ctx context.Context) ([]*ateapipb.Worker, error) + // ClaimIdleWorker atomically claims a random idle worker from the specified pool + // and associates it with the given Actor. Returns ErrNotFound if no idle workers are available. + ClaimIdleWorker(ctx context.Context, namespace, pool string, actorID string, actorNamespace string, actorTemplate string) (*ateapipb.Worker, error) + + // EnsureWorkerIdle adds the worker to the idle set if it is currently idle in the database. + // This is used by background processes for self-healing and crash resilience. + EnsureWorkerIdle(ctx context.Context, namespace, pool, pod string) error + // AcquireLock attempts to acquire a distributed lock with a TTL. // Returns true if the lock was successfully acquired. // Returns false if the lock is already held by another client (conflict).