diff --git a/src/content/docs/workflows/build/trigger-workflows.mdx b/src/content/docs/workflows/build/trigger-workflows.mdx index 385155b9543e3c..4751066bfbc051 100644 --- a/src/content/docs/workflows/build/trigger-workflows.mdx +++ b/src/content/docs/workflows/build/trigger-workflows.mdx @@ -114,6 +114,7 @@ The possible values of status are as follows: | "complete" | "waiting" // instance is hibernating and waiting for sleep or event to finish | "waitingForPause" // instance is finishing the current work to pause + | "rollingBack" // rollback in progress (manual or via terminate with rollback) | "unknown"; error?: { name: string, diff --git a/src/content/docs/workflows/build/workers-api.mdx b/src/content/docs/workflows/build/workers-api.mdx index 12e94f789ef45f..fcb42f1136e07b 100644 --- a/src/content/docs/workflows/build/workers-api.mdx +++ b/src/content/docs/workflows/build/workers-api.mdx @@ -152,6 +152,89 @@ export type WorkflowStepConfig = { Refer to the [documentation on sleeping and retrying](/workflows/build/sleeping-and-retrying/) to learn more about how Workflows are retried. +## Rollback (Saga Pattern) + +Workflows supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for handling distributed transactions. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out). + +To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state. + +### step.withRollback + +{/* prettier-ignore */} +- step.withRollback<T>(name: string, handler: RollbackHandler<T>, config?: RollbackStepConfig): Promise<T> + - `name` - the name of the step. + - `handler` - an object containing `do` and `undo` functions. + - `config` (optional) - configuration for the step, optionally including separate `undoConfig` for the undo function. + +### RollbackHandler + +```ts +export type RollbackHandler = { + do: () => Promise; + undo: (err: unknown, value: T) => Promise; +}; +``` + +### RollbackStepConfig + +```ts +export type RollbackStepConfig = WorkflowStepConfig & { + undoConfig?: WorkflowStepConfig; +}; +``` + +### RollbackConfig + +```ts +export type RollbackConfig = { + /** If true, continue executing remaining undos after a failure and throw AggregateError at end. Default: false */ + continueOnError?: boolean; +}; +``` + +Pass this configuration to `workflow.create()` to enable automatic rollback on uncaught errors: + +```ts +let instance = await env.MY_WORKFLOW.create({ + params: { userId: "123", items: ["item1", "item2"] }, + rollback: { continueOnError: true }, // Enable auto-rollback +}); +``` + +### Example + + + +```ts +export class OrderWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const order = await step.withRollback("create order", { + do: async () => env.DB.orders.insert({ userId: event.payload.userId, items: event.payload.items }), + undo: async (err, order) => env.DB.orders.delete(order.id), + }); + + await step.withRollback("charge payment", { + do: async () => env.STRIPE.charges.create({ amount: order.total, customer: event.payload.userId }), + undo: async (err, charge) => env.STRIPE.refunds.create({ charge: charge.id }), + }); + + // If this step throws, the undo functions above will run automatically + // (in reverse order) if the instance was created with rollback config enabled + await step.do("send confirmation", async () => { + await env.EMAIL.send({ to: event.payload.email, template: "order-confirmed", orderId: order.id }); + }); + } +} + +// Creating the workflow instance with rollback enabled: +// const instance = await env.MY_WORKFLOW.create({ +// params: orderParams, +// rollback: { continueOnError: false }, // Stop on first undo failure +// }); +``` + + + ## NonRetryableError {/* prettier-ignore */} @@ -370,9 +453,17 @@ interface WorkflowInstanceCreateOptions { * The event payload the Workflow instance is triggered with */ params?: unknown; + /** + * Enable automatic rollback on uncaught errors. + * When enabled, all registered undo functions will execute in LIFO order + * if the workflow throws an uncaught error. + */ + rollback?: RollbackConfig; } ``` +Refer to the [Rollback (Saga Pattern)](#rollback-saga-pattern) section for details on `RollbackConfig`. + ## WorkflowInstance Represents a specific instance of a Workflow, and provides methods to manage the instance. @@ -435,9 +526,10 @@ Restart a Workflow instance. ### terminate -Terminate a Workflow instance. +Terminate a Workflow instance. Optionally run rollback handlers before terminating. -- terminate(): Promise<void> +- terminate(options?: TerminateOptions): Promise<void> + - `options.rollback` (optional) - if `true`, execute all registered undo handlers before terminating. Only works for `running`, `waiting`, or `paused` instances. ### sendEvent @@ -492,6 +584,7 @@ type InstanceStatus = { | "complete" | "waiting" // instance is hibernating and waiting for sleep or event to finish | "waitingForPause" // instance is finishing the current work to pause + | "rollingBack" // rollback in progress (manual or via terminate with rollback) | "unknown"; error?: { name: string, diff --git a/src/content/docs/workflows/python/python-workers-api.mdx b/src/content/docs/workflows/python/python-workers-api.mdx index b11f47ffc16eb5..07f84e88933eed 100644 --- a/src/content/docs/workflows/python/python-workers-api.mdx +++ b/src/content/docs/workflows/python/python-workers-api.mdx @@ -77,6 +77,125 @@ async def run(self, event, step): await step.wait_for_event("my-wait-for-event-step", "my-event-type") ``` +## Rollback (Saga Pattern) + +The Python SDK supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for distributed transactions using the `with_rollback` decorator. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out). + +To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state. + +### step.with_rollback + +* step.with_rollback(name, *, undo=None, depends=None, concurrent=False, config=None, undo_config=None) - decorator that allows you to define a step with a rollback handler. + * `name` - the name of the step (up to 256 chars). + * `undo` - undo handler function, or use `@decorated_fn.undo` decorator. + * `depends` - optional list of steps this depends on (DAG pattern). See [DAG Workflows](/workflows/python/dag). + * `concurrent` - run dependencies in parallel (default: `False`). + * `config` - optional `WorkflowStepConfig` dictionary for the do function. + * `undo_config` - optional `WorkflowStepConfig` dictionary for the undo function. + +:::note +An undo handler is **required** for `with_rollback` steps. If no undo handler is provided via the `undo=` parameter or `@fn.undo` decorator, a `ValueError` is raised at call time. Use `step.do()` for steps that don't need rollback. +::: + +### @do_fn.undo + +* @do_fn.undo(config=None) - decorator to register an undo function for a `with_rollback` step. + * `config` - optional separate config for the undo function. + +### Enabling Rollback + +Pass a `rollback` configuration when creating the workflow instance: + +```python +instance = await self.env.MY_WORKFLOW.create( + params={"user_id": "123", "items": ["item1", "item2"]}, + rollback={"continue_on_error": True} # Enable auto-rollback +) +``` + +* `continue_on_error` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`. + +### Example + +Two patterns are supported for attaching undo handlers: + +```python +from workers import WorkflowEntrypoint + +class OrderWorkflow(WorkflowEntrypoint): + async def run(self, event, step): + payload = event["payload"] + + # Pattern A: Chained decorator (preferred - keeps do/undo together) + @step.with_rollback("create order") + async def create_order(): + return await self.env.DB.prepare( + "INSERT INTO orders (user_id, items) VALUES (?, ?) RETURNING *" + ).bind(payload["user_id"], payload["items"]).first() + + @create_order.undo + async def _(err, order): + await self.env.DB.prepare("DELETE FROM orders WHERE id = ?").bind(order["id"]).run() + + order = await create_order() + + # Pattern B: undo= parameter (for reusable undo handlers) + async def refund_charge(err, charge): + await stripe_refund(charge["id"]) + + @step.with_rollback( + "charge payment", + undo=refund_charge, + config={"retries": {"limit": 3, "delay": "1 second", "backoff": "exponential"}}, + undo_config={"retries": {"limit": 5, "delay": "2 seconds"}} + ) + async def charge_payment(): + return await stripe_charge(order["total"], payload["user_id"]) + + charge = await charge_payment() + + # Steps can depend on other rollback steps (DAG pattern) + @step.with_rollback("reserve inventory", depends=[create_order]) + async def reserve_inventory(order_result): + return await inventory_reserve(order_result["items"]) + + @reserve_inventory.undo + async def _(err, reservation): + await inventory_release(reservation["id"]) + + await reserve_inventory() + + # Non-rollbackable step (email can't be unsent) - use step.do() + @step.do("send confirmation") + async def send_confirmation(): + await send_email(payload["user_id"], "order-confirmed", order["id"]) + + await send_confirmation() + + return {"order_id": order["id"], "charge_id": charge["id"]} +``` + +Creating the workflow instance with rollback enabled: + +```python +class Default(WorkerEntrypoint): + async def fetch(self, request): + # Auto-rollback enabled (stop on first undo failure) + instance = await self.env.MY_WORKFLOW.create( + id="order-456", + params={"user_id": "u2", "items": ["item1"]}, + rollback={} + ) + + # Auto-rollback with continue-on-error + instance = await self.env.MY_WORKFLOW.create( + id="order-789", + params={"user_id": "u3", "items": ["item2"]}, + rollback={"continue_on_error": True} + ) + + return Response.json({"id": instance.id}) +``` ### `event` parameter