Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/content/docs/workflows/build/trigger-workflows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ The possible values of status are as follows:
| "complete"
| "waiting" // instance is hibernating and waiting for sleep or event to finish
| "waitingForPause" // instance is finishing the current work to pause
| "rollingBack" // rollback in progress (manual or via terminate with rollback)
| "unknown";
error?: {
name: string,
Expand Down
97 changes: 95 additions & 2 deletions src/content/docs/workflows/build/workers-api.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,89 @@ export type WorkflowStepConfig = {

Refer to the [documentation on sleeping and retrying](/workflows/build/sleeping-and-retrying/) to learn more about how Workflows are retried.

## Rollback (Saga Pattern)

Workflows supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for handling distributed transactions. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).

To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state.

### step.withRollback

{/* prettier-ignore */}
- <code>step.withRollback&lt;T&gt;(name: string, handler: RollbackHandler&lt;T&gt;, config?: RollbackStepConfig): Promise&lt;T&gt;</code>
- `name` - the name of the step.
- `handler` - an object containing `do` and `undo` functions.
- `config` (optional) - configuration for the step, optionally including separate `undoConfig` for the undo function.

### RollbackHandler

```ts
export type RollbackHandler<T> = {
do: () => Promise<T>;
undo: (err: unknown, value: T) => Promise<void>;
};
```

### RollbackStepConfig

```ts
export type RollbackStepConfig = WorkflowStepConfig & {
undoConfig?: WorkflowStepConfig;
};
```

### RollbackConfig

```ts
export type RollbackConfig = {
/** If true, continue executing remaining undos after a failure and throw AggregateError at end. Default: false */
continueOnError?: boolean;
};
```

Pass this configuration to `workflow.create()` to enable automatic rollback on uncaught errors:

```ts
let instance = await env.MY_WORKFLOW.create({
params: { userId: "123", items: ["item1", "item2"] },
rollback: { continueOnError: true }, // Enable auto-rollback
});
```

### Example

<TypeScriptExample>

```ts
export class OrderWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const order = await step.withRollback("create order", {
do: async () => env.DB.orders.insert({ userId: event.payload.userId, items: event.payload.items }),
undo: async (err, order) => env.DB.orders.delete(order.id),
});

await step.withRollback("charge payment", {
do: async () => env.STRIPE.charges.create({ amount: order.total, customer: event.payload.userId }),
undo: async (err, charge) => env.STRIPE.refunds.create({ charge: charge.id }),
});

// If this step throws, the undo functions above will run automatically
// (in reverse order) if the instance was created with rollback config enabled
await step.do("send confirmation", async () => {
await env.EMAIL.send({ to: event.payload.email, template: "order-confirmed", orderId: order.id });
});
}
}

// Creating the workflow instance with rollback enabled:
// const instance = await env.MY_WORKFLOW.create({
// params: orderParams,
// rollback: { continueOnError: false }, // Stop on first undo failure
// });
```

</TypeScriptExample>

## NonRetryableError

{/* prettier-ignore */}
Expand Down Expand Up @@ -370,9 +453,17 @@ interface WorkflowInstanceCreateOptions {
* The event payload the Workflow instance is triggered with
*/
params?: unknown;
/**
* Enable automatic rollback on uncaught errors.
* When enabled, all registered undo functions will execute in LIFO order
* if the workflow throws an uncaught error.
*/
rollback?: RollbackConfig;
}
```

Refer to the [Rollback (Saga Pattern)](#rollback-saga-pattern) section for details on `RollbackConfig`.

## WorkflowInstance

Represents a specific instance of a Workflow, and provides methods to manage the instance.
Expand Down Expand Up @@ -435,9 +526,10 @@ Restart a Workflow instance.

### terminate

Terminate a Workflow instance.
Terminate a Workflow instance. Optionally run rollback handlers before terminating.

- <code>terminate(): Promise&lt;void&gt;</code>
- <code>terminate(options?: TerminateOptions): Promise&lt;void&gt;</code>
- `options.rollback` (optional) - if `true`, execute all registered undo handlers before terminating. Only works for `running`, `waiting`, or `paused` instances.

### sendEvent

Expand Down Expand Up @@ -492,6 +584,7 @@ type InstanceStatus = {
| "complete"
| "waiting" // instance is hibernating and waiting for sleep or event to finish
| "waitingForPause" // instance is finishing the current work to pause
| "rollingBack" // rollback in progress (manual or via terminate with rollback)
| "unknown";
error?: {
name: string,
Expand Down
70 changes: 70 additions & 0 deletions src/content/docs/workflows/python/python-workers-api.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,76 @@ async def run(self, event, step):
await step.wait_for_event("my-wait-for-event-step", "my-event-type")
```

## Rollback (Saga Pattern)

The Python SDK supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for distributed transactions using the `with_rollback` decorator. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).

To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state.

### step.with_rollback

* <code>step.with_rollback(name, config=None)</code> - decorator that allows you to define a step with a rollback handler.
* `name` - the name of the step.
* `config` - an optional `WorkflowStepConfig` dictionary.

### @do_fn.undo

* <code>@do_fn.undo(config=None)</code> - decorator to register an undo function for a `with_rollback` step.
* `config` - optional separate config for the undo function. If not provided, inherits from the do step's config.

### Enabling Rollback

Pass a `rollback` configuration when creating the workflow instance:

```python
instance = await self.env.MY_WORKFLOW.create(
params={"user_id": "123", "items": ["item1", "item2"]},
rollback={"continueOnError": True} # Enable auto-rollback
)
```

* `continueOnError` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`.

### Example

```python
class OrderWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
@step.with_rollback("create order")
async def create_order():
return await env.DB.orders.insert({"user_id": event["payload"]["user_id"], "items": event["payload"]["items"]})

@create_order.undo
async def undo_create_order(err, order):
await env.DB.orders.delete(order["id"])

order = await create_order()

@step.with_rollback("charge payment")
async def charge_payment():
return await env.STRIPE.charges.create(amount=order["total"], customer=event["payload"]["user_id"])

@charge_payment.undo
async def undo_charge_payment(err, charge):
await env.STRIPE.refunds.create(charge=charge["id"])

await charge_payment()

# If this step throws, the undo functions above will run automatically
# (in reverse order) if the instance was created with rollback config enabled
@step.do("send confirmation")
async def send_confirmation():
await env.EMAIL.send(to=event["payload"]["email"], template="order-confirmed", order_id=order["id"])

await send_confirmation()


# Creating the workflow instance with rollback enabled:
# instance = await env.MY_WORKFLOW.create(
# params=order_params,
# rollback={"continueOnError": False} # Stop on first undo failure
# )
```

### `event` parameter

Expand Down
Loading