Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion src/scheduler/__tests__/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ describe("Scheduler", () => {
expect(resumed).toBeNull();
});

test("resumeJob is a no-op on a non-paused job (active, failed, completed)", () => {
test("resumeJob is a no-op on a non-paused job (active, failed, completed) without force", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "ActiveJob",
Expand All @@ -431,6 +431,56 @@ describe("Scheduler", () => {
expect(stillCompleted?.status).toBe("completed");
});

test("resumeJob with force=true revives a failed job, clears consecutive_errors, recomputes next_run_at", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "CircuitBroken",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Try again",
});
// Simulate the executor circuit-breaking the job after MAX_CONSECUTIVE_ERRORS.
db.run("UPDATE scheduled_jobs SET status = 'failed', consecutive_errors = 10, next_run_at = NULL WHERE id = ?", [
job.id,
]);
const broken = scheduler.getJob(job.id);
expect(broken?.status).toBe("failed");
expect(broken?.consecutiveErrors).toBe(10);
expect(broken?.nextRunAt).toBeNull();

const revived = scheduler.resumeJob(job.id, { force: true });
expect(revived?.status).toBe("active");
expect(revived?.consecutiveErrors).toBe(0);
expect(revived?.nextRunAt).toBeTruthy();
const nextMs = revived?.nextRunAt ? new Date(revived.nextRunAt).getTime() : 0;
expect(nextMs).toBeGreaterThan(Date.now() - 5_000);
expect(nextMs).toBeLessThan(Date.now() + 120_000);
});

test("resumeJob with force=true still refuses to revive a completed job", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "OneShotDone",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Fire once",
});
db.run("UPDATE scheduled_jobs SET status = 'completed' WHERE id = ?", [job.id]);
const stillCompleted = scheduler.resumeJob(job.id, { force: true });
expect(stillCompleted?.status).toBe("completed");
});

test("resumeJob with force=true still resumes a paused job (no regression on default path)", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "PausedNeedsForce",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Go",
});
scheduler.pauseJob(job.id);
const resumed = scheduler.resumeJob(job.id, { force: true });
expect(resumed?.status).toBe("active");
expect(resumed?.nextRunAt).toBeTruthy();
});

test("createJob honors enabled=false by inserting an inactive row", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
Expand Down
30 changes: 18 additions & 12 deletions src/scheduler/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,26 @@ export class Scheduler {
}

/**
* Flip a paused job back to active. Recomputes next_run_at from the stored
* schedule so a job paused mid-interval resumes on a fresh cadence.
* Resets consecutive_errors so a job paused in its backoff fan-out gets a
* clean retry budget. Returns the updated job, or null if the id does not
* exist.
* Flip a paused or failed job back to active. Recomputes next_run_at from
* the stored schedule so a resumed job picks up on a fresh cadence, and
* resets consecutive_errors so it gets a clean retry budget.
*
* Default behavior accepts only `paused`. Pass `force: true` to also revive
* `failed` jobs (the ones the executor circuit-broke after
* MAX_CONSECUTIVE_ERRORS). The operator opts in because they know the
* underlying cause has cleared. `completed` is never revivable here:
* `at`-kind one-shots may have already deleted themselves
* (executor.ts deleteAfterRun path) and `cron`/`every` jobs do not reach
* `completed` through the normal lifecycle.
*
* Returns the updated job, or null if the id does not exist.
*/
resumeJob(id: string): ScheduledJob | null {
resumeJob(id: string, options?: { force?: boolean }): ScheduledJob | null {
const job = this.getJob(id);
if (!job) return null;
// Only paused jobs may be resumed. Failed and completed are terminal
// states; force-reviving them would bypass the lifecycle (e.g.,
// re-running a one-shot that already deleted itself, or restarting a
// circuit-broken job without addressing the failure).
if (job.status !== "paused") return job;
const force = options?.force === true;
const eligible = job.status === "paused" || (force && job.status === "failed");
if (!eligible) return job;
const nextRun = computeNextRunAt(job.schedule);
const nextRunIso = nextRun ? nextRun.toISOString() : null;
this.db.run(
Expand All @@ -173,7 +179,7 @@ export class Scheduler {
next_run_at = ?,
consecutive_errors = 0,
updated_at = datetime('now')
WHERE id = ? AND status = 'paused'`,
WHERE id = ? AND status IN ('paused', 'failed')`,
[nextRunIso, id],
);
this.armTimer();
Expand Down
41 changes: 41 additions & 0 deletions src/ui/api/__tests__/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,47 @@ describe("scheduler API", () => {
expect(body.job.consecutiveErrors).toBe(0);
});

test("POST /:id/resume without body is a no-op on a failed job", async () => {
const job = scheduler.createJob({
name: "stuck-failed",
schedule: { kind: "every", intervalMs: 60_000 },
task: "go",
});
db.run("UPDATE scheduled_jobs SET status = 'failed' WHERE id = ?", [job.id]);
const res = await handleUiRequest(req(`/ui/api/scheduler/${job.id}/resume`, { method: "POST" }));
expect(res.status).toBe(200);
const body = (await res.json()) as { job: { status: string } };
expect(body.job.status).toBe("failed");
});

test("POST /:id/resume with force=true revives a failed job and audits as resume:force", async () => {
const job = scheduler.createJob({
name: "circuit-broken",
schedule: { kind: "every", intervalMs: 60_000 },
task: "go",
});
db.run("UPDATE scheduled_jobs SET status = 'failed', consecutive_errors = 10, next_run_at = NULL WHERE id = ?", [
job.id,
]);
const res = await handleUiRequest(
req(`/ui/api/scheduler/${job.id}/resume`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ force: true }),
}),
);
expect(res.status).toBe(200);
const body = (await res.json()) as { job: { status: string; consecutiveErrors: number; nextRunAt: string | null } };
expect(body.job.status).toBe("active");
expect(body.job.consecutiveErrors).toBe(0);
expect(body.job.nextRunAt).toBeTruthy();

const audit = db.query("SELECT action FROM scheduler_audit_log WHERE job_id = ?").all(job.id) as Array<{
action: string;
}>;
expect(audit.some((a) => a.action === "resume:force")).toBe(true);
});

test("POST /:id/run runs the job and returns the result", async () => {
const job = scheduler.createJob({
name: "run-me",
Expand Down
31 changes: 27 additions & 4 deletions src/ui/api/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const PreviewSchema = z.object({
schedule: ScheduleInputSchema,
});

const ResumeSchema = z.object({
force: z.boolean().optional(),
});

function json(body: unknown, init?: ResponseInit): Response {
return new Response(JSON.stringify(body), {
...init,
Expand Down Expand Up @@ -268,15 +272,34 @@ function handlePause(deps: SchedulerApiDeps, id: string): Response {
return json({ job: updated });
}

function handleResume(deps: SchedulerApiDeps, id: string): Response {
async function handleResume(req: Request, deps: SchedulerApiDeps, id: string): Promise<Response> {
const before = deps.scheduler.getJob(id);
if (!before) return errJson("Job not found", 404);
const updated = deps.scheduler.resumeJob(id);

// Body is optional. Empty body means force=false; non-empty body must parse
// to { force?: boolean }. Read once via .text() so missing content-length
// (which Bun does not set on Request objects built in tests) does not
// suppress force=true.
let force = false;
const rawText = await req.text();
if (rawText.trim().length > 0) {
let raw: unknown;
try {
raw = JSON.parse(rawText);
} catch {
return errJson("Invalid JSON body", 400);
}
const parsed = ResumeSchema.safeParse(raw);
if (!parsed.success) return errJson(zodErrorMessage(parsed.error), 400);
force = parsed.data.force === true;
}

const updated = deps.scheduler.resumeJob(id, { force });
if (!updated) return errJson("Job not found", 404);
writeAudit(deps.db, {
jobId: updated.id,
jobName: updated.name,
action: "resume",
action: force ? "resume:force" : "resume",
previousStatus: before.status,
newStatus: updated.status,
});
Expand Down Expand Up @@ -357,7 +380,7 @@ export async function handleSchedulerApi(req: Request, url: URL, deps: Scheduler
const resumeMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/resume$/);
if (resumeMatch) {
if (req.method !== "POST") return errJson("Method not allowed", 405);
return handleResume(deps, resumeMatch[1]);
return handleResume(req, deps, resumeMatch[1]);
}

const runMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/run$/);
Expand Down
Loading