From 532661b67f484f475d9e0f517bb6dbcff2e3a8fb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 09:14:53 +0000 Subject: [PATCH 1/4] feat: add scheduled actions - migrations, service, tools, cron route, code maps --- apps/node_backend/src/app.ts | 3 + .../021_create_scheduled_actions.sql | 28 ++ .../022_create_scheduled_action_runs.sql | 30 ++ apps/node_backend/src/routes/cron.ts | 285 +++++++++++ .../src/services/localAgentLoopService.ts | 233 +++++++++ .../src/services/scheduledActionService.ts | 453 ++++++++++++++++++ docs/code_maps/feature_map.yaml | 14 + docs/code_maps/logic_map.yaml | 37 ++ vercel.json | 6 + 9 files changed, 1089 insertions(+) create mode 100644 apps/node_backend/src/db/migrations/021_create_scheduled_actions.sql create mode 100644 apps/node_backend/src/db/migrations/022_create_scheduled_action_runs.sql create mode 100644 apps/node_backend/src/routes/cron.ts create mode 100644 apps/node_backend/src/services/scheduledActionService.ts diff --git a/apps/node_backend/src/app.ts b/apps/node_backend/src/app.ts index 5ce9c173..d129aef1 100644 --- a/apps/node_backend/src/app.ts +++ b/apps/node_backend/src/app.ts @@ -8,6 +8,7 @@ import llmRoutes from './routes/llm.js'; import chatRoutes from './routes/chat.js'; import platformRoutes from './routes/platform.js'; import resourcesRoutes from './routes/resources.js'; +import cronRoutes from './routes/cron.js'; import { runMigrations } from './db/migrate.js'; // Load environment variables (no-op in Vercel production where env vars are injected directly) @@ -95,6 +96,8 @@ app.use('/api/llm', llmRoutes); app.use('/api/chat', chatRoutes); app.use('/api/v1/platform', platformRoutes); app.use('/api/resources', resourcesRoutes); +// Cron routes do NOT use the JWT authenticate middleware — they use CRON_SECRET. +app.use('/api/cron', cronRoutes); // 404 handler app.use((req: Request, res: Response) => { diff --git a/apps/node_backend/src/db/migrations/021_create_scheduled_actions.sql b/apps/node_backend/src/db/migrations/021_create_scheduled_actions.sql new file mode 100644 index 00000000..73e2445b --- /dev/null +++ b/apps/node_backend/src/db/migrations/021_create_scheduled_actions.sql @@ -0,0 +1,28 @@ +-- Migration: Create scheduled_actions table +-- Description: Stores user-defined scheduled actions for automated agent execution. +-- Users create and manage scheduled actions through conversation; +-- a cron tick queries due actions and starts normal Agent execution. + +CREATE TABLE scheduled_actions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + channel_id VARCHAR(255) NOT NULL, + thread_id VARCHAR(255), + title TEXT NOT NULL, + prompt TEXT NOT NULL, + schedule_expr TEXT NOT NULL, + interval_seconds INT NOT NULL, + timezone TEXT NOT NULL DEFAULT 'UTC', + next_run_at TIMESTAMP NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'active', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_scheduled_actions_user_id ON scheduled_actions(user_id); +CREATE INDEX idx_scheduled_actions_due ON scheduled_actions(next_run_at, status); + +CREATE TRIGGER update_scheduled_actions_updated_at + BEFORE UPDATE ON scheduled_actions + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); diff --git a/apps/node_backend/src/db/migrations/022_create_scheduled_action_runs.sql b/apps/node_backend/src/db/migrations/022_create_scheduled_action_runs.sql new file mode 100644 index 00000000..104217d5 --- /dev/null +++ b/apps/node_backend/src/db/migrations/022_create_scheduled_action_runs.sql @@ -0,0 +1,30 @@ +-- Migration: Create scheduled_action_runs table +-- Description: Tracks each execution attempt of a scheduled action. +-- The UNIQUE constraint on (scheduled_action_id, scheduled_fire_at) prevents +-- duplicate claims when multiple cron requests overlap. + +CREATE TABLE scheduled_action_runs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + scheduled_action_id UUID NOT NULL REFERENCES scheduled_actions(id) ON DELETE CASCADE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + channel_id VARCHAR(255) NOT NULL, + thread_id VARCHAR(255), + scheduled_fire_at TIMESTAMP NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'queued', + started_at TIMESTAMP, + completed_at TIMESTAMP, + error_text TEXT, + chat_task_id VARCHAR(255), + chat_message_id VARCHAR(255), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + UNIQUE(scheduled_action_id, scheduled_fire_at) +); + +CREATE INDEX idx_scheduled_action_runs_action_id ON scheduled_action_runs(scheduled_action_id); +CREATE INDEX idx_scheduled_action_runs_user_id ON scheduled_action_runs(user_id); + +CREATE TRIGGER update_scheduled_action_runs_updated_at + BEFORE UPDATE ON scheduled_action_runs + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); diff --git a/apps/node_backend/src/routes/cron.ts b/apps/node_backend/src/routes/cron.ts new file mode 100644 index 00000000..e6490e65 --- /dev/null +++ b/apps/node_backend/src/routes/cron.ts @@ -0,0 +1,285 @@ +/** + * Cron endpoint: process due scheduled actions. + * + * Called by Vercel Cron (or any HTTP client) once per minute. + * Protected by a shared `CRON_SECRET` bearer token — does NOT use user JWT. + * + * For each due action the handler: + * 1. Claims a run record (idempotent via UNIQUE constraint). + * 2. Advances `next_run_at` so the action is not re-claimed in the same tick. + * 3. Inserts a user message with `source: scheduled_action`. + * 4. Runs the agent loop synchronously (no streaming flush). + * 5. Writes the final assistant message. + * 6. Marks the run completed or failed. + */ + +import express, { Request, Response } from 'express'; +import { randomUUID } from 'crypto'; +import { + acceptTask, + upsertMessages, + listSessionMessagesForModel, + type AcceptTaskInput, +} from '../services/chatAsyncTransportService.js'; +import { + buildChatSessionId, + normalizeChatThreadId, + resolveScopeInstructions, +} from '../services/chatRouterService.js'; +import { streamWithAgentToolsAndUserConfig } from '../llm/llm_service.js'; +import { buildAgentTools } from '../services/localAgentLoopService.js'; +import { + queryDueActions, + claimScheduledActionRun, + advanceNextRunAt, + markRunStarted, + markRunCompleted, + markRunFailed, +} from '../services/scheduledActionService.js'; + +const router = express.Router(); + +const MAX_ASSISTANT_CHARS = 120 * 1024; +const DEFAULT_MAX_STEPS = 10; +const DEFAULT_MAX_TOOL_CALLS = 50; +const DEFAULT_TIMEOUT_MS = 60_000; + +function verifyCronAuth(req: Request, res: Response): boolean { + const cronSecret = process.env.CRON_SECRET; + if (!cronSecret) { + // If no secret is configured, block all requests to avoid open execution. + res.status(403).json({ error: 'CRON_SECRET is not configured' }); + return false; + } + const authHeader = req.headers['authorization']; + if (!authHeader || authHeader !== 'Bearer ' + cronSecret) { + res.status(401).json({ error: 'Unauthorized' }); + return false; + } + return true; +} + +/** + * GET /api/cron/scheduled-actions + * + * Processes up to 5 due scheduled actions per invocation. + * Returns a JSON summary of each action processed. + */ +router.get('/scheduled-actions', async (req: Request, res: Response) => { + if (!verifyCronAuth(req, res)) return; + + const fireTime = new Date(); + const dueActions = await queryDueActions(); + + const results: Array<{ + actionId: string; + runId: string | null; + status: 'claimed' | 'skipped' | 'completed' | 'failed'; + error?: string; + }> = []; + + for (const action of dueActions) { + // Attempt to claim a run for this fire time. + const run = await claimScheduledActionRun(action, fireTime); + if (!run) { + // Another cron invocation already claimed this slot — skip. + results.push({ actionId: action.id, runId: null, status: 'skipped' }); + continue; + } + + // Advance next_run_at immediately so the next cron tick won't re-claim. + await advanceNextRunAt(action.id, new Date(action.nextRunAt), action.intervalSeconds); + await markRunStarted(run.id); + + let chatTaskId: string | null = null; + let chatMessageId: string | null = null; + + try { + const normalizedThreadId = normalizeChatThreadId(action.threadId); + const sessionId = buildChatSessionId(action.channelId, normalizedThreadId); + const taskId = randomUUID(); + const userMessageId = randomUUID(); + const assistantMessageId = randomUUID(); + + const acceptInput: AcceptTaskInput = { + taskId, + idempotencyKey: run.id, + channelId: action.channelId, + sessionId, + threadId: normalizedThreadId, + resolvedBotId: null, + resolvedSkillId: null, + }; + const acceptedTask = await acceptTask(action.userId, acceptInput); + chatTaskId = acceptedTask.taskId; + chatMessageId = assistantMessageId; + + // Insert user message. + await upsertMessages(action.userId, [ + { + messageId: userMessageId, + taskId: acceptedTask.taskId, + channelId: action.channelId, + sessionId: acceptedTask.sessionId, + threadId: normalizedThreadId, + role: 'user', + content: action.prompt, + taskState: 'accepted', + checkpointCursor: null, + metadata: { + source: 'scheduled_action', + scheduledActionId: action.id, + scheduledRunId: run.id, + }, + createdAt: null, + }, + ]); + + // Insert dispatch placeholder for the assistant message. + await upsertMessages(action.userId, [ + { + messageId: assistantMessageId, + taskId: acceptedTask.taskId, + channelId: action.channelId, + sessionId: acceptedTask.sessionId, + threadId: normalizedThreadId, + role: 'assistant', + content: '', + taskState: 'dispatched', + checkpointCursor: null, + metadata: { + source: 'backend.cron.scheduled_action', + dispatchPlaceholder: true, + scheduledActionId: action.id, + scheduledRunId: run.id, + }, + createdAt: null, + }, + ]); + + // Resolve scope instructions (channel + thread). + const scopeInstructions = await resolveScopeInstructions(action.userId, { + channelId: action.channelId, + threadId: normalizedThreadId, + }); + + // Build system prompt with session context. + const ctxLines = [`Channel ID: ${action.channelId}`]; + if (normalizedThreadId && normalizedThreadId !== 'main') { + ctxLines.push(`Thread ID: ${normalizedThreadId}`); + } + const systemParts: string[] = [`Session context:\n${ctxLines.map((l) => `- ${l}`).join('\n')}`]; + if (scopeInstructions.channelInstructions?.trim()) { + systemParts.push(`Channel context:\n${scopeInstructions.channelInstructions.trim()}`); + } + if (scopeInstructions.threadInstructions?.trim()) { + systemParts.push(`Section context:\n${scopeInstructions.threadInstructions.trim()}`); + } + const composedSystemPrompt = systemParts.join('\n\n'); + + // Load session history. + const modelMessages = await listSessionMessagesForModel(action.userId, acceptedTask.sessionId, { + limit: 40, + maxChars: 10000, + }); + const messagesWithSystem = [ + { role: 'system' as const, content: composedSystemPrompt }, + ...modelMessages, + ]; + + // Run the agent loop. + const agentTools = buildAgentTools(action.userId); + const streamResult = await streamWithAgentToolsAndUserConfig( + action.userId, + { + messages: messagesWithSystem, + }, + agentTools, + { + maxSteps: DEFAULT_MAX_STEPS, + maxToolCalls: DEFAULT_MAX_TOOL_CALLS, + timeoutMs: DEFAULT_TIMEOUT_MS, + }, + ); + + // Collect full text (no intermediate flushes needed in cron context). + let assistantContent = ''; + for await (const chunk of streamResult.textStream) { + if (typeof chunk === 'string') { + assistantContent += chunk; + if (assistantContent.length >= MAX_ASSISTANT_CHARS) break; + } + } + + // Write final assistant message. + const finalContent = assistantContent.trim() || '(no response)'; + await upsertMessages(action.userId, [ + { + messageId: assistantMessageId, + taskId: acceptedTask.taskId, + channelId: action.channelId, + sessionId: acceptedTask.sessionId, + threadId: normalizedThreadId, + role: 'assistant', + content: finalContent, + taskState: 'completed', + checkpointCursor: null, + metadata: { + source: 'backend.cron.scheduled_action', + scheduledActionId: action.id, + scheduledRunId: run.id, + }, + createdAt: null, + }, + ]); + + await markRunCompleted(run.id, chatTaskId, chatMessageId); + results.push({ actionId: action.id, runId: run.id, status: 'completed' }); + } catch (error) { + const errorText = error instanceof Error ? error.message : String(error); + console.error('Cron scheduled action execution error:', { + actionId: action.id, + runId: run.id, + error: errorText, + }); + + // Write a failed assistant message so the run is visible in the chat history. + if (chatTaskId && chatMessageId) { + try { + await upsertMessages(action.userId, [ + { + messageId: chatMessageId, + taskId: chatTaskId, + channelId: action.channelId, + sessionId: buildChatSessionId( + action.channelId, + normalizeChatThreadId(action.threadId), + ), + threadId: normalizeChatThreadId(action.threadId), + role: 'assistant', + content: `Error: ${errorText}`, + taskState: 'failed', + checkpointCursor: null, + metadata: { + source: 'backend.cron.scheduled_action', + scheduledActionId: action.id, + scheduledRunId: run.id, + error: errorText, + }, + createdAt: null, + }, + ]); + } catch { + // Ignore secondary write error. + } + } + + await markRunFailed(run.id, errorText); + results.push({ actionId: action.id, runId: run.id, status: 'failed', error: errorText }); + } + } + + res.json({ processed: results.length, results }); +}); + +export default router; diff --git a/apps/node_backend/src/services/localAgentLoopService.ts b/apps/node_backend/src/services/localAgentLoopService.ts index 3ed23328..fa1e5673 100644 --- a/apps/node_backend/src/services/localAgentLoopService.ts +++ b/apps/node_backend/src/services/localAgentLoopService.ts @@ -37,6 +37,17 @@ import { } from './assetTableService.js'; import { sanitizeBatchRowCellData, sanitizeTableCellData } from './assetTableCellData.js'; import { listHighlights } from './textHighlightService.js'; +import { + listScheduledActions, + getScheduledAction, + createScheduledAction, + updateScheduledAction, + pauseScheduledAction, + resumeScheduledAction, + deleteScheduledAction, + type CreateScheduledActionInput, + type UpdateScheduledActionInput, +} from './scheduledActionService.js'; import type { AgentTool } from '../llm/types.js'; export const INTERNAL_TOOL_CHAT_CHANNEL_INSTRUCTION_SET = 'chat.channel.instruction.set'; @@ -78,6 +89,15 @@ export const INTERNAL_TOOL_TABLE_BATCH_ADD_ROWS = 'table.batch_add_rows'; // Text highlight tools (highlight creation is manual via Flutter; list is AI-accessible) export const INTERNAL_TOOL_HIGHLIGHT_LIST = 'highlight.list'; +// Scheduled action tools +export const INTERNAL_TOOL_SCHEDULED_ACTION_CREATE = 'scheduled_action.create'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_LIST = 'scheduled_action.list'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_GET = 'scheduled_action.get'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_UPDATE = 'scheduled_action.update'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_PAUSE = 'scheduled_action.pause'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_RESUME = 'scheduled_action.resume'; +export const INTERNAL_TOOL_SCHEDULED_ACTION_DELETE = 'scheduled_action.delete'; + export const INTERNAL_TOOLS = [ INTERNAL_TOOL_CHAT_CHANNEL_INSTRUCTION_SET, INTERNAL_TOOL_CHAT_THREAD_INSTRUCTION_SET, @@ -109,6 +129,13 @@ export const INTERNAL_TOOLS = [ INTERNAL_TOOL_TABLE_DELETE_ROW, INTERNAL_TOOL_TABLE_BATCH_ADD_ROWS, INTERNAL_TOOL_HIGHLIGHT_LIST, + INTERNAL_TOOL_SCHEDULED_ACTION_CREATE, + INTERNAL_TOOL_SCHEDULED_ACTION_LIST, + INTERNAL_TOOL_SCHEDULED_ACTION_GET, + INTERNAL_TOOL_SCHEDULED_ACTION_UPDATE, + INTERNAL_TOOL_SCHEDULED_ACTION_PAUSE, + INTERNAL_TOOL_SCHEDULED_ACTION_RESUME, + INTERNAL_TOOL_SCHEDULED_ACTION_DELETE, ] as const; export type InternalToolName = (typeof INTERNAL_TOOLS)[number]; @@ -982,6 +1009,105 @@ export async function executeInternalTool( const highlights = await listHighlights(userId); return { ok: true, toolName, data: { highlights }, error: null }; } + + // ------------------------------------------------------------------------- + // Scheduled action tools + // ------------------------------------------------------------------------- + case INTERNAL_TOOL_SCHEDULED_ACTION_CREATE: { + const channelId = args.channelId as string | undefined; + const title = args.title as string | undefined; + const prompt = args.prompt as string | undefined; + const scheduleExpr = args.scheduleExpr as string | undefined; + const intervalSeconds = args.intervalSeconds as number | undefined; + if (!channelId || !title || !prompt || !scheduleExpr || intervalSeconds == null) { + return { + ok: false, + toolName, + data: null, + error: { code: 'invalid_args', message: 'channelId, title, prompt, scheduleExpr and intervalSeconds are required' }, + }; + } + const input: CreateScheduledActionInput = { + channelId, + threadId: args.threadId as string | null | undefined, + title, + prompt, + scheduleExpr, + intervalSeconds: Number(intervalSeconds), + timezone: (args.timezone as string | undefined) ?? 'UTC', + }; + const action = await createScheduledAction(userId, input); + return { ok: true, toolName, data: { action }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_LIST: { + const actions = await listScheduledActions(userId); + return { ok: true, toolName, data: { actions }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_GET: { + const id = args.id as string | undefined; + if (!id) { + return { ok: false, toolName, data: null, error: { code: 'invalid_args', message: 'id is required' } }; + } + const action = await getScheduledAction(userId, id); + if (!action) { + return { ok: false, toolName, data: null, error: { code: 'not_found', message: `Scheduled action ${id} not found` } }; + } + return { ok: true, toolName, data: { action }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_UPDATE: { + const id = args.id as string | undefined; + if (!id) { + return { ok: false, toolName, data: null, error: { code: 'invalid_args', message: 'id is required' } }; + } + const input: UpdateScheduledActionInput = { + title: args.title as string | undefined, + prompt: args.prompt as string | undefined, + scheduleExpr: args.scheduleExpr as string | undefined, + intervalSeconds: args.intervalSeconds != null ? Number(args.intervalSeconds) : undefined, + timezone: args.timezone as string | undefined, + }; + const action = await updateScheduledAction(userId, id, input); + if (!action) { + return { ok: false, toolName, data: null, error: { code: 'not_found', message: `Scheduled action ${id} not found` } }; + } + return { ok: true, toolName, data: { action }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_PAUSE: { + const id = args.id as string | undefined; + if (!id) { + return { ok: false, toolName, data: null, error: { code: 'invalid_args', message: 'id is required' } }; + } + const action = await pauseScheduledAction(userId, id); + if (!action) { + return { ok: false, toolName, data: null, error: { code: 'not_found', message: `Scheduled action ${id} not found or not active` } }; + } + return { ok: true, toolName, data: { action }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_RESUME: { + const id = args.id as string | undefined; + if (!id) { + return { ok: false, toolName, data: null, error: { code: 'invalid_args', message: 'id is required' } }; + } + const action = await resumeScheduledAction(userId, id); + if (!action) { + return { ok: false, toolName, data: null, error: { code: 'not_found', message: `Scheduled action ${id} not found or not paused` } }; + } + return { ok: true, toolName, data: { action }, error: null }; + } + + case INTERNAL_TOOL_SCHEDULED_ACTION_DELETE: { + const id = args.id as string | undefined; + if (!id) { + return { ok: false, toolName, data: null, error: { code: 'invalid_args', message: 'id is required' } }; + } + const result = await deleteScheduledAction(userId, id); + return { ok: true, toolName, data: result, error: null }; + } } } @@ -1582,5 +1708,112 @@ export function buildAgentTools(userId: string): Record { }, execute: (args) => runTool(INTERNAL_TOOL_HIGHLIGHT_LIST, args), }, + + // ------------------------------------------------------------------------- + // Scheduled action tools + // ------------------------------------------------------------------------- + scheduled_action_create: { + description: + 'Create a new scheduled action that will run a prompt automatically at a recurring interval. ' + + 'Use when the user wants to automate a repeating task (e.g. daily standup, weekly report). ' + + 'intervalSeconds is the repeat period in seconds (minimum 60). ' + + 'scheduleExpr is a human-readable description of the schedule (e.g. "every day at 9am").', + parametersSchema: { + type: 'object', + properties: { + channelId: { type: 'string', description: 'The channel where the action will run.' }, + threadId: { type: 'string', description: 'Optional thread (sub-area) inside the channel. Omit for the main area.' }, + title: { type: 'string', description: 'Short human-readable title for the scheduled action.' }, + prompt: { type: 'string', description: 'The message that will be sent automatically on each run.' }, + scheduleExpr: { type: 'string', description: 'Human-readable schedule description, e.g. "every 30 minutes" or "daily at 9am".' }, + intervalSeconds: { type: 'number', description: 'Repeat interval in seconds. Minimum is 60 (1 minute).' }, + timezone: { type: 'string', description: 'IANA timezone string, e.g. "America/New_York". Defaults to UTC.' }, + }, + required: ['channelId', 'title', 'prompt', 'scheduleExpr', 'intervalSeconds'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_CREATE, args), + }, + + scheduled_action_list: { + description: 'List all scheduled actions for the current user. Returns active and paused actions.', + parametersSchema: { + type: 'object', + properties: {}, + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_LIST, args), + }, + + scheduled_action_get: { + description: 'Get details of a specific scheduled action by its id.', + parametersSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Scheduled action identifier.' }, + }, + required: ['id'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_GET, args), + }, + + scheduled_action_update: { + description: + 'Update one or more fields of an existing scheduled action. Only the provided fields are changed.', + parametersSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Scheduled action identifier.' }, + title: { type: 'string', description: 'New title.' }, + prompt: { type: 'string', description: 'New prompt text.' }, + scheduleExpr: { type: 'string', description: 'New human-readable schedule description.' }, + intervalSeconds: { type: 'number', description: 'New repeat interval in seconds (minimum 60).' }, + timezone: { type: 'string', description: 'New IANA timezone string.' }, + }, + required: ['id'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_UPDATE, args), + }, + + scheduled_action_pause: { + description: 'Pause a scheduled action so it stops running until resumed.', + parametersSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Scheduled action identifier.' }, + }, + required: ['id'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_PAUSE, args), + }, + + scheduled_action_resume: { + description: 'Resume a paused scheduled action. The next run will be scheduled from now.', + parametersSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Scheduled action identifier.' }, + }, + required: ['id'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_RESUME, args), + }, + + scheduled_action_delete: { + description: 'Permanently delete a scheduled action. This cannot be undone.', + parametersSchema: { + type: 'object', + properties: { + id: { type: 'string', description: 'Scheduled action identifier.' }, + }, + required: ['id'], + additionalProperties: false, + }, + execute: (args) => runTool(INTERNAL_TOOL_SCHEDULED_ACTION_DELETE, args), + }, }; } diff --git a/apps/node_backend/src/services/scheduledActionService.ts b/apps/node_backend/src/services/scheduledActionService.ts new file mode 100644 index 00000000..4384e3b1 --- /dev/null +++ b/apps/node_backend/src/services/scheduledActionService.ts @@ -0,0 +1,453 @@ +import pool from '../db/index.js'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type ScheduledActionStatus = 'active' | 'paused' | 'deleted'; +export type ScheduledActionRunStatus = 'queued' | 'running' | 'completed' | 'failed'; + +export interface ScheduledAction { + id: string; + userId: string; + channelId: string; + threadId: string | null; + title: string; + prompt: string; + scheduleExpr: string; + intervalSeconds: number; + timezone: string; + nextRunAt: string; + status: ScheduledActionStatus; + createdAt: string; + updatedAt: string; +} + +export interface CreateScheduledActionInput { + channelId: string; + threadId?: string | null; + title: string; + prompt: string; + scheduleExpr: string; + intervalSeconds: number; + timezone?: string; +} + +export interface UpdateScheduledActionInput { + title?: string; + prompt?: string; + scheduleExpr?: string; + intervalSeconds?: number; + timezone?: string; +} + +export interface ScheduledActionRun { + id: string; + scheduledActionId: string; + userId: string; + channelId: string; + threadId: string | null; + scheduledFireAt: string; + status: ScheduledActionRunStatus; + startedAt: string | null; + completedAt: string | null; + errorText: string | null; + chatTaskId: string | null; + chatMessageId: string | null; + createdAt: string; + updatedAt: string; +} + +// --------------------------------------------------------------------------- +// Row types +// --------------------------------------------------------------------------- + +interface ScheduledActionRow { + id: string; + user_id: string; + channel_id: string; + thread_id: string | null; + title: string; + prompt: string; + schedule_expr: string; + interval_seconds: number; + timezone: string; + next_run_at: string; + status: string; + created_at: string; + updated_at: string; +} + +interface ScheduledActionRunRow { + id: string; + scheduled_action_id: string; + user_id: string; + channel_id: string; + thread_id: string | null; + scheduled_fire_at: string; + status: string; + started_at: string | null; + completed_at: string | null; + error_text: string | null; + chat_task_id: string | null; + chat_message_id: string | null; + created_at: string; + updated_at: string; +} + +// --------------------------------------------------------------------------- +// Mappers +// --------------------------------------------------------------------------- + +function toActionDto(row: ScheduledActionRow): ScheduledAction { + return { + id: row.id, + userId: row.user_id, + channelId: row.channel_id, + threadId: row.thread_id, + title: row.title, + prompt: row.prompt, + scheduleExpr: row.schedule_expr, + intervalSeconds: Number(row.interval_seconds), + timezone: row.timezone, + nextRunAt: row.next_run_at, + status: row.status as ScheduledActionStatus, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +function toRunDto(row: ScheduledActionRunRow): ScheduledActionRun { + return { + id: row.id, + scheduledActionId: row.scheduled_action_id, + userId: row.user_id, + channelId: row.channel_id, + threadId: row.thread_id, + scheduledFireAt: row.scheduled_fire_at, + status: row.status as ScheduledActionRunStatus, + startedAt: row.started_at, + completedAt: row.completed_at, + errorText: row.error_text, + chatTaskId: row.chat_task_id, + chatMessageId: row.chat_message_id, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} + +const ACTION_SELECT_COLS = + 'id, user_id, channel_id, thread_id, title, prompt, schedule_expr, interval_seconds, timezone, next_run_at, status, created_at, updated_at'; + +const RUN_SELECT_COLS = + 'id, scheduled_action_id, user_id, channel_id, thread_id, scheduled_fire_at, status, started_at, completed_at, error_text, chat_task_id, chat_message_id, created_at, updated_at'; + +// --------------------------------------------------------------------------- +// Schedule helpers +// --------------------------------------------------------------------------- + +const MIN_INTERVAL_SECONDS = 60; // 1 minute minimum + +/** + * Clamp interval to a minimum of 1 minute and ensure it is a positive integer. + */ +export function clampIntervalSeconds(value: number): number { + const n = Math.max(MIN_INTERVAL_SECONDS, Math.trunc(value)); + return Number.isFinite(n) ? n : MIN_INTERVAL_SECONDS; +} + +/** + * Compute the next run time after `fromDate` by advancing `intervalSeconds`. + * If `fromDate` is already in the past, the result is anchored from now so + * the action fires at the next interval boundary rather than immediately + * catching up on every missed interval. + */ +export function computeNextRunAt(fromDate: Date, intervalSeconds: number): Date { + const now = Date.now(); + const base = Math.max(fromDate.getTime(), now); + return new Date(base + intervalSeconds * 1000); +} + +// --------------------------------------------------------------------------- +// CRUD +// --------------------------------------------------------------------------- + +export async function listScheduledActions(userId: string): Promise { + const result = await pool.query( + `SELECT ${ACTION_SELECT_COLS} + FROM scheduled_actions + WHERE user_id = $1 AND status <> 'deleted' + ORDER BY created_at ASC`, + [userId], + ); + return result.rows.map(toActionDto); +} + +export async function getScheduledAction( + userId: string, + id: string, +): Promise { + const result = await pool.query( + `SELECT ${ACTION_SELECT_COLS} + FROM scheduled_actions + WHERE user_id = $1 AND id = $2 AND status <> 'deleted'`, + [userId, id], + ); + return result.rows[0] ? toActionDto(result.rows[0]) : null; +} + +export async function createScheduledAction( + userId: string, + input: CreateScheduledActionInput, +): Promise { + const intervalSeconds = clampIntervalSeconds(input.intervalSeconds); + const nextRunAt = new Date(Date.now() + intervalSeconds * 1000); + + const result = await pool.query( + `INSERT INTO scheduled_actions + (user_id, channel_id, thread_id, title, prompt, schedule_expr, interval_seconds, timezone, next_run_at, status) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,'active') + RETURNING ${ACTION_SELECT_COLS}`, + [ + userId, + input.channelId, + input.threadId ?? null, + input.title.trim(), + input.prompt.trim(), + input.scheduleExpr.trim(), + intervalSeconds, + (input.timezone ?? 'UTC').trim(), + nextRunAt.toISOString(), + ], + ); + return toActionDto(result.rows[0]); +} + +export async function updateScheduledAction( + userId: string, + id: string, + input: UpdateScheduledActionInput, +): Promise { + const setClauses: string[] = []; + const values: unknown[] = [userId, id]; + let idx = 3; + + if (input.title !== undefined) { + setClauses.push(`title = $${idx++}`); + values.push(input.title.trim()); + } + if (input.prompt !== undefined) { + setClauses.push(`prompt = $${idx++}`); + values.push(input.prompt.trim()); + } + if (input.scheduleExpr !== undefined) { + setClauses.push(`schedule_expr = $${idx++}`); + values.push(input.scheduleExpr.trim()); + } + if (input.intervalSeconds !== undefined) { + const clamped = clampIntervalSeconds(input.intervalSeconds); + setClauses.push(`interval_seconds = $${idx++}`); + values.push(clamped); + // Recompute next_run_at when interval changes + const newNext = new Date(Date.now() + clamped * 1000); + setClauses.push(`next_run_at = $${idx++}`); + values.push(newNext.toISOString()); + } + if (input.timezone !== undefined) { + setClauses.push(`timezone = $${idx++}`); + values.push(input.timezone.trim()); + } + + if (setClauses.length === 0) { + return getScheduledAction(userId, id); + } + + const result = await pool.query( + `UPDATE scheduled_actions + SET ${setClauses.join(', ')} + WHERE user_id = $1 AND id = $2 AND status <> 'deleted' + RETURNING ${ACTION_SELECT_COLS}`, + values, + ); + return result.rows[0] ? toActionDto(result.rows[0]) : null; +} + +export async function pauseScheduledAction( + userId: string, + id: string, +): Promise { + const result = await pool.query( + `UPDATE scheduled_actions + SET status = 'paused' + WHERE user_id = $1 AND id = $2 AND status = 'active' + RETURNING ${ACTION_SELECT_COLS}`, + [userId, id], + ); + return result.rows[0] ? toActionDto(result.rows[0]) : null; +} + +export async function resumeScheduledAction( + userId: string, + id: string, +): Promise { + // Recompute next_run_at from now so the action fires at the next interval. + const result = await pool.query( + `SELECT ${ACTION_SELECT_COLS} + FROM scheduled_actions + WHERE user_id = $1 AND id = $2 AND status = 'paused'`, + [userId, id], + ); + if (!result.rows[0]) return null; + + const action = toActionDto(result.rows[0]); + const nextRunAt = computeNextRunAt(new Date(), action.intervalSeconds); + + const updated = await pool.query( + `UPDATE scheduled_actions + SET status = 'active', next_run_at = $3 + WHERE user_id = $1 AND id = $2 AND status = 'paused' + RETURNING ${ACTION_SELECT_COLS}`, + [userId, id, nextRunAt.toISOString()], + ); + return updated.rows[0] ? toActionDto(updated.rows[0]) : null; +} + +export async function deleteScheduledAction( + userId: string, + id: string, +): Promise<{ deleted: boolean }> { + const result = await pool.query( + `UPDATE scheduled_actions + SET status = 'deleted' + WHERE user_id = $1 AND id = $2 AND status <> 'deleted'`, + [userId, id], + ); + return { deleted: (result.rowCount ?? 0) > 0 }; +} + +// --------------------------------------------------------------------------- +// Cron tick helpers +// --------------------------------------------------------------------------- + +const CRON_BATCH_SIZE = 5; + +/** + * Return up to CRON_BATCH_SIZE active scheduled actions that are due now. + */ +export async function queryDueActions(): Promise { + const result = await pool.query( + `SELECT ${ACTION_SELECT_COLS} + FROM scheduled_actions + WHERE status = 'active' + AND next_run_at <= CURRENT_TIMESTAMP + ORDER BY next_run_at ASC + LIMIT $1`, + [CRON_BATCH_SIZE], + ); + return result.rows.map(toActionDto); +} + +/** + * Attempt to claim a scheduled action run for the given fire time. + * Uses INSERT ... ON CONFLICT DO NOTHING so only one cron invocation wins. + * + * Returns the created run record on success, or null if already claimed. + */ +export async function claimScheduledActionRun( + action: ScheduledAction, + scheduledFireAt: Date, +): Promise { + const result = await pool.query( + `INSERT INTO scheduled_action_runs + (scheduled_action_id, user_id, channel_id, thread_id, scheduled_fire_at, status) + VALUES ($1,$2,$3,$4,$5,'queued') + ON CONFLICT (scheduled_action_id, scheduled_fire_at) DO NOTHING + RETURNING ${RUN_SELECT_COLS}`, + [ + action.id, + action.userId, + action.channelId, + action.threadId ?? null, + scheduledFireAt.toISOString(), + ], + ); + return result.rows[0] ? toRunDto(result.rows[0]) : null; +} + +/** + * Advance next_run_at after a successful claim so the action is not re-queued + * in the same cron tick. + */ +export async function advanceNextRunAt( + actionId: string, + currentNextRunAt: Date, + intervalSeconds: number, +): Promise { + const nextRunAt = computeNextRunAt(currentNextRunAt, intervalSeconds); + await pool.query( + `UPDATE scheduled_actions + SET next_run_at = $2 + WHERE id = $1`, + [actionId, nextRunAt.toISOString()], + ); +} + +/** + * Mark a run as running and record started_at. + */ +export async function markRunStarted(runId: string): Promise { + await pool.query( + `UPDATE scheduled_action_runs + SET status = 'running', started_at = CURRENT_TIMESTAMP + WHERE id = $1`, + [runId], + ); +} + +/** + * Mark a run as completed with optional chat identifiers. + */ +export async function markRunCompleted( + runId: string, + chatTaskId: string | null, + chatMessageId: string | null, +): Promise { + await pool.query( + `UPDATE scheduled_action_runs + SET status = 'completed', completed_at = CURRENT_TIMESTAMP, + chat_task_id = $2, chat_message_id = $3 + WHERE id = $1`, + [runId, chatTaskId, chatMessageId], + ); +} + +/** + * Mark a run as failed with an error description. + */ +export async function markRunFailed(runId: string, errorText: string): Promise { + await pool.query( + `UPDATE scheduled_action_runs + SET status = 'failed', completed_at = CURRENT_TIMESTAMP, error_text = $2 + WHERE id = $1`, + [runId, errorText], + ); +} + +/** + * List runs for a specific scheduled action (most recent first). + */ +export async function listScheduledActionRuns( + userId: string, + scheduledActionId: string, + limit = 20, +): Promise { + const result = await pool.query( + `SELECT ${RUN_SELECT_COLS} + FROM scheduled_action_runs + WHERE user_id = $1 AND scheduled_action_id = $2 + ORDER BY scheduled_fire_at DESC + LIMIT $3`, + [userId, scheduledActionId, limit], + ); + return result.rows.map(toRunDto); +} diff --git a/docs/code_maps/feature_map.yaml b/docs/code_maps/feature_map.yaml index aa1a5541..54dab72b 100644 --- a/docs/code_maps/feature_map.yaml +++ b/docs/code_maps/feature_map.yaml @@ -362,6 +362,20 @@ products: - '向 `/api/chat/scope-settings` PUT 保存 `instructions` 字段后,再通过 GET 读取应返回同一 instructions 值。' - '`/api/chat/respond` 在 body 包含 `systemPrompt` 且 DB 中存在 channel/thread instructions 时,发给模型的消息列表首条应为合并后的 system 消息。' + - feature_id: scheduled_actions + name: 定时 Agent 任务调度 + user_value: 用户可通过对话创建重复执行的 Agent 任务(如每日日程汇总、定时提醒),无需手动触发。 + entry_paths: + - route: GET /api/cron/scheduled-actions + route_hint: apps/node_backend/src/routes/cron.ts + - service: scheduledActionService + route_hint: apps/node_backend/src/services/scheduledActionService.ts + smoke_checks: + - 使用 CRON_SECRET 请求 `/api/cron/scheduled-actions` 返回 200 及 processed 列表。 + - 无 CRON_SECRET 或 token 错误时返回 401/403。 + - 通过 Agent 工具创建定时任务后,`scheduled_actions` 中可查到该记录。 + - 同一 fire time 的重复 cron 请求不会创建第二条 run 记录。 + - product_id: node_openclaw_plugin name: Node OpenClaw Plugin platform: diff --git a/docs/code_maps/logic_map.yaml b/docs/code_maps/logic_map.yaml index 6aa7ce32..7e983fee 100644 --- a/docs/code_maps/logic_map.yaml +++ b/docs/code_maps/logic_map.yaml @@ -751,6 +751,43 @@ index: - 旧数据若缺少 startOffset/endOffset,仍会回退为 selectedText 搜索;重复文本场景下旧记录可能显示多处,需要后续数据迁移或重新创建高亮才能完全消除歧义。 - ChatScreen 必须在 highlight 创建、加载、删除后同步 _textHighlights,否则 Resources tab 可能显示过期划线资源。 + - feature_id: scheduled_actions + capability: 定时 Agent 任务调度与 Cron 执行 + code_index: + - apps/node_backend/src/routes/cron.ts + - apps/node_backend/src/services/scheduledActionService.ts + - apps/node_backend/src/services/localAgentLoopService.ts + - apps/node_backend/src/db/migrations/021_create_scheduled_actions.sql + - apps/node_backend/src/db/migrations/022_create_scheduled_action_runs.sql + - apps/node_backend/src/app.ts + - vercel.json + doc_index: + - docs/plans/2026-05-26-18-34-CST-scheduled-actions-agent-execution.md + test_index: + - apps/node_backend/src/services/localAgentLoopService.test.ts + keywords: + - scheduled_action + - scheduled_action_runs + - CRON_SECRET + - cron + - interval_seconds + - next_run_at + - claimScheduledActionRun + - advanceNextRunAt + - queryDueActions + - scheduled_action_create + - scheduled_action_list + - scheduled_action_pause + - scheduled_action_resume + - scheduled_action_delete + change_risks: + - 若 claimScheduledActionRun 的 UNIQUE 约束失效,同一 fire time 可能创建两条 run 记录并触发重复 Agent 执行。 + - 若 advanceNextRunAt 未在 claim 后立即调用,下一个 cron tick 可能再次 claim 同一 fire time 并被 UNIQUE 阻止,但 next_run_at 不会推进,导致每次 cron 都命中同一条 action 直至其完成。 + - Vercel cron 调用 /api/cron/scheduled-actions 时附带的 Authorization 头必须与 CRON_SECRET 完全匹配;若未在 Vercel 项目设置中配置 CRON_SECRET,端点将永远返回 403。 + - 若 cron 路由被注册到 authenticate 中间件之后,cron 请求(无 JWT)会被 401 拒绝。 + - 定时任务的 Agent 执行依赖 streamWithAgentToolsAndUserConfig;若用户的 LLM 配置缺失,run 将被标记为 failed 并写入错误消息。 + - 迁移 021/022 中的 EXECUTE FUNCTION 触发器在 Turso 上会被静默跳过;updated_at 字段在 Turso 上不会自动更新,需在 UPDATE 语句中手动赋值或接受此差异。 + maintenance_rules: update_required_when: - 新增、删除或重命名用户可见功能入口。 diff --git a/vercel.json b/vercel.json index 0bfaaca5..a2fb5b5c 100644 --- a/vercel.json +++ b/vercel.json @@ -3,6 +3,12 @@ "installCommand": "bash ./tools/vercel-build.sh --install-only && cd apps/node_backend && npm ci --omit=dev", "buildCommand": "bash ./tools/vercel-build.sh", "outputDirectory": "apps/mobile_chat_app/build/web", + "crons": [ + { + "path": "/api/cron/scheduled-actions", + "schedule": "* * * * *" + } + ], "rewrites": [ { "source": "/api/:path*", From ada636845ffda3080556a9331d8143d75ae277d9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 09:17:57 +0000 Subject: [PATCH 2/4] test: add vi.mock for scheduledActionService + 7 scheduled_action tool tests --- .../services/localAgentLoopService.test.ts | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/apps/node_backend/src/services/localAgentLoopService.test.ts b/apps/node_backend/src/services/localAgentLoopService.test.ts index 522efd9e..4b6029c8 100644 --- a/apps/node_backend/src/services/localAgentLoopService.test.ts +++ b/apps/node_backend/src/services/localAgentLoopService.test.ts @@ -57,6 +57,30 @@ vi.mock('./textHighlightService.js', () => ({ deleteHighlight: vi.fn().mockResolvedValue({ deleted: false }), })); +vi.mock('./scheduledActionService.js', () => ({ + listScheduledActions: vi.fn().mockResolvedValue([]), + getScheduledAction: vi.fn().mockResolvedValue(null), + createScheduledAction: vi.fn().mockResolvedValue({ + id: 'sa-1', + userId: 'u-1', + channelId: 'ch-1', + threadId: null, + title: 'Daily standup', + prompt: 'Summarize today', + scheduleExpr: 'every day at 9am', + intervalSeconds: 86400, + timezone: 'UTC', + nextRunAt: '2026-05-27T09:00:00.000Z', + status: 'active', + createdAt: '2026-05-27T00:00:00.000Z', + updatedAt: '2026-05-27T00:00:00.000Z', + }), + updateScheduledAction: vi.fn().mockResolvedValue(null), + pauseScheduledAction: vi.fn().mockResolvedValue(null), + resumeScheduledAction: vi.fn().mockResolvedValue(null), + deleteScheduledAction: vi.fn().mockResolvedValue({ deleted: true }), +})); + describe('localAgentLoopService', () => { beforeEach(() => { upsertChatScopeSettingMock.mockReset(); @@ -633,4 +657,131 @@ describe('localAgentLoopService', () => { expect(result.ok).toBe(true); expect((result.data as { highlights: unknown[] }).highlights).toHaveLength(1); }); + + // ------------------------------------------------------------------------- + // Scheduled action tool tests + // ------------------------------------------------------------------------- + + it('dispatches scheduled_action_create and returns the created action', async () => { + const { createScheduledAction } = await import('./scheduledActionService.js'); + const mockAction = { + id: 'sa-1', + userId: 'u-1', + channelId: 'ch-1', + threadId: null, + title: 'Daily standup', + prompt: 'Summarize today', + scheduleExpr: 'every day at 9am', + intervalSeconds: 86400, + timezone: 'UTC', + nextRunAt: '2026-05-27T09:00:00.000Z', + status: 'active', + createdAt: '2026-05-27T00:00:00.000Z', + updatedAt: '2026-05-27T00:00:00.000Z', + }; + (createScheduledAction as ReturnType).mockResolvedValue(mockAction); + + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_CREATE } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_CREATE, + args: { + channelId: 'ch-1', + title: 'Daily standup', + prompt: 'Summarize today', + scheduleExpr: 'every day at 9am', + intervalSeconds: 86400, + }, + }); + + expect(result.ok).toBe(true); + expect((result.data as { action: typeof mockAction }).action.id).toBe('sa-1'); + }); + + it('returns invalid_args when scheduled_action_create is missing required fields', async () => { + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_CREATE } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_CREATE, + args: { channelId: 'ch-1' }, + }); + + expect(result.ok).toBe(false); + expect(result.error?.code).toBe('invalid_args'); + }); + + it('dispatches scheduled_action_list and returns actions', async () => { + const { listScheduledActions } = await import('./scheduledActionService.js'); + (listScheduledActions as ReturnType).mockResolvedValue([ + { id: 'sa-1', title: 'Daily standup' }, + ]); + + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_LIST } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_LIST, + args: {}, + }); + + expect(result.ok).toBe(true); + expect((result.data as { actions: unknown[] }).actions).toHaveLength(1); + }); + + it('dispatches scheduled_action_get and returns the action', async () => { + const { getScheduledAction } = await import('./scheduledActionService.js'); + const mockAction = { id: 'sa-1', title: 'Daily standup', status: 'active' }; + (getScheduledAction as ReturnType).mockResolvedValue(mockAction); + + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_GET } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_GET, + args: { id: 'sa-1' }, + }); + + expect(result.ok).toBe(true); + expect((result.data as { action: typeof mockAction }).action.id).toBe('sa-1'); + }); + + it('returns not_found when scheduled_action_get finds nothing', async () => { + const { getScheduledAction } = await import('./scheduledActionService.js'); + (getScheduledAction as ReturnType).mockResolvedValue(null); + + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_GET } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_GET, + args: { id: 'nonexistent' }, + }); + + expect(result.ok).toBe(false); + expect(result.error?.code).toBe('not_found'); + }); + + it('dispatches scheduled_action_delete and returns deleted status', async () => { + const { deleteScheduledAction } = await import('./scheduledActionService.js'); + (deleteScheduledAction as ReturnType).mockResolvedValue({ deleted: true }); + + const { executeInternalTool, INTERNAL_TOOL_SCHEDULED_ACTION_DELETE } = await import('./localAgentLoopService.js'); + const result = await executeInternalTool({ + userId: 'u-1', + toolName: INTERNAL_TOOL_SCHEDULED_ACTION_DELETE, + args: { id: 'sa-1' }, + }); + + expect(result.ok).toBe(true); + expect((result.data as { deleted: boolean }).deleted).toBe(true); + }); + + it('exposes scheduled_action tools in buildAgentTools', async () => { + const { buildAgentTools } = await import('./localAgentLoopService.js'); + const tools = buildAgentTools('u-1'); + expect(tools).toHaveProperty('scheduled_action_create'); + expect(tools).toHaveProperty('scheduled_action_list'); + expect(tools).toHaveProperty('scheduled_action_get'); + expect(tools).toHaveProperty('scheduled_action_update'); + expect(tools).toHaveProperty('scheduled_action_pause'); + expect(tools).toHaveProperty('scheduled_action_resume'); + expect(tools).toHaveProperty('scheduled_action_delete'); + }); }); From 2b9c96873a5dae914304c601516762b99a2f6737 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 09:20:41 +0000 Subject: [PATCH 3/4] fix: timing-safe cron auth; preserve next_run_at on interval update --- apps/node_backend/src/routes/cron.ts | 9 +++++++-- apps/node_backend/src/services/scheduledActionService.ts | 4 ---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/node_backend/src/routes/cron.ts b/apps/node_backend/src/routes/cron.ts index e6490e65..5ecda30d 100644 --- a/apps/node_backend/src/routes/cron.ts +++ b/apps/node_backend/src/routes/cron.ts @@ -14,7 +14,7 @@ */ import express, { Request, Response } from 'express'; -import { randomUUID } from 'crypto'; +import { randomUUID, timingSafeEqual } from 'crypto'; import { acceptTask, upsertMessages, @@ -52,7 +52,12 @@ function verifyCronAuth(req: Request, res: Response): boolean { return false; } const authHeader = req.headers['authorization']; - if (!authHeader || authHeader !== 'Bearer ' + cronSecret) { + const expectedHeader = 'Bearer ' + cronSecret; + if ( + !authHeader || + authHeader.length !== expectedHeader.length || + !timingSafeEqual(Buffer.from(authHeader), Buffer.from(expectedHeader)) + ) { res.status(401).json({ error: 'Unauthorized' }); return false; } diff --git a/apps/node_backend/src/services/scheduledActionService.ts b/apps/node_backend/src/services/scheduledActionService.ts index 4384e3b1..ef4d85d7 100644 --- a/apps/node_backend/src/services/scheduledActionService.ts +++ b/apps/node_backend/src/services/scheduledActionService.ts @@ -248,10 +248,6 @@ export async function updateScheduledAction( const clamped = clampIntervalSeconds(input.intervalSeconds); setClauses.push(`interval_seconds = $${idx++}`); values.push(clamped); - // Recompute next_run_at when interval changes - const newNext = new Date(Date.now() + clamped * 1000); - setClauses.push(`next_run_at = $${idx++}`); - values.push(newNext.toISOString()); } if (input.timezone !== undefined) { setClauses.push(`timezone = $${idx++}`); From 7e5e061d29ddc49a0c26ef526469b10b1d6c035c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 09:22:15 +0000 Subject: [PATCH 4/4] fix: remove length check before timingSafeEqual; log secondary error; clarify docstring --- apps/node_backend/src/routes/cron.ts | 22 +++++++++++++------ .../src/services/scheduledActionService.ts | 9 ++++---- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/apps/node_backend/src/routes/cron.ts b/apps/node_backend/src/routes/cron.ts index 5ecda30d..d789051a 100644 --- a/apps/node_backend/src/routes/cron.ts +++ b/apps/node_backend/src/routes/cron.ts @@ -53,11 +53,15 @@ function verifyCronAuth(req: Request, res: Response): boolean { } const authHeader = req.headers['authorization']; const expectedHeader = 'Bearer ' + cronSecret; - if ( - !authHeader || - authHeader.length !== expectedHeader.length || - !timingSafeEqual(Buffer.from(authHeader), Buffer.from(expectedHeader)) - ) { + let authorized = false; + try { + authorized = + !!authHeader && + timingSafeEqual(Buffer.from(authHeader, 'utf8'), Buffer.from(expectedHeader, 'utf8')); + } catch { + // Buffer lengths differ — not authorized. + } + if (!authorized) { res.status(401).json({ error: 'Unauthorized' }); return false; } @@ -274,8 +278,12 @@ router.get('/scheduled-actions', async (req: Request, res: Response) => { createdAt: null, }, ]); - } catch { - // Ignore secondary write error. + } catch (secondaryError) { + console.error('Cron: failed to write error message to chat history', { + actionId: action.id, + runId: run.id, + error: secondaryError instanceof Error ? secondaryError.message : String(secondaryError), + }); } } diff --git a/apps/node_backend/src/services/scheduledActionService.ts b/apps/node_backend/src/services/scheduledActionService.ts index ef4d85d7..b7ab85d2 100644 --- a/apps/node_backend/src/services/scheduledActionService.ts +++ b/apps/node_backend/src/services/scheduledActionService.ts @@ -157,10 +157,11 @@ export function clampIntervalSeconds(value: number): number { } /** - * Compute the next run time after `fromDate` by advancing `intervalSeconds`. - * If `fromDate` is already in the past, the result is anchored from now so - * the action fires at the next interval boundary rather than immediately - * catching up on every missed interval. + * Compute the next run time after `fromDate` by adding `intervalSeconds`. + * If `fromDate` is already in the past, the addition is anchored from now so + * the action fires at `now + intervalSeconds` rather than a past time. + * Note: this is a simple interval addition; it does not align to wall-clock + * boundaries (e.g. "daily at 9 am"). */ export function computeNextRunAt(fromDate: Date, intervalSeconds: number): Date { const now = Date.now();