Skip to content
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ logs/
node_modules/
data/
temp/
WareHouse/
WareHouse/

# Git worktrees
.worktrees/
1 change: 1 addition & 0 deletions docs/user_guide/en/execution_logic.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Execute according to the topological order:
After completing each round of in-cycle execution, the system checks these exit conditions:
- **Exit edge triggered**: If any in-cycle node triggers an edge to an out-of-cycle node, exit the loop
- **Maximum iterations reached**: If the configured maximum (default 100) is reached, force termination
- **Time limit reached**: If a `loop_timer` node within the cycle reaches its configured time limit, exit the loop
- **Initial node not re-triggered**: If the initial node isn't re-triggered by in-cycle predecessors, the loop naturally terminates

If none of the conditions are met, return to Step 2 for the next iteration.
Expand Down
159 changes: 159 additions & 0 deletions docs/user_guide/en/nodes/loop_timer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Loop Timer Node

The Loop Timer node is a loop control node used to limit the duration of a loop in a workflow. Through a time-tracking mechanism, it suppresses output before reaching the preset time limit, and only releases the message to trigger outgoing edges when the time limit is reached, thereby terminating the loop.

## Configuration

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `max_duration` | float | Yes | `60.0` | Maximum loop duration, must be > 0 |
| `duration_unit` | string | Yes | `"seconds"` | Time unit: "seconds", "minutes", or "hours" |
| `reset_on_emit` | bool | No | `true` | Whether to reset the timer after reaching the limit |
| `message` | text | No | - | Message content to send to downstream when time limit is reached |

## Core Concepts

### How It Works

The Loop Timer node maintains an internal timer with the following behavior:

1. **On first trigger**: Timer starts tracking elapsed time
2. **Elapsed time < `max_duration`**: **No output is produced**, outgoing edges are not triggered
3. **Elapsed time >= `max_duration`**: Output message is produced, triggering outgoing edges

This "suppress-release" mechanism allows the Loop Timer to precisely control when a loop terminates based on time rather than iteration count.

### Topological Structure Requirements

The Loop Timer node has special placement requirements in the graph structure:

```
┌──────────────────────────────────────┐
▼ │
Agent ──► Human ─────► Loop Timer ──┬──┘
▲ │ │
└─────────┘ ▼
End Node (outside loop)
```

> **Important**: Since Loop Timer **produces no output until the time limit is reached**:
> - **Human must connect to both Agent and Loop Timer**: This way the "continue loop" edge is handled by Human → Agent, while Loop Timer only handles time tracking
> - **Loop Timer must connect to Agent (inside loop)**: So it's recognized as an in-loop node, avoiding premature loop termination
> - **Loop Timer must connect to End Node (outside loop)**: When the time limit is reached, trigger the out-of-loop node to terminate the entire loop execution

### Timer State

- Timer state persists throughout the entire workflow execution
- Timer starts on the first trigger to the Loop Timer node
- When `reset_on_emit: true`, the timer resets after reaching the limit
- When `reset_on_emit: false`, the timer continues running after reaching the limit, outputting on every subsequent trigger

## When to Use

- **Time-based constraints**: Enforce time limits for loops (e.g., "review must complete within 5 minutes")
- **Timeout protection**: Serve as a "circuit breaker" to prevent runaway processes
- **Variable iteration time**: When each loop iteration takes unpredictable time, but total duration must be bounded

## Examples

### Basic Usage

```yaml
nodes:
- id: Time Guard
type: loop_timer
config:
max_duration: 5
duration_unit: minutes
reset_on_emit: true
message: Time limit reached (5 minutes), process terminated.
```

### Time-Limited Review Loop

This is the most typical use case for Loop Timer:

```yaml
graph:
id: timed_review_loop
description: Review loop with 5-minute time limit

nodes:
- id: Writer
type: agent
config:
provider: openai
name: gpt-4o
role: Improve articles based on user feedback

- id: Reviewer
type: human
config:
description: |
Review the article, enter ACCEPT to accept or provide modification suggestions.

- id: Loop Gate
type: loop_timer
config:
max_duration: 5
duration_unit: minutes
message: Time limit (5 minutes) reached, process automatically ended.

- id: Final Output
type: passthrough
config: {}

edges:
# Main loop: Writer -> Reviewer
- from: Writer
to: Reviewer

# Condition 1: User enters ACCEPT -> End
- from: Reviewer
to: Final Output
condition:
type: keyword
config:
any: [ACCEPT]

# Condition 2: User enters modification suggestions -> Trigger both Writer to continue loop AND Loop Gate to track time
- from: Reviewer
to: Writer
condition:
type: keyword
config:
none: [ACCEPT]

- from: Reviewer
to: Loop Gate
condition:
type: keyword
config:
none: [ACCEPT]

# Loop Gate connects to Writer (keeps it inside the loop)
- from: Loop Gate
to: Writer

# When Loop Gate reaches time limit: Trigger Final Output to end the process
- from: Loop Gate
to: Final Output

start: [Writer]
end: [Final Output]
```

**Execution Flow Explanation**:
1. User first enters modification suggestions → Triggers both Writer (continue loop) and Loop Gate (track time, no output)
2. User enters modification suggestions again → Triggers both Writer (continue loop) and Loop Gate (track time, no output)
3. After 5 minutes of elapsed time → Loop Gate outputs message triggering Final Output, terminating the loop
4. Or at any time user enters ACCEPT → Goes directly to Final Output to end

## Notes

- `max_duration` must be a positive number (> 0)
- `duration_unit` must be one of: "seconds", "minutes", "hours"
- Loop Timer **produces no output until the time limit is reached**, outgoing edges will not trigger
- Ensure Loop Timer connects to both in-loop and out-of-loop nodes
- The `message` field is optional, default message is `"Time limit reached (N units)"`
- Timer starts on the first trigger to the Loop Timer node
1 change: 1 addition & 0 deletions docs/user_guide/en/workflow_authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Further reading: `docs/user_guide/en/field_specs.md` (field catalog), `docs/user
| `passthrough` | Pass-through node that forwards only the last message by default and can be configured to forward all messages; used for context filtering and graph structure optimization. | `only_last_message` | [passthrough.md](nodes/passthrough.md) |
| `literal` | Emits a fixed text payload whenever triggered and discards inputs. | `content`, `role` (`user`/`assistant`) | [literal.md](nodes/literal.md) |
| `loop_counter` | Guard node that limits loop iterations before releasing downstream edges. | `max_iterations`, `reset_on_emit`, `message` | [loop_counter.md](nodes/loop_counter.md) |
| `loop_timer` | Guard node that limits loop duration before releasing downstream edges. | `max_duration`, `duration_unit`, `reset_on_emit`, `message`, `passthrough` | [loop_timer.md](nodes/loop_timer.md) |

Fetch the full schema via `POST /api/config/schema` or inspect the dataclasses inside `entity/configs/`.

Expand Down
122 changes: 122 additions & 0 deletions entity/configs/node/loop_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Configuration for loop timer guard nodes."""

from dataclasses import dataclass
from typing import Mapping, Any, Optional

from entity.configs.base import (
BaseConfig,
ConfigError,
ConfigFieldSpec,
require_mapping,
extend_path,
optional_str,
)


@dataclass
class LoopTimerConfig(BaseConfig):
"""Configuration schema for the loop timer node type."""

max_duration: float = 60.0
duration_unit: str = "seconds"
reset_on_emit: bool = True
message: Optional[str] = None
passthrough: bool = False

@classmethod
def from_dict(
cls, data: Mapping[str, Any] | None, *, path: str
) -> "LoopTimerConfig":
mapping = require_mapping(data or {}, path)
max_duration_raw = mapping.get("max_duration", 60.0)
try:
max_duration = float(max_duration_raw)
except (TypeError, ValueError) as exc: # pragma: no cover - defensive
raise ConfigError(
"max_duration must be a number",
extend_path(path, "max_duration"),
) from exc

if max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(path, "max_duration")
)

duration_unit = str(mapping.get("duration_unit", "seconds"))
valid_units = ["seconds", "minutes", "hours"]
if duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(path, "duration_unit"),
)

reset_on_emit = bool(mapping.get("reset_on_emit", True))
message = optional_str(mapping, "message", path)
passthrough = bool(mapping.get("passthrough", False))

return cls(
max_duration=max_duration,
duration_unit=duration_unit,
reset_on_emit=reset_on_emit,
message=message,
passthrough=passthrough,
path=path,
)

def validate(self) -> None:
if self.max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(self.path, "max_duration")
)

valid_units = ["seconds", "minutes", "hours"]
if self.duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(self.path, "duration_unit"),
)

FIELD_SPECS = {
"max_duration": ConfigFieldSpec(
name="max_duration",
display_name="Maximum Duration",
type_hint="float",
required=True,
default=60.0,
description="How long the loop can run before this node emits an output.",
),
"duration_unit": ConfigFieldSpec(
name="duration_unit",
display_name="Duration Unit",
type_hint="str",
required=True,
default="seconds",
description="Unit of time for max_duration: 'seconds', 'minutes', or 'hours'.",
),
"reset_on_emit": ConfigFieldSpec(
name="reset_on_emit",
display_name="Reset After Emit",
type_hint="bool",
required=False,
default=True,
description="Whether to reset the internal timer after reaching the limit.",
advance=True,
),
"message": ConfigFieldSpec(
name="message",
display_name="Release Message",
type_hint="text",
required=False,
description="Optional text sent downstream once the time limit is reached.",
advance=True,
),
"passthrough": ConfigFieldSpec(
name="passthrough",
display_name="Passthrough Mode",
type_hint="bool",
required=False,
default=False,
description="If true, after emitting the limit message, all subsequent inputs pass through unchanged.",
advance=True,
),
}
21 changes: 15 additions & 6 deletions runtime/node/builtin_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
from entity.configs.node.literal import LiteralNodeConfig
from entity.configs.node.python_runner import PythonRunnerConfig
from entity.configs.node.loop_counter import LoopCounterConfig
from entity.configs.node.loop_timer import LoopTimerConfig
from runtime.node.executor.agent_executor import AgentNodeExecutor
from runtime.node.executor.human_executor import HumanNodeExecutor
from runtime.node.executor.passthrough_executor import PassthroughNodeExecutor
from runtime.node.executor.literal_executor import LiteralNodeExecutor
from runtime.node.executor.python_executor import PythonNodeExecutor
from runtime.node.executor.subgraph_executor import SubgraphNodeExecutor
from runtime.node.executor.loop_counter_executor import LoopCounterNodeExecutor
from runtime.node.executor.loop_timer_executor import LoopTimerNodeExecutor
from runtime.node.registry import NodeCapabilities, register_node_type


Expand Down Expand Up @@ -48,9 +50,10 @@
"subgraph",
config_cls=SubgraphConfig,
executor_cls=SubgraphNodeExecutor,
capabilities=NodeCapabilities(
capabilities=NodeCapabilities(),
executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(
context, subgraphs or {}
),
executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(context, subgraphs or {}),
summary="Embeds (through file path or inline config) and runs another named subgraph within the current workflow",
)

Expand All @@ -69,17 +72,15 @@
"passthrough",
config_cls=PassthroughConfig,
executor_cls=PassthroughNodeExecutor,
capabilities=NodeCapabilities(
),
capabilities=NodeCapabilities(),
summary="Forwards prior node output downstream without modification",
)

register_node_type(
"literal",
config_cls=LiteralNodeConfig,
executor_cls=LiteralNodeExecutor,
capabilities=NodeCapabilities(
),
capabilities=NodeCapabilities(),
summary="Emits the configured text message every time it is triggered",
)

Expand All @@ -91,6 +92,14 @@
summary="Blocks downstream edges until the configured iteration limit is reached, then emits a message to release the loop.",
)

register_node_type(
"loop_timer",
config_cls=LoopTimerConfig,
executor_cls=LoopTimerNodeExecutor,
capabilities=NodeCapabilities(),
summary="Blocks downstream edges until the configured time limit is reached, then emits a message to release the loop.",
)

# Register subgraph source types (file-based and inline config)
register_subgraph_source(
"config",
Expand Down
Loading
Loading