Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/node_backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import llmRoutes from './routes/llm.js';
import chatRoutes from './routes/chat.js';
import platformRoutes from './routes/platform.js';
import resourcesRoutes from './routes/resources.js';
import cronRoutes from './routes/cron.js';
import { runMigrations } from './db/migrate.js';

// Load environment variables (no-op in Vercel production where env vars are injected directly)
Expand Down Expand Up @@ -95,6 +96,8 @@ app.use('/api/llm', llmRoutes);
app.use('/api/chat', chatRoutes);
app.use('/api/v1/platform', platformRoutes);
app.use('/api/resources', resourcesRoutes);
// Cron routes do NOT use the JWT authenticate middleware — they use CRON_SECRET.
app.use('/api/cron', cronRoutes);

// 404 handler
app.use((req: Request, res: Response) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Migration: Create scheduled_actions table
-- Description: Stores user-defined scheduled actions for automated agent execution.
-- Users create and manage scheduled actions through conversation;
-- a cron tick queries due actions and starts normal Agent execution.

CREATE TABLE scheduled_actions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
channel_id VARCHAR(255) NOT NULL,
thread_id VARCHAR(255),
title TEXT NOT NULL,
prompt TEXT NOT NULL,
schedule_expr TEXT NOT NULL,
interval_seconds INT NOT NULL,
timezone TEXT NOT NULL DEFAULT 'UTC',
next_run_at TIMESTAMP NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'active',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_scheduled_actions_user_id ON scheduled_actions(user_id);
CREATE INDEX idx_scheduled_actions_due ON scheduled_actions(next_run_at, status);

CREATE TRIGGER update_scheduled_actions_updated_at
BEFORE UPDATE ON scheduled_actions
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Migration: Create scheduled_action_runs table
-- Description: Tracks each execution attempt of a scheduled action.
-- The UNIQUE constraint on (scheduled_action_id, scheduled_fire_at) prevents
-- duplicate claims when multiple cron requests overlap.

CREATE TABLE scheduled_action_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
scheduled_action_id UUID NOT NULL REFERENCES scheduled_actions(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
channel_id VARCHAR(255) NOT NULL,
thread_id VARCHAR(255),
scheduled_fire_at TIMESTAMP NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'queued',
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_text TEXT,
chat_task_id VARCHAR(255),
chat_message_id VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(scheduled_action_id, scheduled_fire_at)
);

CREATE INDEX idx_scheduled_action_runs_action_id ON scheduled_action_runs(scheduled_action_id);
CREATE INDEX idx_scheduled_action_runs_user_id ON scheduled_action_runs(user_id);

CREATE TRIGGER update_scheduled_action_runs_updated_at
BEFORE UPDATE ON scheduled_action_runs
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
298 changes: 298 additions & 0 deletions apps/node_backend/src/routes/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
/**
* Cron endpoint: process due scheduled actions.
*
* Called by Vercel Cron (or any HTTP client) once per minute.
* Protected by a shared `CRON_SECRET` bearer token — does NOT use user JWT.
*
* For each due action the handler:
* 1. Claims a run record (idempotent via UNIQUE constraint).
* 2. Advances `next_run_at` so the action is not re-claimed in the same tick.
* 3. Inserts a user message with `source: scheduled_action`.
* 4. Runs the agent loop synchronously (no streaming flush).
* 5. Writes the final assistant message.
* 6. Marks the run completed or failed.
*/

import express, { Request, Response } from 'express';
import { randomUUID, timingSafeEqual } from 'crypto';
import {
acceptTask,
upsertMessages,
listSessionMessagesForModel,
type AcceptTaskInput,
} from '../services/chatAsyncTransportService.js';
import {
buildChatSessionId,
normalizeChatThreadId,
resolveScopeInstructions,
} from '../services/chatRouterService.js';
import { streamWithAgentToolsAndUserConfig } from '../llm/llm_service.js';
import { buildAgentTools } from '../services/localAgentLoopService.js';
import {
queryDueActions,
claimScheduledActionRun,
advanceNextRunAt,
markRunStarted,
markRunCompleted,
markRunFailed,
} from '../services/scheduledActionService.js';

const router = express.Router();

const MAX_ASSISTANT_CHARS = 120 * 1024;
const DEFAULT_MAX_STEPS = 10;
const DEFAULT_MAX_TOOL_CALLS = 50;
const DEFAULT_TIMEOUT_MS = 60_000;

function verifyCronAuth(req: Request, res: Response): boolean {
const cronSecret = process.env.CRON_SECRET;
if (!cronSecret) {
// If no secret is configured, block all requests to avoid open execution.
res.status(403).json({ error: 'CRON_SECRET is not configured' });
return false;
}
const authHeader = req.headers['authorization'];
const expectedHeader = 'Bearer ' + cronSecret;
let authorized = false;
try {
authorized =
!!authHeader &&
timingSafeEqual(Buffer.from(authHeader, 'utf8'), Buffer.from(expectedHeader, 'utf8'));
} catch {
// Buffer lengths differ — not authorized.
}
if (!authorized) {
res.status(401).json({ error: 'Unauthorized' });
return false;
}
return true;
}

/**
* GET /api/cron/scheduled-actions
*
* Processes up to 5 due scheduled actions per invocation.
* Returns a JSON summary of each action processed.
*/
router.get('/scheduled-actions', async (req: Request, res: Response) => {
if (!verifyCronAuth(req, res)) return;

const fireTime = new Date();
const dueActions = await queryDueActions();

const results: Array<{
actionId: string;
runId: string | null;
status: 'claimed' | 'skipped' | 'completed' | 'failed';
error?: string;
}> = [];

for (const action of dueActions) {
// Attempt to claim a run for this fire time.
const run = await claimScheduledActionRun(action, fireTime);
Comment on lines +90 to +92
if (!run) {
// Another cron invocation already claimed this slot — skip.
results.push({ actionId: action.id, runId: null, status: 'skipped' });
continue;
}

// Advance next_run_at immediately so the next cron tick won't re-claim.
await advanceNextRunAt(action.id, new Date(action.nextRunAt), action.intervalSeconds);
await markRunStarted(run.id);
Comment on lines +99 to +101

let chatTaskId: string | null = null;
let chatMessageId: string | null = null;

try {
const normalizedThreadId = normalizeChatThreadId(action.threadId);
const sessionId = buildChatSessionId(action.channelId, normalizedThreadId);
const taskId = randomUUID();
const userMessageId = randomUUID();
const assistantMessageId = randomUUID();

const acceptInput: AcceptTaskInput = {
taskId,
idempotencyKey: run.id,
channelId: action.channelId,
sessionId,
threadId: normalizedThreadId,
resolvedBotId: null,
resolvedSkillId: null,
};
const acceptedTask = await acceptTask(action.userId, acceptInput);
chatTaskId = acceptedTask.taskId;
chatMessageId = assistantMessageId;

// Insert user message.
await upsertMessages(action.userId, [
{
messageId: userMessageId,
taskId: acceptedTask.taskId,
channelId: action.channelId,
sessionId: acceptedTask.sessionId,
threadId: normalizedThreadId,
role: 'user',
content: action.prompt,
taskState: 'accepted',
checkpointCursor: null,
metadata: {
source: 'scheduled_action',
scheduledActionId: action.id,
scheduledRunId: run.id,
},
createdAt: null,
},
]);

// Insert dispatch placeholder for the assistant message.
await upsertMessages(action.userId, [
{
messageId: assistantMessageId,
taskId: acceptedTask.taskId,
channelId: action.channelId,
sessionId: acceptedTask.sessionId,
threadId: normalizedThreadId,
role: 'assistant',
content: '',
taskState: 'dispatched',
checkpointCursor: null,
metadata: {
source: 'backend.cron.scheduled_action',
dispatchPlaceholder: true,
scheduledActionId: action.id,
scheduledRunId: run.id,
},
createdAt: null,
},
]);

// Resolve scope instructions (channel + thread).
const scopeInstructions = await resolveScopeInstructions(action.userId, {
channelId: action.channelId,
threadId: normalizedThreadId,
});

// Build system prompt with session context.
const ctxLines = [`Channel ID: ${action.channelId}`];
if (normalizedThreadId && normalizedThreadId !== 'main') {
ctxLines.push(`Thread ID: ${normalizedThreadId}`);
}
const systemParts: string[] = [`Session context:\n${ctxLines.map((l) => `- ${l}`).join('\n')}`];
if (scopeInstructions.channelInstructions?.trim()) {
systemParts.push(`Channel context:\n${scopeInstructions.channelInstructions.trim()}`);
}
if (scopeInstructions.threadInstructions?.trim()) {
systemParts.push(`Section context:\n${scopeInstructions.threadInstructions.trim()}`);
}
const composedSystemPrompt = systemParts.join('\n\n');

// Load session history.
const modelMessages = await listSessionMessagesForModel(action.userId, acceptedTask.sessionId, {
limit: 40,
maxChars: 10000,
});
const messagesWithSystem = [
{ role: 'system' as const, content: composedSystemPrompt },
...modelMessages,
];

// Run the agent loop.
const agentTools = buildAgentTools(action.userId);
const streamResult = await streamWithAgentToolsAndUserConfig(
action.userId,
{
messages: messagesWithSystem,
},
agentTools,
{
maxSteps: DEFAULT_MAX_STEPS,
maxToolCalls: DEFAULT_MAX_TOOL_CALLS,
timeoutMs: DEFAULT_TIMEOUT_MS,
},
);

// Collect full text (no intermediate flushes needed in cron context).
let assistantContent = '';
for await (const chunk of streamResult.textStream) {
if (typeof chunk === 'string') {
assistantContent += chunk;
if (assistantContent.length >= MAX_ASSISTANT_CHARS) break;
}
}

// Write final assistant message.
const finalContent = assistantContent.trim() || '(no response)';
await upsertMessages(action.userId, [
{
messageId: assistantMessageId,
taskId: acceptedTask.taskId,
channelId: action.channelId,
sessionId: acceptedTask.sessionId,
threadId: normalizedThreadId,
role: 'assistant',
content: finalContent,
taskState: 'completed',
checkpointCursor: null,
metadata: {
source: 'backend.cron.scheduled_action',
scheduledActionId: action.id,
scheduledRunId: run.id,
},
createdAt: null,
},
]);

await markRunCompleted(run.id, chatTaskId, chatMessageId);
results.push({ actionId: action.id, runId: run.id, status: 'completed' });
} catch (error) {
const errorText = error instanceof Error ? error.message : String(error);
console.error('Cron scheduled action execution error:', {
actionId: action.id,
runId: run.id,
error: errorText,
});

// Write a failed assistant message so the run is visible in the chat history.
if (chatTaskId && chatMessageId) {
try {
await upsertMessages(action.userId, [
{
messageId: chatMessageId,
taskId: chatTaskId,
channelId: action.channelId,
sessionId: buildChatSessionId(
action.channelId,
normalizeChatThreadId(action.threadId),
),
threadId: normalizeChatThreadId(action.threadId),
role: 'assistant',
content: `Error: ${errorText}`,
taskState: 'failed',
checkpointCursor: null,
metadata: {
source: 'backend.cron.scheduled_action',
scheduledActionId: action.id,
scheduledRunId: run.id,
error: errorText,
},
createdAt: null,
},
]);
} catch (secondaryError) {
console.error('Cron: failed to write error message to chat history', {
actionId: action.id,
runId: run.id,
error: secondaryError instanceof Error ? secondaryError.message : String(secondaryError),
});
}
}

await markRunFailed(run.id, errorText);
results.push({ actionId: action.id, runId: run.id, status: 'failed', error: errorText });
}
}

res.json({ processed: results.length, results });
});

export default router;
Loading