From a5b408eac198935fa901713768a1b1723e9be13e Mon Sep 17 00:00:00 2001 From: Miguel Nobre Date: Mon, 18 May 2026 00:52:17 -0400 Subject: [PATCH 1/2] manager: Add unique workers batches and task report handling --- apps/task-poster/src/fetcher.ts | 30 +++++-- apps/worker-app/app/composables/useTasks.ts | 55 ++++-------- core/protobufs/src/effect.ts | 20 +++++ core/protobufs/src/task/task.proto | 2 + core/protocol/src/consts.ts | 4 +- modules/manager/src/consts.ts | 3 +- modules/manager/src/main.ts | 24 +++-- .../manager/src/modules/createTaskManager.ts | 79 ++++++++++++++--- .../src/modules/createWorkerManager.ts | 87 ++++++++++++++----- .../manager/src/stores/managerTaskStore.ts | 79 +++++++++++++++++ 10 files changed, 298 insertions(+), 85 deletions(-) diff --git a/apps/task-poster/src/fetcher.ts b/apps/task-poster/src/fetcher.ts index af9d8dda..4cabfcc3 100644 --- a/apps/task-poster/src/fetcher.ts +++ b/apps/task-poster/src/fetcher.ts @@ -80,6 +80,9 @@ export type Fetcher = { // posting schedule: whitelist of days/time windows when tasks can be posted schedule?: Schedule; + + // unique workers: each worker can only complete one task from this fetcher + uniqueWorkers?: boolean; }; const api = axios.create({ @@ -186,8 +189,10 @@ export const fetcherForm = async ( - + ${["csv", "pipeline", "constant"].map(a => + ``).join("")} @@ -239,6 +244,14 @@ export const fetcherForm = async ( +
+ + +
+
@@ -490,6 +503,7 @@ export const createFetcher = async ( status: oldFetcher?.status ?? "active", hidden: fields.hidden === "on" || fields.hidden === true, + uniqueWorkers: fields.uniqueWorkers === "on" || fields.uniqueWorkers === true, schedule: (() => { try { @@ -586,7 +600,7 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => { } // regex filter - if (!task.result || (regex && regex.test(task.result)) || task.result == "") { + if (!task.result || (regex && regex.test(task.result)) || task.type === "report" || task.result == "") { continue; } @@ -614,6 +628,8 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => { // avoid expensive mistakes. const maxPrice = BigInt(fetcher.price) * BigInt(10); + const batchRunId = ulid(); + const tasks = data.map( (d, _idx) => { const rawPrice = d["dataffect/reward"] ? @@ -628,6 +644,8 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => { templateId: fetcher.template, templateData: JSON.stringify(d), capability: fetcher.capabilities[0], + batchId: `${fetcher.datasetId}-${fetcher.index}-${batchRunId}`, + uniqueWorkers: fetcher.uniqueWorkers ?? false, } as Task; }, ); @@ -886,7 +904,7 @@ export const processResults = async (f: Fetcher, batchSize: number) => { let importCount = 0; for (const d of data as any) { - if (d.type !== "submission") + if (d.type !== "submission" && d.type !== "report") continue; db.beginTransaction(); await db.delete([...keyBase, "active", d.taskId]); @@ -972,6 +990,7 @@ const formValidations: ValidationMap = { }, template: (v: string) => (!v || v.length === 0) && "Template is required", + uniqueWorkers: (_v: any) => false, timeLimitSeconds: (v: any) => { const num = Number(v); return ( @@ -1075,10 +1094,11 @@ export const addFetcherRoutes = (app: Express): void => {
  • Finished: ${doneSize}
  • Failed: ${failedSize}
  • Batch / Freq: ${f.batchSize} / ${f.frequency}
  • + ${f.uniqueWorkers ? '
  • Unique workers: enabled
  • ' : ''}
  • Time Limit: ${f.timeLimitSeconds}s
  • Schedule: ${f.schedule && Object.keys(f.schedule).length > 0 ? Object.entries(f.schedule).map(([day, slots]) => - `${day}: ${(slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ')}` + `${day}: ${Array.isArray(slots) ? (slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ') : 'all day'}` ).join('; ') : 'continuous (no restrictions)'} diff --git a/apps/worker-app/app/composables/useTasks.ts b/apps/worker-app/app/composables/useTasks.ts index bb259c60..13c35a5a 100644 --- a/apps/worker-app/app/composables/useTasks.ts +++ b/apps/worker-app/app/composables/useTasks.ts @@ -37,47 +37,28 @@ export const useTasks = () => { const useGetActiveTasks = (index: Ref) => { const queryClient = useQueryClient(); - - const handleTaskEvent = (event: CustomEvent) => { - queryClient.setQueryData(["tasks", index], (old) => - old ? [...old, event.detail] : [event.detail], - ); - }; + const { account } = useAuth(); const invalidateTasks = () => { - queryClient.invalidateQueries({ queryKey: ["tasks", index] }); + queryClient.invalidateQueries({ queryKey: ["tasks", index, account] }); }; - onMounted(() => { - instance.value?.events.addEventListener("task:created", handleTaskEvent); - instance.value?.events.addEventListener("task:rejected", invalidateTasks); - instance.value?.events.addEventListener( - "task:completed", - invalidateTasks, - ); - instance.value?.events.addEventListener( - "task:expired", - invalidateTasks, - ); - }); - - onUnmounted(() => { - instance.value?.events.removeEventListener( - "task:created", - handleTaskEvent, - ); - instance.value?.events.removeEventListener( - "task:rejected", - invalidateTasks, - ); - instance.value?.events.removeEventListener( - "task:completed", - invalidateTasks, - ); - instance.value?.events.removeEventListener( - "task:expired", - invalidateTasks, - ); + // Register listeners as soon as instance is available, not just on mount. + watchEffect((onCleanup) => { + const events = instance.value?.events; + if (!events) return; + + events.addEventListener("task:created", invalidateTasks); + events.addEventListener("task:rejected", invalidateTasks); + events.addEventListener("task:completed", invalidateTasks); + events.addEventListener("task:expired", invalidateTasks); + + onCleanup(() => { + events.removeEventListener("task:created", invalidateTasks); + events.removeEventListener("task:rejected", invalidateTasks); + events.removeEventListener("task:completed", invalidateTasks); + events.removeEventListener("task:expired", invalidateTasks); + }); }); return useGetTasks(index); diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts index c949f3f5..244f38f6 100644 --- a/core/protobufs/src/effect.ts +++ b/core/protobufs/src/effect.ts @@ -2766,6 +2766,8 @@ export interface Task { templateId: string templateData: string capability?: string + batchId?: string + uniqueWorkers?: boolean } export namespace Task { @@ -2813,6 +2815,16 @@ export namespace Task { w.string(obj.capability) } + if (obj.batchId != null) { + w.uint32(66) + w.string(obj.batchId) + } + + if (obj.uniqueWorkers != null) { + w.uint32(72) + w.bool(obj.uniqueWorkers) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -2860,6 +2872,14 @@ export namespace Task { obj.capability = reader.string() break } + case 8: { + obj.batchId = reader.string() + break + } + case 9: { + obj.uniqueWorkers = reader.bool() + break + } default: { reader.skipType(tag & 7) break diff --git a/core/protobufs/src/task/task.proto b/core/protobufs/src/task/task.proto index 46605b09..923c9074 100644 --- a/core/protobufs/src/task/task.proto +++ b/core/protobufs/src/task/task.proto @@ -16,6 +16,8 @@ message Task { string template_id = 5; string template_data = 6; optional string capability = 7; + optional string batch_id = 8; + optional bool unique_workers = 9; } message TaskAssignment { diff --git a/core/protocol/src/consts.ts b/core/protocol/src/consts.ts index 3c2cabc1..d02a8cb0 100644 --- a/core/protocol/src/consts.ts +++ b/core/protocol/src/consts.ts @@ -1,4 +1,4 @@ -export const TASK_ACCEPTANCE_TIME = 1000 * 60 * 10; // 10 minutes - +// 10 minutes, in milliseconds. Same duration as modules/manager/src/consts.ts +export const TASK_ACCEPTANCE_TIME = 1000 * 60 * 10; export const PROTOCOL_NAME = "effectai"; export const PROTOCOL_VERSION = "0.0.2"; diff --git a/modules/manager/src/consts.ts b/modules/manager/src/consts.ts index df0ec53e..dad28837 100644 --- a/modules/manager/src/consts.ts +++ b/modules/manager/src/consts.ts @@ -1,4 +1,5 @@ -export const TASK_ACCEPTANCE_TIME = 600; +// 10 minutes, in seconds. Same duration as core/protocol/src/consts.ts +export const TASK_ACCEPTANCE_TIME = 60 * 10; export const ACTIVE_TASK_TRESHOLD = 50; export const PAYMENT_BATCH_SIZE = 60; export const PAYMENT_VERSION = 1; diff --git a/modules/manager/src/main.ts b/modules/manager/src/main.ts index a5531f4f..2c09481c 100644 --- a/modules/manager/src/main.ts +++ b/modules/manager/src/main.ts @@ -387,12 +387,24 @@ export const createManager = async ({ taskId, index: "completed", }) - .then( - (a) => - a.events - .filter((e: any) => e.type === "submission") - .map((e: any) => ({ ...e, taskId }))[0], - ) + .then((taskRecord) => { + const event = taskRecord.events.find( + (taskEvent: any) => + taskEvent.type === "submission" || taskEvent.type === "report", + ); + if (!event) return { taskId, error: "NOT FOUND" }; + + if (event.type === "report") { + return { + type: "report", + timestamp: event.timestamp, + taskId, + submissionByPeer: event.reportedByPeer, + result: event.result, + }; + } + return { ...event, taskId }; + }) .catch((_e) => ({ taskId, error: "NOT FOUND" })); }); diff --git a/modules/manager/src/modules/createTaskManager.ts b/modules/manager/src/modules/createTaskManager.ts index 2d7a906d..311a1ca0 100644 --- a/modules/manager/src/modules/createTaskManager.ts +++ b/modules/manager/src/modules/createTaskManager.ts @@ -7,6 +7,7 @@ import type { ManagerTaskStore, TaskAcceptedEvent, TaskAssignedEvent, + TaskRejectedEvent, TaskSubmissionEvent, } from "../stores/managerTaskStore.js"; @@ -56,6 +57,14 @@ export function createTaskManager({ const isExpired = (timestamp: number, value: number) => timestamp + value < Math.floor(Date.now() / 1000); + const isReportPayload = (result: string): boolean => { + try { + return JSON.parse(result)?.task === "report"; + } catch { + return false; + } + }; + const getTask = async ({ taskId, index = "active", @@ -137,12 +146,12 @@ export function createTaskManager({ reason, }); - workerManager.markTaskReleased(workerPeerIdStr, taskId); - const taskRecord = await taskStore.getTask({ entityId: taskId, }); + workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord?.state.batchId); + await workerManager.incrementStateValue(workerPeerIdStr, "tasksRejected"); events.safeDispatchEvent("task:rejected", { @@ -159,13 +168,18 @@ export function createTaskManager({ result: string; workerPeerIdStr: string; }) => { + if (isReportPayload(result)) { + await processTaskReport({ taskId, result, workerPeerIdStr }); + return; + } + const taskRecord = await taskStore.complete({ entityId: taskId, result, peerIdStr: workerPeerIdStr, }); - workerManager.markTaskReleased(workerPeerIdStr, taskId); + workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord.state.batchId); await workerManager.incrementStateValue(workerPeerIdStr, "tasksCompleted"); @@ -174,6 +188,30 @@ export function createTaskManager({ }); }; + const processTaskReport = async ({ + taskId, + result, + workerPeerIdStr, + }: { + taskId: string; + result: string; + workerPeerIdStr: string; + }) => { + const taskRecord = await taskStore.report({ + entityId: taskId, + result, + peerIdStr: workerPeerIdStr, + }); + + workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord.state.batchId); + + await workerManager.incrementStateValue(workerPeerIdStr, "tasksRejected"); + + events.safeDispatchEvent("task:rejected", { + detail: taskRecord, + }); + }; + const handleCreateEvent = async (taskRecord: ManagerTaskRecord) => { await assignTask({ entityId: taskRecord.state.id }); }; @@ -182,13 +220,7 @@ export function createTaskManager({ taskRecord: ManagerTaskRecord, lastEvent: TaskAssignedEvent, ) => { - const { timeLimitSeconds } = taskRecord.state; - const acceptanceWindow = - typeof timeLimitSeconds === "number" && timeLimitSeconds > 0 - ? timeLimitSeconds - : TASK_ACCEPTANCE_TIME; - - if (isExpired(lastEvent.timestamp, acceptanceWindow)) { + if (isExpired(lastEvent.timestamp, TASK_ACCEPTANCE_TIME)) { await rejectAndReassignTask(taskRecord); } }; @@ -222,6 +254,7 @@ export function createTaskManager({ workerManager.markTaskReleased( latestAssignEvent.assignedToPeer, taskRecord.state.id, + taskRecord.state.batchId, ); await workerManager.incrementStateValue( @@ -311,6 +344,8 @@ export function createTaskManager({ case "submission": await handleSubmissionEvent(taskRecord, lastEvent); break; + case "report": + break; case "payout": // do nothing.. break; @@ -333,13 +368,31 @@ export function createTaskManager({ throw new Error("Task is already assigned."); } - const originalWorkerId = taskRecord?.state.templateData - ? JSON.parse(taskRecord.state.templateData)?.submissionByPeer - : undefined; + // In a unique-workers batch, the most recent rejector of this task must + // be excluded from the re-assignment - otherwise Task 1 bounces straight + // back to them and consumes the batch slot they need for Task 2. + let lastRejector: string | undefined; + if (taskRecord.state.uniqueWorkers) { + for (let index = taskRecord.events.length - 1; index >= 0; index--) { + const event = taskRecord.events[index]; + if (event.type === "reject") { + lastRejector = (event as TaskRejectedEvent).rejectedByPeer; + break; + } + } + } + + const originalWorkerId = + lastRejector ?? + (taskRecord?.state.templateData + ? JSON.parse(taskRecord.state.templateData)?.submissionByPeer + : undefined); const worker = await workerManager.selectWorker( taskRecord.state.capability || undefined, originalWorkerId || undefined, + taskRecord.state.batchId || undefined, + taskRecord.state.uniqueWorkers || false, ); if (!worker) { diff --git a/modules/manager/src/modules/createWorkerManager.ts b/modules/manager/src/modules/createWorkerManager.ts index 1456ae1a..2b547449 100644 --- a/modules/manager/src/modules/createWorkerManager.ts +++ b/modules/manager/src/modules/createWorkerManager.ts @@ -27,6 +27,31 @@ export const createWorkerManager = ({ const workerStore = createWorkerStore({ datastore, }); + + const hasCompletedBatch = (batchId: string, workerId: string) => + datastore.has(new Key(`/tasks/batch/${batchId}/completed/${workerId}`)); + + const hasActiveBatchAssignment = (batchId: string, workerId: string) => + datastore.has(new Key(`/tasks/batch/${batchId}/assigned/${workerId}`)); + + // In-memory reservation map - prevents same-session races between concurrent assignTask calls. + // The datastore-based checks below serve as fallback after a manager restart. + const activeBatchWorkers = new Map>(); + + const tryReserveBatchWorker = (batchId: string, workerId: string): boolean => { + const existing = activeBatchWorkers.get(batchId); + if (existing?.has(workerId)) return false; + if (existing) existing.add(workerId); + else activeBatchWorkers.set(batchId, new Set([workerId])); + return true; + }; + + const releaseBatchWorker = (batchId: string, workerId: string): void => { + const set = activeBatchWorkers.get(batchId); + if (!set) return; + set.delete(workerId); + if (set.size === 0) activeBatchWorkers.delete(batchId); + }; const workerAssignments = new Map>(); const assignmentsFor = (workerId: string) => { @@ -42,17 +67,14 @@ export const createWorkerManager = ({ const markTaskAssigned = (workerId: string, taskId: string) => { assignmentsFor(workerId).add(taskId); }; - const markTaskReleased = (workerId: string, taskId: string) => { + const markTaskReleased = (workerId: string, taskId: string, batchId?: string) => { const assignments = workerAssignments.get(workerId); - if (!assignments) { - return; + if (assignments) { + assignments.delete(taskId); + if (assignments.size === 0) workerAssignments.delete(workerId); } - assignments.delete(taskId); - - if (assignments.size === 0) { - workerAssignments.delete(workerId); - } + if (batchId) releaseBatchWorker(batchId, workerId); }; const accessCodeStore = createAccessCodeStore({ datastore }); @@ -195,36 +217,59 @@ export const createWorkerManager = ({ const selectWorker = async ( capability?: string, - originalWorkerId?: string + originalWorkerId?: string, + batchId?: string, + uniqueWorkers?: boolean, ): Promise => { const queue = workerQueue.getQueue(); - //TODO:: optimize this.. - for (const workerId of queue) { + const isEligible = async (workerId: string): Promise => { const worker = await getWorker(workerId); - if (!worker || originalWorkerId === workerId) continue; + if (!worker || originalWorkerId === workerId) return false; const workerCapabilities = worker.state.capabilities.concat(worker.state.managerCapabilities || []); if (capability) { - const info = getCapabilityInfo(capability); - + const capabilityInfo = getCapabilityInfo(capability); const hasCapability = workerCapabilities.includes(capability); const hasAntiCapability = - info?.antiCapability && workerCapabilities.includes(info.antiCapability); + capabilityInfo?.antiCapability && + workerCapabilities.includes(capabilityInfo.antiCapability); + if (!hasCapability || hasAntiCapability) return false; + } + + if (await isBusy(worker)) return false; - if (!hasCapability || hasAntiCapability) continue; + // Datastore fallback - covers tasks assigned before a manager restart, + // since the in-memory reservation is lost on restart. + if (uniqueWorkers && batchId) { + if (await hasActiveBatchAssignment(batchId, workerId)) return false; + if (await hasCompletedBatch(batchId, workerId)) return false; } - const busy = await isBusy(worker); - if (!busy) { - workerQueue.dequeuePeer(workerId); - return workerId; + return true; + }; + + //TODO:: optimize this.. + for (const workerId of queue) { + // Synchronous reservation must happen before any await so two concurrent + // assignTask calls can't both claim the same worker for the same batch. + let batchReserved = false; + if (uniqueWorkers && batchId) { + if (!tryReserveBatchWorker(batchId, workerId)) continue; + batchReserved = true; } + + if (!(await isEligible(workerId))) { + if (batchReserved && batchId) releaseBatchWorker(batchId, workerId); + continue; + } + + workerQueue.dequeuePeer(workerId); + return workerId; } - // No available worker found return null; }; diff --git a/modules/manager/src/stores/managerTaskStore.ts b/modules/manager/src/stores/managerTaskStore.ts index 4959d2ca..79a1c34b 100644 --- a/modules/manager/src/stores/managerTaskStore.ts +++ b/modules/manager/src/stores/managerTaskStore.ts @@ -18,6 +18,7 @@ export type ManagerTaskEvent = | TaskAssignedEvent | TaskSubmissionEvent | TaskRejectedEvent + | TaskReportedEvent | TaskAcceptedEvent | TaskPaymentEvent; @@ -42,6 +43,12 @@ export interface TaskRejectedEvent extends BaseTaskEvent { rejectedByPeer: string; } +export interface TaskReportedEvent extends BaseTaskEvent { + type: "report"; + result: string; + reportedByPeer: string; +} + export interface TaskSubmissionEvent extends BaseTaskEvent { type: "submission"; result: string; @@ -170,6 +177,17 @@ export const createManagerTaskStore = ({ Buffer.from(stringifyWithBigInt(taskRecord)), ); batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`)); + + if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + batch.put( + new Key(`/tasks/batch/${taskRecord.state.batchId}/completed/${peerIdStr}`), + new Uint8Array(), + ); + batch.delete( + new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}`), + ); + } + await batch.commit(); return taskRecord; @@ -259,7 +277,60 @@ export const createManagerTaskStore = ({ new Key(`/tasks/assign/${latestAssignEvent?.assignedToPeer}/${entityId}`), ); + if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + batch.delete( + new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${latestAssignEvent.assignedToPeer}`), + ); + } + + await batch.commit(); + }; + + const report = async ({ + entityId, + result, + peerIdStr, + }: { + entityId: string; + result: string; + peerIdStr: string; + }): Promise => { + const taskRecord = await getTask({ entityId }); + + if (!taskRecord) { + throw new TaskValidationError("Task not found"); + } + + const lastEvent = taskRecord.events[taskRecord.events.length - 1]; + if (lastEvent.type !== "accept" || lastEvent.acceptedByPeer !== peerIdStr) { + throw new TaskValidationError("Task was not accepted by this worker"); + } + + taskRecord.events.push({ + timestamp: Math.floor(Date.now() / 1000), + type: "report", + result, + reportedByPeer: peerIdStr, + }); + + const batch = datastore.batch(); + + batch.put( + new Key(`/tasks/completed/${taskRecord.state.id}`), + Buffer.from(stringifyWithBigInt(taskRecord)), + ); + batch.delete(new Key(`/tasks/active/${taskRecord.state.id}`)); + batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`)); + + if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + batch.delete( + new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}`), + ); + } + await batch.commit(); + + return taskRecord; }; const payout = async ({ @@ -339,6 +410,13 @@ export const createManagerTaskStore = ({ new Uint8Array(), ); + if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + batch.put( + new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${workerPeerIdStr}`), + new Uint8Array(), + ); + } + await batch.commit(); }; @@ -348,6 +426,7 @@ export const createManagerTaskStore = ({ complete, accept, reject, + report, payout, assign, getTask, From b718eccb98fba7f55e9b4f0614f1613a75ad37a9 Mon Sep 17 00:00:00 2001 From: Miguel Nobre Date: Thu, 4 Jun 2026 11:10:59 -0400 Subject: [PATCH 2/2] changed unique worker flag from binary to numeric --- apps/task-poster/src/fetcher.ts | 32 ++++--- core/protobufs/src/effect.ts | 11 ++- core/protobufs/src/task/task.proto | 2 +- .../manager/src/modules/createTaskManager.ts | 6 +- .../src/modules/createWorkerManager.ts | 94 ++++++++++++------- .../manager/src/stores/managerTaskStore.ts | 28 ++++-- tools/cli/src/manager/tasks.ts | 6 ++ 7 files changed, 116 insertions(+), 63 deletions(-) diff --git a/apps/task-poster/src/fetcher.ts b/apps/task-poster/src/fetcher.ts index 4cabfcc3..6a4275c1 100644 --- a/apps/task-poster/src/fetcher.ts +++ b/apps/task-poster/src/fetcher.ts @@ -81,8 +81,9 @@ export type Fetcher = { // posting schedule: whitelist of days/time windows when tasks can be posted schedule?: Schedule; - // unique workers: each worker can only complete one task from this fetcher - uniqueWorkers?: boolean; + // repetitions: how many tasks a single worker may complete from each import + // 0 means no limit, 1 means each worker may do one task. + repetitions?: number; }; const api = axios.create({ @@ -190,7 +191,7 @@ export const fetcherForm = async ( @@ -245,10 +246,11 @@ export const fetcherForm = async (
    - - Repetitions per worker
    + How many tasks a single worker may complete per import batch. + 0 = no limit, 1 = each worker may do one task. Only applies to CSV data sources. +
    @@ -503,7 +505,7 @@ export const createFetcher = async ( status: oldFetcher?.status ?? "active", hidden: fields.hidden === "on" || fields.hidden === true, - uniqueWorkers: fields.uniqueWorkers === "on" || fields.uniqueWorkers === true, + repetitions: Number(fields.repetitions) || 0, schedule: (() => { try { @@ -645,7 +647,7 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => { templateData: JSON.stringify(d), capability: fetcher.capabilities[0], batchId: `${fetcher.datasetId}-${fetcher.index}-${batchRunId}`, - uniqueWorkers: fetcher.uniqueWorkers ?? false, + repetitions: fetcher.repetitions ?? 0, } as Task; }, ); @@ -990,7 +992,15 @@ const formValidations: ValidationMap = { }, template: (v: string) => (!v || v.length === 0) && "Template is required", - uniqueWorkers: (_v: any) => false, + repetitions: (v: any) => { + if (v === undefined || v === "") return false; + const repetitions = Number(v); + return ( + (isNaN(repetitions) && "Repetitions must be a number") || + (!Number.isInteger(repetitions) && "Repetitions must be a whole number") || + (repetitions < 0 && "Repetitions cannot be negative") + ); + }, timeLimitSeconds: (v: any) => { const num = Number(v); return ( @@ -1094,7 +1104,7 @@ export const addFetcherRoutes = (app: Express): void => {
  • Finished: ${doneSize}
  • Failed: ${failedSize}
  • Batch / Freq: ${f.batchSize} / ${f.frequency}
  • - ${f.uniqueWorkers ? '
  • Unique workers: enabled
  • ' : ''} + ${f.repetitions ? `
  • Repetitions per worker: ${f.repetitions}
  • ` : ''}
  • Time Limit: ${f.timeLimitSeconds}s
  • Schedule: ${f.schedule && Object.keys(f.schedule).length > 0 ? Object.entries(f.schedule).map(([day, slots]) => diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts index 244f38f6..35acc945 100644 --- a/core/protobufs/src/effect.ts +++ b/core/protobufs/src/effect.ts @@ -2767,7 +2767,7 @@ export interface Task { templateData: string capability?: string batchId?: string - uniqueWorkers?: boolean + repetitions: number } export namespace Task { @@ -2820,9 +2820,9 @@ export namespace Task { w.string(obj.batchId) } - if (obj.uniqueWorkers != null) { + if ((obj.repetitions != null && obj.repetitions !== 0)) { w.uint32(72) - w.bool(obj.uniqueWorkers) + w.uint32(obj.repetitions) } if (opts.lengthDelimited !== false) { @@ -2835,7 +2835,8 @@ export namespace Task { reward: 0n, timeLimitSeconds: 0, templateId: '', - templateData: '' + templateData: '', + repetitions: 0 } const end = length == null ? reader.len : reader.pos + length @@ -2877,7 +2878,7 @@ export namespace Task { break } case 9: { - obj.uniqueWorkers = reader.bool() + obj.repetitions = reader.uint32() break } default: { diff --git a/core/protobufs/src/task/task.proto b/core/protobufs/src/task/task.proto index 923c9074..49cb94a6 100644 --- a/core/protobufs/src/task/task.proto +++ b/core/protobufs/src/task/task.proto @@ -17,7 +17,7 @@ message Task { string template_data = 6; optional string capability = 7; optional string batch_id = 8; - optional bool unique_workers = 9; + uint32 repetitions = 9; } message TaskAssignment { diff --git a/modules/manager/src/modules/createTaskManager.ts b/modules/manager/src/modules/createTaskManager.ts index 311a1ca0..0e8f40db 100644 --- a/modules/manager/src/modules/createTaskManager.ts +++ b/modules/manager/src/modules/createTaskManager.ts @@ -368,11 +368,11 @@ export function createTaskManager({ throw new Error("Task is already assigned."); } - // In a unique-workers batch, the most recent rejector of this task must + // In a repetition-limited batch, the most recent rejector of this task must // be excluded from the re-assignment - otherwise Task 1 bounces straight // back to them and consumes the batch slot they need for Task 2. let lastRejector: string | undefined; - if (taskRecord.state.uniqueWorkers) { + if (taskRecord.state.repetitions > 0) { for (let index = taskRecord.events.length - 1; index >= 0; index--) { const event = taskRecord.events[index]; if (event.type === "reject") { @@ -392,7 +392,7 @@ export function createTaskManager({ taskRecord.state.capability || undefined, originalWorkerId || undefined, taskRecord.state.batchId || undefined, - taskRecord.state.uniqueWorkers || false, + taskRecord.state.repetitions, ); if (!worker) { diff --git a/modules/manager/src/modules/createWorkerManager.ts b/modules/manager/src/modules/createWorkerManager.ts index 2b547449..c44c58e4 100644 --- a/modules/manager/src/modules/createWorkerManager.ts +++ b/modules/manager/src/modules/createWorkerManager.ts @@ -28,29 +28,47 @@ export const createWorkerManager = ({ datastore, }); - const hasCompletedBatch = (batchId: string, workerId: string) => - datastore.has(new Key(`/tasks/batch/${batchId}/completed/${workerId}`)); - - const hasActiveBatchAssignment = (batchId: string, workerId: string) => - datastore.has(new Key(`/tasks/batch/${batchId}/assigned/${workerId}`)); - - // In-memory reservation map - prevents same-session races between concurrent assignTask calls. - // The datastore-based checks below serve as fallback after a manager restart. - const activeBatchWorkers = new Map>(); - - const tryReserveBatchWorker = (batchId: string, workerId: string): boolean => { - const existing = activeBatchWorkers.get(batchId); - if (existing?.has(workerId)) return false; - if (existing) existing.add(workerId); - else activeBatchWorkers.set(batchId, new Set([workerId])); - return true; + const countBatchKeys = async (prefix: string): Promise => { + let count = 0; + for await (const _key of datastore.queryKeys({ prefix })) { + count++; + } + return count; }; - const releaseBatchWorker = (batchId: string, workerId: string): void => { - const set = activeBatchWorkers.get(batchId); - if (!set) return; - set.delete(workerId); - if (set.size === 0) activeBatchWorkers.delete(batchId); + const countCompletedInBatch = (batchId: string, workerId: string) => + countBatchKeys(`/tasks/batch/${batchId}/completed/${workerId}/`); + + // How many tasks of this batch are currently assigned to the worker. + const countActiveAssignmentsInBatch = (batchId: string, workerId: string) => + countBatchKeys(`/tasks/batch/${batchId}/assigned/${workerId}/`); + + const reservedBatchSlots = new Map>(); + + const getReservedBatchSlots = (batchId: string, workerId: string): number => + reservedBatchSlots.get(batchId)?.get(workerId) ?? 0; + + const reserveBatchSlot = (batchId: string, workerId: string): void => { + let workerSlots = reservedBatchSlots.get(batchId); + if (!workerSlots) { + workerSlots = new Map(); + reservedBatchSlots.set(batchId, workerSlots); + } + workerSlots.set(workerId, (workerSlots.get(workerId) ?? 0) + 1); + }; + + const releaseBatchSlot = (batchId: string, workerId: string): void => { + const workerSlots = reservedBatchSlots.get(batchId); + if (!workerSlots) return; + + const currentSlots = workerSlots.get(workerId) ?? 0; + if (currentSlots <= 1) { + workerSlots.delete(workerId); + } else { + workerSlots.set(workerId, currentSlots - 1); + } + + if (workerSlots.size === 0) reservedBatchSlots.delete(batchId); }; const workerAssignments = new Map>(); @@ -74,7 +92,7 @@ export const createWorkerManager = ({ if (assignments.size === 0) workerAssignments.delete(workerId); } - if (batchId) releaseBatchWorker(batchId, workerId); + if (batchId) releaseBatchSlot(batchId, workerId); }; const accessCodeStore = createAccessCodeStore({ datastore }); @@ -219,10 +237,15 @@ export const createWorkerManager = ({ capability?: string, originalWorkerId?: string, batchId?: string, - uniqueWorkers?: boolean, + repetitions?: number, ): Promise => { const queue = workerQueue.getQueue(); + // 0 repetitions means no per-worker limit for this batch. + const maxTasksPerWorker = repetitions ?? 0; + const limitWorkersInBatch = + maxTasksPerWorker > 0 && batchId !== undefined; + const isEligible = async (workerId: string): Promise => { const worker = await getWorker(workerId); if (!worker || originalWorkerId === workerId) return false; @@ -241,11 +264,16 @@ export const createWorkerManager = ({ if (await isBusy(worker)) return false; - // Datastore fallback - covers tasks assigned before a manager restart, - // since the in-memory reservation is lost on restart. - if (uniqueWorkers && batchId) { - if (await hasActiveBatchAssignment(batchId, workerId)) return false; - if (await hasCompletedBatch(batchId, workerId)) return false; + if (limitWorkersInBatch && batchId) { + const completedCount = await countCompletedInBatch(batchId, workerId); + const assignedInDatastore = await countActiveAssignmentsInBatch( + batchId, + workerId, + ); + const reservedInMemory = getReservedBatchSlots(batchId, workerId); + const activeCount = Math.max(reservedInMemory, assignedInDatastore); + + if (completedCount + activeCount > maxTasksPerWorker) return false; } return true; @@ -254,15 +282,13 @@ export const createWorkerManager = ({ //TODO:: optimize this.. for (const workerId of queue) { // Synchronous reservation must happen before any await so two concurrent - // assignTask calls can't both claim the same worker for the same batch. - let batchReserved = false; - if (uniqueWorkers && batchId) { - if (!tryReserveBatchWorker(batchId, workerId)) continue; - batchReserved = true; + // assignTask calls can't both claim the same slot for the same batch. + if (limitWorkersInBatch && batchId) { + reserveBatchSlot(batchId, workerId); } if (!(await isEligible(workerId))) { - if (batchReserved && batchId) releaseBatchWorker(batchId, workerId); + if (limitWorkersInBatch && batchId) releaseBatchSlot(batchId, workerId); continue; } diff --git a/modules/manager/src/stores/managerTaskStore.ts b/modules/manager/src/stores/managerTaskStore.ts index 79a1c34b..a7ff615d 100644 --- a/modules/manager/src/stores/managerTaskStore.ts +++ b/modules/manager/src/stores/managerTaskStore.ts @@ -178,13 +178,17 @@ export const createManagerTaskStore = ({ ); batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`)); - if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) { batch.put( - new Key(`/tasks/batch/${taskRecord.state.batchId}/completed/${peerIdStr}`), + new Key( + `/tasks/batch/${taskRecord.state.batchId}/completed/${peerIdStr}/${entityId}`, + ), new Uint8Array(), ); batch.delete( - new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}`), + new Key( + `/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}/${entityId}`, + ), ); } @@ -277,9 +281,11 @@ export const createManagerTaskStore = ({ new Key(`/tasks/assign/${latestAssignEvent?.assignedToPeer}/${entityId}`), ); - if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) { batch.delete( - new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${latestAssignEvent.assignedToPeer}`), + new Key( + `/tasks/batch/${taskRecord.state.batchId}/assigned/${latestAssignEvent.assignedToPeer}/${entityId}`, + ), ); } @@ -322,9 +328,11 @@ export const createManagerTaskStore = ({ batch.delete(new Key(`/tasks/active/${taskRecord.state.id}`)); batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`)); - if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) { batch.delete( - new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}`), + new Key( + `/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}/${entityId}`, + ), ); } @@ -410,9 +418,11 @@ export const createManagerTaskStore = ({ new Uint8Array(), ); - if (taskRecord.state.batchId && taskRecord.state.uniqueWorkers) { + if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) { batch.put( - new Key(`/tasks/batch/${taskRecord.state.batchId}/assigned/${workerPeerIdStr}`), + new Key( + `/tasks/batch/${taskRecord.state.batchId}/assigned/${workerPeerIdStr}/${entityId}`, + ), new Uint8Array(), ); } diff --git a/tools/cli/src/manager/tasks.ts b/tools/cli/src/manager/tasks.ts index 2f72dc4f..480ab410 100644 --- a/tools/cli/src/manager/tasks.ts +++ b/tools/cli/src/manager/tasks.ts @@ -29,6 +29,11 @@ taskCommand .option("--data ", "Template data as JSON string", "{}") .option("--id ", "Custom task ID") .option("--capability ", "capability string to match workers") + .option( + "--repetitions ", + "Max tasks a single worker may complete per batch (0 = no limit)", + "0", + ) .action(async (options) => { try { const task: Task = { @@ -39,6 +44,7 @@ taskCommand templateId: options.templateId, templateData: options.data || {}, capability: options.capability || "", + repetitions: Number.parseInt(options.repetitions), }; const { data } = await api.post(`${options.url}/task`, task);