|
| 1 | +--- |
| 2 | +title: Input Streams |
| 3 | +sidebarTitle: Input Streams |
| 4 | +description: Send data into running tasks from your backend code |
| 5 | +--- |
| 6 | + |
| 7 | +The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them. |
| 8 | + |
| 9 | +<Note> |
| 10 | + To learn how to receive input stream data inside your tasks, see [Input |
| 11 | + Streams](/tasks/streams#input-streams) in the Streams doc. |
| 12 | +</Note> |
| 13 | + |
| 14 | +## Sending data to a running task |
| 15 | + |
| 16 | +### Using defined input streams (Recommended) |
| 17 | + |
| 18 | +The recommended approach is to use [defined input streams](/tasks/streams#defining-input-streams) for full type safety: |
| 19 | + |
| 20 | +```ts |
| 21 | +import { cancelSignal, approval } from "./trigger/streams"; |
| 22 | + |
| 23 | +// Cancel a running AI stream |
| 24 | +await cancelSignal.send(runId, { reason: "User clicked stop" }); |
| 25 | + |
| 26 | +// Approve a draft |
| 27 | +await approval.send(runId, { approved: true, reviewer: "alice@example.com" }); |
| 28 | +``` |
| 29 | + |
| 30 | +The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream. |
| 31 | + |
| 32 | +<Note> |
| 33 | + `.send()` works the same regardless of how the task is listening — whether it uses `.wait()` |
| 34 | + (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know |
| 35 | + how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each |
| 36 | + receiving method. |
| 37 | +</Note> |
| 38 | + |
| 39 | +## Practical examples |
| 40 | + |
| 41 | +### Cancel from a Next.js API route |
| 42 | + |
| 43 | +```ts app/api/cancel/route.ts |
| 44 | +import { cancelStream } from "@/trigger/streams"; |
| 45 | + |
| 46 | +export async function POST(req: Request) { |
| 47 | + const { runId } = await req.json(); |
| 48 | + |
| 49 | + await cancelStream.send(runId, { reason: "User clicked stop" }); |
| 50 | + |
| 51 | + return Response.json({ cancelled: true }); |
| 52 | +} |
| 53 | +``` |
| 54 | + |
| 55 | +### Approval workflow API |
| 56 | + |
| 57 | +```ts app/api/approve/route.ts |
| 58 | +import { approval } from "@/trigger/streams"; |
| 59 | + |
| 60 | +export async function POST(req: Request) { |
| 61 | + const { runId, approved, reviewer } = await req.json(); |
| 62 | + |
| 63 | + await approval.send(runId, { |
| 64 | + approved, |
| 65 | + reviewer, |
| 66 | + }); |
| 67 | + |
| 68 | + return Response.json({ success: true }); |
| 69 | +} |
| 70 | +``` |
| 71 | + |
| 72 | +### Remix action handler |
| 73 | + |
| 74 | +```ts app/routes/api.approve.ts |
| 75 | +import { json, type ActionFunctionArgs } from "@remix-run/node"; |
| 76 | +import { approval } from "~/trigger/streams"; |
| 77 | + |
| 78 | +export async function action({ request }: ActionFunctionArgs) { |
| 79 | + const formData = await request.formData(); |
| 80 | + const runId = formData.get("runId") as string; |
| 81 | + const approved = formData.get("approved") === "true"; |
| 82 | + const reviewer = formData.get("reviewer") as string; |
| 83 | + |
| 84 | + await approval.send(runId, { approved, reviewer }); |
| 85 | + |
| 86 | + return json({ success: true }); |
| 87 | +} |
| 88 | +``` |
| 89 | + |
| 90 | +### Express handler |
| 91 | + |
| 92 | +```ts |
| 93 | +import express from "express"; |
| 94 | +import { cancelSignal } from "./trigger/streams"; |
| 95 | + |
| 96 | +const app = express(); |
| 97 | +app.use(express.json()); |
| 98 | + |
| 99 | +app.post("/api/cancel", async (req, res) => { |
| 100 | + const { runId, reason } = req.body; |
| 101 | + |
| 102 | + await cancelSignal.send(runId, { reason }); |
| 103 | + |
| 104 | + res.json({ cancelled: true }); |
| 105 | +}); |
| 106 | +``` |
| 107 | + |
| 108 | +### Sending from another task |
| 109 | + |
| 110 | +You can send input stream data from one task to another running task: |
| 111 | + |
| 112 | +```ts |
| 113 | +import { task } from "@trigger.dev/sdk"; |
| 114 | +import { approval } from "./streams"; |
| 115 | + |
| 116 | +export const reviewerTask = task({ |
| 117 | + id: "auto-reviewer", |
| 118 | + run: async (payload: { targetRunId: string }) => { |
| 119 | + // Perform automated review logic... |
| 120 | + const isApproved = await performReview(); |
| 121 | + |
| 122 | + // Send approval to the waiting task |
| 123 | + await approval.send(payload.targetRunId, { |
| 124 | + approved: isApproved, |
| 125 | + reviewer: "auto-reviewer", |
| 126 | + }); |
| 127 | + }, |
| 128 | +}); |
| 129 | +``` |
| 130 | + |
| 131 | +## Error handling |
| 132 | + |
| 133 | +The `.send()` method will throw if: |
| 134 | + |
| 135 | +- The run has already completed, failed, or been canceled |
| 136 | +- The payload exceeds the 1MB size limit |
| 137 | +- The run ID is invalid |
| 138 | + |
| 139 | +```ts |
| 140 | +import { cancelSignal } from "./trigger/streams"; |
| 141 | + |
| 142 | +try { |
| 143 | + await cancelSignal.send(runId, { reason: "User clicked stop" }); |
| 144 | +} catch (error) { |
| 145 | + console.error("Failed to send:", error); |
| 146 | + // Handle the error — the run may have already completed |
| 147 | +} |
| 148 | +``` |
| 149 | + |
| 150 | +## Important notes |
| 151 | + |
| 152 | +- Maximum payload size per `.send()` call is **1MB** |
| 153 | +- You cannot send data to a completed, failed, or canceled run |
| 154 | +- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches |
| 155 | +- Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See [Streams](/tasks/streams) for details. |
0 commit comments