${["csv", "pipeline", "constant"].map(a =>
- `${a} `).join("")}
+
+ ${["csv", "pipeline", "constant"].map(a =>
+ `${a} `).join("")}
@@ -239,6 +245,15 @@ export const fetcherForm = async (
+
+ Repetitions per worker
+ How many tasks a single worker may complete per import batch.
+ 0 = no limit, 1 = each worker may do one task. Only applies to CSV data sources.
+
+
+
Capabilities
Comma separated list of capabilities required for tasks (note: only 1 capability is used at the moment).
@@ -490,6 +505,7 @@ export const createFetcher = async (
status: oldFetcher?.status ?? "active",
hidden: fields.hidden === "on" || fields.hidden === true,
+ repetitions: Number(fields.repetitions) || 0,
schedule: (() => {
try {
@@ -586,7 +602,7 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => {
}
// regex filter
- if (!task.result || (regex && regex.test(task.result)) || task.result == "") {
+ if (!task.result || (regex && regex.test(task.result)) || task.type === "report" || task.result == "") {
continue;
}
@@ -614,6 +630,8 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => {
// avoid expensive mistakes.
const maxPrice = BigInt(fetcher.price) * BigInt(10);
+ const batchRunId = ulid();
+
const tasks = data.map(
(d, _idx) => {
const rawPrice = d["dataffect/reward"] ?
@@ -628,6 +646,8 @@ export const getTasks = async (fetcher: Fetcher, csv: string) => {
templateId: fetcher.template,
templateData: JSON.stringify(d),
capability: fetcher.capabilities[0],
+ batchId: `${fetcher.datasetId}-${fetcher.index}-${batchRunId}`,
+ repetitions: fetcher.repetitions ?? 0,
} as Task;
},
);
@@ -886,7 +906,7 @@ export const processResults = async (f: Fetcher, batchSize: number) => {
let importCount = 0;
for (const d of data as any) {
- if (d.type !== "submission")
+ if (d.type !== "submission" && d.type !== "report")
continue;
db.beginTransaction();
await db.delete([...keyBase, "active", d.taskId]);
@@ -972,6 +992,15 @@ const formValidations: ValidationMap = {
},
template: (v: string) => (!v || v.length === 0) && "Template is required",
+ repetitions: (v: any) => {
+ if (v === undefined || v === "") return false;
+ const repetitions = Number(v);
+ return (
+ (isNaN(repetitions) && "Repetitions must be a number") ||
+ (!Number.isInteger(repetitions) && "Repetitions must be a whole number") ||
+ (repetitions < 0 && "Repetitions cannot be negative")
+ );
+ },
timeLimitSeconds: (v: any) => {
const num = Number(v);
return (
@@ -1075,10 +1104,11 @@ export const addFetcherRoutes = (app: Express): void => {
Finished: ${doneSize}
Failed: ${failedSize}
Batch / Freq: ${f.batchSize} / ${f.frequency}
+ ${f.repetitions ? `Repetitions per worker: ${f.repetitions} ` : ''}
Time Limit: ${f.timeLimitSeconds}s
Schedule: ${f.schedule && Object.keys(f.schedule).length > 0
? Object.entries(f.schedule).map(([day, slots]) =>
- `${day}: ${(slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ')}`
+ `${day}: ${Array.isArray(slots) ? (slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ') : 'all day'}`
).join('; ')
: 'continuous (no restrictions)'}
diff --git a/apps/worker-app/app/composables/useTasks.ts b/apps/worker-app/app/composables/useTasks.ts
index bb259c60..13c35a5a 100644
--- a/apps/worker-app/app/composables/useTasks.ts
+++ b/apps/worker-app/app/composables/useTasks.ts
@@ -37,47 +37,28 @@ export const useTasks = () => {
const useGetActiveTasks = (index: Ref) => {
const queryClient = useQueryClient();
-
- const handleTaskEvent = (event: CustomEvent) => {
- queryClient.setQueryData(["tasks", index], (old) =>
- old ? [...old, event.detail] : [event.detail],
- );
- };
+ const { account } = useAuth();
const invalidateTasks = () => {
- queryClient.invalidateQueries({ queryKey: ["tasks", index] });
+ queryClient.invalidateQueries({ queryKey: ["tasks", index, account] });
};
- onMounted(() => {
- instance.value?.events.addEventListener("task:created", handleTaskEvent);
- instance.value?.events.addEventListener("task:rejected", invalidateTasks);
- instance.value?.events.addEventListener(
- "task:completed",
- invalidateTasks,
- );
- instance.value?.events.addEventListener(
- "task:expired",
- invalidateTasks,
- );
- });
-
- onUnmounted(() => {
- instance.value?.events.removeEventListener(
- "task:created",
- handleTaskEvent,
- );
- instance.value?.events.removeEventListener(
- "task:rejected",
- invalidateTasks,
- );
- instance.value?.events.removeEventListener(
- "task:completed",
- invalidateTasks,
- );
- instance.value?.events.removeEventListener(
- "task:expired",
- invalidateTasks,
- );
+ // Register listeners as soon as instance is available, not just on mount.
+ watchEffect((onCleanup) => {
+ const events = instance.value?.events;
+ if (!events) return;
+
+ events.addEventListener("task:created", invalidateTasks);
+ events.addEventListener("task:rejected", invalidateTasks);
+ events.addEventListener("task:completed", invalidateTasks);
+ events.addEventListener("task:expired", invalidateTasks);
+
+ onCleanup(() => {
+ events.removeEventListener("task:created", invalidateTasks);
+ events.removeEventListener("task:rejected", invalidateTasks);
+ events.removeEventListener("task:completed", invalidateTasks);
+ events.removeEventListener("task:expired", invalidateTasks);
+ });
});
return useGetTasks(index);
diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts
index c949f3f5..35acc945 100644
--- a/core/protobufs/src/effect.ts
+++ b/core/protobufs/src/effect.ts
@@ -2766,6 +2766,8 @@ export interface Task {
templateId: string
templateData: string
capability?: string
+ batchId?: string
+ repetitions: number
}
export namespace Task {
@@ -2813,6 +2815,16 @@ export namespace Task {
w.string(obj.capability)
}
+ if (obj.batchId != null) {
+ w.uint32(66)
+ w.string(obj.batchId)
+ }
+
+ if ((obj.repetitions != null && obj.repetitions !== 0)) {
+ w.uint32(72)
+ w.uint32(obj.repetitions)
+ }
+
if (opts.lengthDelimited !== false) {
w.ldelim()
}
@@ -2823,7 +2835,8 @@ export namespace Task {
reward: 0n,
timeLimitSeconds: 0,
templateId: '',
- templateData: ''
+ templateData: '',
+ repetitions: 0
}
const end = length == null ? reader.len : reader.pos + length
@@ -2860,6 +2873,14 @@ export namespace Task {
obj.capability = reader.string()
break
}
+ case 8: {
+ obj.batchId = reader.string()
+ break
+ }
+ case 9: {
+ obj.repetitions = reader.uint32()
+ break
+ }
default: {
reader.skipType(tag & 7)
break
diff --git a/core/protobufs/src/task/task.proto b/core/protobufs/src/task/task.proto
index 46605b09..49cb94a6 100644
--- a/core/protobufs/src/task/task.proto
+++ b/core/protobufs/src/task/task.proto
@@ -16,6 +16,8 @@ message Task {
string template_id = 5;
string template_data = 6;
optional string capability = 7;
+ optional string batch_id = 8;
+ uint32 repetitions = 9;
}
message TaskAssignment {
diff --git a/core/protocol/src/consts.ts b/core/protocol/src/consts.ts
index 3c2cabc1..d02a8cb0 100644
--- a/core/protocol/src/consts.ts
+++ b/core/protocol/src/consts.ts
@@ -1,4 +1,4 @@
-export const TASK_ACCEPTANCE_TIME = 1000 * 60 * 10; // 10 minutes
-
+// 10 minutes, in milliseconds. Same duration as modules/manager/src/consts.ts
+export const TASK_ACCEPTANCE_TIME = 1000 * 60 * 10;
export const PROTOCOL_NAME = "effectai";
export const PROTOCOL_VERSION = "0.0.2";
diff --git a/modules/manager/src/consts.ts b/modules/manager/src/consts.ts
index df0ec53e..dad28837 100644
--- a/modules/manager/src/consts.ts
+++ b/modules/manager/src/consts.ts
@@ -1,4 +1,5 @@
-export const TASK_ACCEPTANCE_TIME = 600;
+// 10 minutes, in seconds. Same duration as core/protocol/src/consts.ts
+export const TASK_ACCEPTANCE_TIME = 60 * 10;
export const ACTIVE_TASK_TRESHOLD = 50;
export const PAYMENT_BATCH_SIZE = 60;
export const PAYMENT_VERSION = 1;
diff --git a/modules/manager/src/main.ts b/modules/manager/src/main.ts
index a5531f4f..2c09481c 100644
--- a/modules/manager/src/main.ts
+++ b/modules/manager/src/main.ts
@@ -387,12 +387,24 @@ export const createManager = async ({
taskId,
index: "completed",
})
- .then(
- (a) =>
- a.events
- .filter((e: any) => e.type === "submission")
- .map((e: any) => ({ ...e, taskId }))[0],
- )
+ .then((taskRecord) => {
+ const event = taskRecord.events.find(
+ (taskEvent: any) =>
+ taskEvent.type === "submission" || taskEvent.type === "report",
+ );
+ if (!event) return { taskId, error: "NOT FOUND" };
+
+ if (event.type === "report") {
+ return {
+ type: "report",
+ timestamp: event.timestamp,
+ taskId,
+ submissionByPeer: event.reportedByPeer,
+ result: event.result,
+ };
+ }
+ return { ...event, taskId };
+ })
.catch((_e) => ({ taskId, error: "NOT FOUND" }));
});
diff --git a/modules/manager/src/modules/createTaskManager.ts b/modules/manager/src/modules/createTaskManager.ts
index 2d7a906d..0e8f40db 100644
--- a/modules/manager/src/modules/createTaskManager.ts
+++ b/modules/manager/src/modules/createTaskManager.ts
@@ -7,6 +7,7 @@ import type {
ManagerTaskStore,
TaskAcceptedEvent,
TaskAssignedEvent,
+ TaskRejectedEvent,
TaskSubmissionEvent,
} from "../stores/managerTaskStore.js";
@@ -56,6 +57,14 @@ export function createTaskManager({
const isExpired = (timestamp: number, value: number) =>
timestamp + value < Math.floor(Date.now() / 1000);
+ const isReportPayload = (result: string): boolean => {
+ try {
+ return JSON.parse(result)?.task === "report";
+ } catch {
+ return false;
+ }
+ };
+
const getTask = async ({
taskId,
index = "active",
@@ -137,12 +146,12 @@ export function createTaskManager({
reason,
});
- workerManager.markTaskReleased(workerPeerIdStr, taskId);
-
const taskRecord = await taskStore.getTask({
entityId: taskId,
});
+ workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord?.state.batchId);
+
await workerManager.incrementStateValue(workerPeerIdStr, "tasksRejected");
events.safeDispatchEvent("task:rejected", {
@@ -159,13 +168,18 @@ export function createTaskManager({
result: string;
workerPeerIdStr: string;
}) => {
+ if (isReportPayload(result)) {
+ await processTaskReport({ taskId, result, workerPeerIdStr });
+ return;
+ }
+
const taskRecord = await taskStore.complete({
entityId: taskId,
result,
peerIdStr: workerPeerIdStr,
});
- workerManager.markTaskReleased(workerPeerIdStr, taskId);
+ workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord.state.batchId);
await workerManager.incrementStateValue(workerPeerIdStr, "tasksCompleted");
@@ -174,6 +188,30 @@ export function createTaskManager({
});
};
+ const processTaskReport = async ({
+ taskId,
+ result,
+ workerPeerIdStr,
+ }: {
+ taskId: string;
+ result: string;
+ workerPeerIdStr: string;
+ }) => {
+ const taskRecord = await taskStore.report({
+ entityId: taskId,
+ result,
+ peerIdStr: workerPeerIdStr,
+ });
+
+ workerManager.markTaskReleased(workerPeerIdStr, taskId, taskRecord.state.batchId);
+
+ await workerManager.incrementStateValue(workerPeerIdStr, "tasksRejected");
+
+ events.safeDispatchEvent("task:rejected", {
+ detail: taskRecord,
+ });
+ };
+
const handleCreateEvent = async (taskRecord: ManagerTaskRecord) => {
await assignTask({ entityId: taskRecord.state.id });
};
@@ -182,13 +220,7 @@ export function createTaskManager({
taskRecord: ManagerTaskRecord,
lastEvent: TaskAssignedEvent,
) => {
- const { timeLimitSeconds } = taskRecord.state;
- const acceptanceWindow =
- typeof timeLimitSeconds === "number" && timeLimitSeconds > 0
- ? timeLimitSeconds
- : TASK_ACCEPTANCE_TIME;
-
- if (isExpired(lastEvent.timestamp, acceptanceWindow)) {
+ if (isExpired(lastEvent.timestamp, TASK_ACCEPTANCE_TIME)) {
await rejectAndReassignTask(taskRecord);
}
};
@@ -222,6 +254,7 @@ export function createTaskManager({
workerManager.markTaskReleased(
latestAssignEvent.assignedToPeer,
taskRecord.state.id,
+ taskRecord.state.batchId,
);
await workerManager.incrementStateValue(
@@ -311,6 +344,8 @@ export function createTaskManager({
case "submission":
await handleSubmissionEvent(taskRecord, lastEvent);
break;
+ case "report":
+ break;
case "payout":
// do nothing..
break;
@@ -333,13 +368,31 @@ export function createTaskManager({
throw new Error("Task is already assigned.");
}
- const originalWorkerId = taskRecord?.state.templateData
- ? JSON.parse(taskRecord.state.templateData)?.submissionByPeer
- : undefined;
+ // In a repetition-limited batch, the most recent rejector of this task must
+ // be excluded from the re-assignment - otherwise Task 1 bounces straight
+ // back to them and consumes the batch slot they need for Task 2.
+ let lastRejector: string | undefined;
+ if (taskRecord.state.repetitions > 0) {
+ for (let index = taskRecord.events.length - 1; index >= 0; index--) {
+ const event = taskRecord.events[index];
+ if (event.type === "reject") {
+ lastRejector = (event as TaskRejectedEvent).rejectedByPeer;
+ break;
+ }
+ }
+ }
+
+ const originalWorkerId =
+ lastRejector ??
+ (taskRecord?.state.templateData
+ ? JSON.parse(taskRecord.state.templateData)?.submissionByPeer
+ : undefined);
const worker = await workerManager.selectWorker(
taskRecord.state.capability || undefined,
originalWorkerId || undefined,
+ taskRecord.state.batchId || undefined,
+ taskRecord.state.repetitions,
);
if (!worker) {
diff --git a/modules/manager/src/modules/createWorkerManager.ts b/modules/manager/src/modules/createWorkerManager.ts
index 1456ae1a..c44c58e4 100644
--- a/modules/manager/src/modules/createWorkerManager.ts
+++ b/modules/manager/src/modules/createWorkerManager.ts
@@ -27,6 +27,49 @@ export const createWorkerManager = ({
const workerStore = createWorkerStore({
datastore,
});
+
+ const countBatchKeys = async (prefix: string): Promise => {
+ let count = 0;
+ for await (const _key of datastore.queryKeys({ prefix })) {
+ count++;
+ }
+ return count;
+ };
+
+ const countCompletedInBatch = (batchId: string, workerId: string) =>
+ countBatchKeys(`/tasks/batch/${batchId}/completed/${workerId}/`);
+
+ // How many tasks of this batch are currently assigned to the worker.
+ const countActiveAssignmentsInBatch = (batchId: string, workerId: string) =>
+ countBatchKeys(`/tasks/batch/${batchId}/assigned/${workerId}/`);
+
+ const reservedBatchSlots = new Map>();
+
+ const getReservedBatchSlots = (batchId: string, workerId: string): number =>
+ reservedBatchSlots.get(batchId)?.get(workerId) ?? 0;
+
+ const reserveBatchSlot = (batchId: string, workerId: string): void => {
+ let workerSlots = reservedBatchSlots.get(batchId);
+ if (!workerSlots) {
+ workerSlots = new Map();
+ reservedBatchSlots.set(batchId, workerSlots);
+ }
+ workerSlots.set(workerId, (workerSlots.get(workerId) ?? 0) + 1);
+ };
+
+ const releaseBatchSlot = (batchId: string, workerId: string): void => {
+ const workerSlots = reservedBatchSlots.get(batchId);
+ if (!workerSlots) return;
+
+ const currentSlots = workerSlots.get(workerId) ?? 0;
+ if (currentSlots <= 1) {
+ workerSlots.delete(workerId);
+ } else {
+ workerSlots.set(workerId, currentSlots - 1);
+ }
+
+ if (workerSlots.size === 0) reservedBatchSlots.delete(batchId);
+ };
const workerAssignments = new Map>();
const assignmentsFor = (workerId: string) => {
@@ -42,17 +85,14 @@ export const createWorkerManager = ({
const markTaskAssigned = (workerId: string, taskId: string) => {
assignmentsFor(workerId).add(taskId);
};
- const markTaskReleased = (workerId: string, taskId: string) => {
+ const markTaskReleased = (workerId: string, taskId: string, batchId?: string) => {
const assignments = workerAssignments.get(workerId);
- if (!assignments) {
- return;
+ if (assignments) {
+ assignments.delete(taskId);
+ if (assignments.size === 0) workerAssignments.delete(workerId);
}
- assignments.delete(taskId);
-
- if (assignments.size === 0) {
- workerAssignments.delete(workerId);
- }
+ if (batchId) releaseBatchSlot(batchId, workerId);
};
const accessCodeStore = createAccessCodeStore({ datastore });
@@ -195,36 +235,67 @@ export const createWorkerManager = ({
const selectWorker = async (
capability?: string,
- originalWorkerId?: string
+ originalWorkerId?: string,
+ batchId?: string,
+ repetitions?: number,
): Promise => {
const queue = workerQueue.getQueue();
- //TODO:: optimize this..
- for (const workerId of queue) {
+ // 0 repetitions means no per-worker limit for this batch.
+ const maxTasksPerWorker = repetitions ?? 0;
+ const limitWorkersInBatch =
+ maxTasksPerWorker > 0 && batchId !== undefined;
+
+ const isEligible = async (workerId: string): Promise => {
const worker = await getWorker(workerId);
- if (!worker || originalWorkerId === workerId) continue;
+ if (!worker || originalWorkerId === workerId) return false;
const workerCapabilities =
worker.state.capabilities.concat(worker.state.managerCapabilities || []);
if (capability) {
- const info = getCapabilityInfo(capability);
-
+ const capabilityInfo = getCapabilityInfo(capability);
const hasCapability = workerCapabilities.includes(capability);
const hasAntiCapability =
- info?.antiCapability && workerCapabilities.includes(info.antiCapability);
+ capabilityInfo?.antiCapability &&
+ workerCapabilities.includes(capabilityInfo.antiCapability);
+ if (!hasCapability || hasAntiCapability) return false;
+ }
+
+ if (await isBusy(worker)) return false;
- if (!hasCapability || hasAntiCapability) continue;
+ if (limitWorkersInBatch && batchId) {
+ const completedCount = await countCompletedInBatch(batchId, workerId);
+ const assignedInDatastore = await countActiveAssignmentsInBatch(
+ batchId,
+ workerId,
+ );
+ const reservedInMemory = getReservedBatchSlots(batchId, workerId);
+ const activeCount = Math.max(reservedInMemory, assignedInDatastore);
+
+ if (completedCount + activeCount > maxTasksPerWorker) return false;
+ }
+
+ return true;
+ };
+
+ //TODO:: optimize this..
+ for (const workerId of queue) {
+ // Synchronous reservation must happen before any await so two concurrent
+ // assignTask calls can't both claim the same slot for the same batch.
+ if (limitWorkersInBatch && batchId) {
+ reserveBatchSlot(batchId, workerId);
}
- const busy = await isBusy(worker);
- if (!busy) {
- workerQueue.dequeuePeer(workerId);
- return workerId;
+ if (!(await isEligible(workerId))) {
+ if (limitWorkersInBatch && batchId) releaseBatchSlot(batchId, workerId);
+ continue;
}
+
+ workerQueue.dequeuePeer(workerId);
+ return workerId;
}
- // No available worker found
return null;
};
diff --git a/modules/manager/src/stores/managerTaskStore.ts b/modules/manager/src/stores/managerTaskStore.ts
index 4959d2ca..a7ff615d 100644
--- a/modules/manager/src/stores/managerTaskStore.ts
+++ b/modules/manager/src/stores/managerTaskStore.ts
@@ -18,6 +18,7 @@ export type ManagerTaskEvent =
| TaskAssignedEvent
| TaskSubmissionEvent
| TaskRejectedEvent
+ | TaskReportedEvent
| TaskAcceptedEvent
| TaskPaymentEvent;
@@ -42,6 +43,12 @@ export interface TaskRejectedEvent extends BaseTaskEvent {
rejectedByPeer: string;
}
+export interface TaskReportedEvent extends BaseTaskEvent {
+ type: "report";
+ result: string;
+ reportedByPeer: string;
+}
+
export interface TaskSubmissionEvent extends BaseTaskEvent {
type: "submission";
result: string;
@@ -170,6 +177,21 @@ export const createManagerTaskStore = ({
Buffer.from(stringifyWithBigInt(taskRecord)),
);
batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`));
+
+ if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) {
+ batch.put(
+ new Key(
+ `/tasks/batch/${taskRecord.state.batchId}/completed/${peerIdStr}/${entityId}`,
+ ),
+ new Uint8Array(),
+ );
+ batch.delete(
+ new Key(
+ `/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}/${entityId}`,
+ ),
+ );
+ }
+
await batch.commit();
return taskRecord;
@@ -259,7 +281,64 @@ export const createManagerTaskStore = ({
new Key(`/tasks/assign/${latestAssignEvent?.assignedToPeer}/${entityId}`),
);
+ if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) {
+ batch.delete(
+ new Key(
+ `/tasks/batch/${taskRecord.state.batchId}/assigned/${latestAssignEvent.assignedToPeer}/${entityId}`,
+ ),
+ );
+ }
+
+ await batch.commit();
+ };
+
+ const report = async ({
+ entityId,
+ result,
+ peerIdStr,
+ }: {
+ entityId: string;
+ result: string;
+ peerIdStr: string;
+ }): Promise => {
+ const taskRecord = await getTask({ entityId });
+
+ if (!taskRecord) {
+ throw new TaskValidationError("Task not found");
+ }
+
+ const lastEvent = taskRecord.events[taskRecord.events.length - 1];
+ if (lastEvent.type !== "accept" || lastEvent.acceptedByPeer !== peerIdStr) {
+ throw new TaskValidationError("Task was not accepted by this worker");
+ }
+
+ taskRecord.events.push({
+ timestamp: Math.floor(Date.now() / 1000),
+ type: "report",
+ result,
+ reportedByPeer: peerIdStr,
+ });
+
+ const batch = datastore.batch();
+
+ batch.put(
+ new Key(`/tasks/completed/${taskRecord.state.id}`),
+ Buffer.from(stringifyWithBigInt(taskRecord)),
+ );
+ batch.delete(new Key(`/tasks/active/${taskRecord.state.id}`));
+ batch.delete(new Key(`/tasks/assign/${peerIdStr}/${entityId}`));
+
+ if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) {
+ batch.delete(
+ new Key(
+ `/tasks/batch/${taskRecord.state.batchId}/assigned/${peerIdStr}/${entityId}`,
+ ),
+ );
+ }
+
await batch.commit();
+
+ return taskRecord;
};
const payout = async ({
@@ -339,6 +418,15 @@ export const createManagerTaskStore = ({
new Uint8Array(),
);
+ if (taskRecord.state.batchId && taskRecord.state.repetitions > 0) {
+ batch.put(
+ new Key(
+ `/tasks/batch/${taskRecord.state.batchId}/assigned/${workerPeerIdStr}/${entityId}`,
+ ),
+ new Uint8Array(),
+ );
+ }
+
await batch.commit();
};
@@ -348,6 +436,7 @@ export const createManagerTaskStore = ({
complete,
accept,
reject,
+ report,
payout,
assign,
getTask,
diff --git a/tools/cli/src/manager/tasks.ts b/tools/cli/src/manager/tasks.ts
index 2f72dc4f..480ab410 100644
--- a/tools/cli/src/manager/tasks.ts
+++ b/tools/cli/src/manager/tasks.ts
@@ -29,6 +29,11 @@ taskCommand
.option("--data ", "Template data as JSON string", "{}")
.option("--id ", "Custom task ID")
.option("--capability ", "capability string to match workers")
+ .option(
+ "--repetitions ",
+ "Max tasks a single worker may complete per batch (0 = no limit)",
+ "0",
+ )
.action(async (options) => {
try {
const task: Task = {
@@ -39,6 +44,7 @@ taskCommand
templateId: options.templateId,
templateData: options.data || {},
capability: options.capability || "",
+ repetitions: Number.parseInt(options.repetitions),
};
const { data } = await api.post(`${options.url}/task`, task);