diff --git a/.gitignore b/.gitignore index 96b6481f9..1e65d1cc5 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,7 @@ logs/ node_modules/ data/ temp/ -WareHouse/ \ No newline at end of file +WareHouse/ + +# Git worktrees +.worktrees/ \ No newline at end of file diff --git a/docs/user_guide/en/execution_logic.md b/docs/user_guide/en/execution_logic.md index aff163ce2..55cb0220f 100755 --- a/docs/user_guide/en/execution_logic.md +++ b/docs/user_guide/en/execution_logic.md @@ -114,6 +114,7 @@ Execute according to the topological order: After completing each round of in-cycle execution, the system checks these exit conditions: - **Exit edge triggered**: If any in-cycle node triggers an edge to an out-of-cycle node, exit the loop - **Maximum iterations reached**: If the configured maximum (default 100) is reached, force termination +- **Time limit reached**: If a `loop_timer` node within the cycle reaches its configured time limit, exit the loop - **Initial node not re-triggered**: If the initial node isn't re-triggered by in-cycle predecessors, the loop naturally terminates If none of the conditions are met, return to Step 2 for the next iteration. diff --git a/docs/user_guide/en/nodes/loop_timer.md b/docs/user_guide/en/nodes/loop_timer.md new file mode 100644 index 000000000..9bb5b032c --- /dev/null +++ b/docs/user_guide/en/nodes/loop_timer.md @@ -0,0 +1,159 @@ +# Loop Timer Node + +The Loop Timer node is a loop control node used to limit the duration of a loop in a workflow. Through a time-tracking mechanism, it suppresses output before reaching the preset time limit, and only releases the message to trigger outgoing edges when the time limit is reached, thereby terminating the loop. + +## Configuration + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `max_duration` | float | Yes | `60.0` | Maximum loop duration, must be > 0 | +| `duration_unit` | string | Yes | `"seconds"` | Time unit: "seconds", "minutes", or "hours" | +| `reset_on_emit` | bool | No | `true` | Whether to reset the timer after reaching the limit | +| `message` | text | No | - | Message content to send to downstream when time limit is reached | + +## Core Concepts + +### How It Works + +The Loop Timer node maintains an internal timer with the following behavior: + +1. **On first trigger**: Timer starts tracking elapsed time +2. **Elapsed time < `max_duration`**: **No output is produced**, outgoing edges are not triggered +3. **Elapsed time >= `max_duration`**: Output message is produced, triggering outgoing edges + +This "suppress-release" mechanism allows the Loop Timer to precisely control when a loop terminates based on time rather than iteration count. + +### Topological Structure Requirements + +The Loop Timer node has special placement requirements in the graph structure: + +``` + ┌──────────────────────────────────────┐ + ▼ │ + Agent ──► Human ─────► Loop Timer ──┬──┘ + ▲ │ │ + └─────────┘ ▼ + End Node (outside loop) +``` + +> **Important**: Since Loop Timer **produces no output until the time limit is reached**: +> - **Human must connect to both Agent and Loop Timer**: This way the "continue loop" edge is handled by Human → Agent, while Loop Timer only handles time tracking +> - **Loop Timer must connect to Agent (inside loop)**: So it's recognized as an in-loop node, avoiding premature loop termination +> - **Loop Timer must connect to End Node (outside loop)**: When the time limit is reached, trigger the out-of-loop node to terminate the entire loop execution + +### Timer State + +- Timer state persists throughout the entire workflow execution +- Timer starts on the first trigger to the Loop Timer node +- When `reset_on_emit: true`, the timer resets after reaching the limit +- When `reset_on_emit: false`, the timer continues running after reaching the limit, outputting on every subsequent trigger + +## When to Use + +- **Time-based constraints**: Enforce time limits for loops (e.g., "review must complete within 5 minutes") +- **Timeout protection**: Serve as a "circuit breaker" to prevent runaway processes +- **Variable iteration time**: When each loop iteration takes unpredictable time, but total duration must be bounded + +## Examples + +### Basic Usage + +```yaml +nodes: + - id: Time Guard + type: loop_timer + config: + max_duration: 5 + duration_unit: minutes + reset_on_emit: true + message: Time limit reached (5 minutes), process terminated. +``` + +### Time-Limited Review Loop + +This is the most typical use case for Loop Timer: + +```yaml +graph: + id: timed_review_loop + description: Review loop with 5-minute time limit + + nodes: + - id: Writer + type: agent + config: + provider: openai + name: gpt-4o + role: Improve articles based on user feedback + + - id: Reviewer + type: human + config: + description: | + Review the article, enter ACCEPT to accept or provide modification suggestions. + + - id: Loop Gate + type: loop_timer + config: + max_duration: 5 + duration_unit: minutes + message: Time limit (5 minutes) reached, process automatically ended. + + - id: Final Output + type: passthrough + config: {} + + edges: + # Main loop: Writer -> Reviewer + - from: Writer + to: Reviewer + + # Condition 1: User enters ACCEPT -> End + - from: Reviewer + to: Final Output + condition: + type: keyword + config: + any: [ACCEPT] + + # Condition 2: User enters modification suggestions -> Trigger both Writer to continue loop AND Loop Gate to track time + - from: Reviewer + to: Writer + condition: + type: keyword + config: + none: [ACCEPT] + + - from: Reviewer + to: Loop Gate + condition: + type: keyword + config: + none: [ACCEPT] + + # Loop Gate connects to Writer (keeps it inside the loop) + - from: Loop Gate + to: Writer + + # When Loop Gate reaches time limit: Trigger Final Output to end the process + - from: Loop Gate + to: Final Output + + start: [Writer] + end: [Final Output] +``` + +**Execution Flow Explanation**: +1. User first enters modification suggestions → Triggers both Writer (continue loop) and Loop Gate (track time, no output) +2. User enters modification suggestions again → Triggers both Writer (continue loop) and Loop Gate (track time, no output) +3. After 5 minutes of elapsed time → Loop Gate outputs message triggering Final Output, terminating the loop +4. Or at any time user enters ACCEPT → Goes directly to Final Output to end + +## Notes + +- `max_duration` must be a positive number (> 0) +- `duration_unit` must be one of: "seconds", "minutes", "hours" +- Loop Timer **produces no output until the time limit is reached**, outgoing edges will not trigger +- Ensure Loop Timer connects to both in-loop and out-of-loop nodes +- The `message` field is optional, default message is `"Time limit reached (N units)"` +- Timer starts on the first trigger to the Loop Timer node diff --git a/docs/user_guide/en/workflow_authoring.md b/docs/user_guide/en/workflow_authoring.md index 70f823a1f..abbb2f14d 100755 --- a/docs/user_guide/en/workflow_authoring.md +++ b/docs/user_guide/en/workflow_authoring.md @@ -92,6 +92,7 @@ Further reading: `docs/user_guide/en/field_specs.md` (field catalog), `docs/user | `passthrough` | Pass-through node that forwards only the last message by default and can be configured to forward all messages; used for context filtering and graph structure optimization. | `only_last_message` | [passthrough.md](nodes/passthrough.md) | | `literal` | Emits a fixed text payload whenever triggered and discards inputs. | `content`, `role` (`user`/`assistant`) | [literal.md](nodes/literal.md) | | `loop_counter` | Guard node that limits loop iterations before releasing downstream edges. | `max_iterations`, `reset_on_emit`, `message` | [loop_counter.md](nodes/loop_counter.md) | +| `loop_timer` | Guard node that limits loop duration before releasing downstream edges. | `max_duration`, `duration_unit`, `reset_on_emit`, `message`, `passthrough` | [loop_timer.md](nodes/loop_timer.md) | Fetch the full schema via `POST /api/config/schema` or inspect the dataclasses inside `entity/configs/`. diff --git a/entity/configs/node/loop_timer.py b/entity/configs/node/loop_timer.py new file mode 100644 index 000000000..699b328b8 --- /dev/null +++ b/entity/configs/node/loop_timer.py @@ -0,0 +1,122 @@ +"""Configuration for loop timer guard nodes.""" + +from dataclasses import dataclass +from typing import Mapping, Any, Optional + +from entity.configs.base import ( + BaseConfig, + ConfigError, + ConfigFieldSpec, + require_mapping, + extend_path, + optional_str, +) + + +@dataclass +class LoopTimerConfig(BaseConfig): + """Configuration schema for the loop timer node type.""" + + max_duration: float = 60.0 + duration_unit: str = "seconds" + reset_on_emit: bool = True + message: Optional[str] = None + passthrough: bool = False + + @classmethod + def from_dict( + cls, data: Mapping[str, Any] | None, *, path: str + ) -> "LoopTimerConfig": + mapping = require_mapping(data or {}, path) + max_duration_raw = mapping.get("max_duration", 60.0) + try: + max_duration = float(max_duration_raw) + except (TypeError, ValueError) as exc: # pragma: no cover - defensive + raise ConfigError( + "max_duration must be a number", + extend_path(path, "max_duration"), + ) from exc + + if max_duration <= 0: + raise ConfigError( + "max_duration must be > 0", extend_path(path, "max_duration") + ) + + duration_unit = str(mapping.get("duration_unit", "seconds")) + valid_units = ["seconds", "minutes", "hours"] + if duration_unit not in valid_units: + raise ConfigError( + f"duration_unit must be one of: {', '.join(valid_units)}", + extend_path(path, "duration_unit"), + ) + + reset_on_emit = bool(mapping.get("reset_on_emit", True)) + message = optional_str(mapping, "message", path) + passthrough = bool(mapping.get("passthrough", False)) + + return cls( + max_duration=max_duration, + duration_unit=duration_unit, + reset_on_emit=reset_on_emit, + message=message, + passthrough=passthrough, + path=path, + ) + + def validate(self) -> None: + if self.max_duration <= 0: + raise ConfigError( + "max_duration must be > 0", extend_path(self.path, "max_duration") + ) + + valid_units = ["seconds", "minutes", "hours"] + if self.duration_unit not in valid_units: + raise ConfigError( + f"duration_unit must be one of: {', '.join(valid_units)}", + extend_path(self.path, "duration_unit"), + ) + + FIELD_SPECS = { + "max_duration": ConfigFieldSpec( + name="max_duration", + display_name="Maximum Duration", + type_hint="float", + required=True, + default=60.0, + description="How long the loop can run before this node emits an output.", + ), + "duration_unit": ConfigFieldSpec( + name="duration_unit", + display_name="Duration Unit", + type_hint="str", + required=True, + default="seconds", + description="Unit of time for max_duration: 'seconds', 'minutes', or 'hours'.", + ), + "reset_on_emit": ConfigFieldSpec( + name="reset_on_emit", + display_name="Reset After Emit", + type_hint="bool", + required=False, + default=True, + description="Whether to reset the internal timer after reaching the limit.", + advance=True, + ), + "message": ConfigFieldSpec( + name="message", + display_name="Release Message", + type_hint="text", + required=False, + description="Optional text sent downstream once the time limit is reached.", + advance=True, + ), + "passthrough": ConfigFieldSpec( + name="passthrough", + display_name="Passthrough Mode", + type_hint="bool", + required=False, + default=False, + description="If true, after emitting the limit message, all subsequent inputs pass through unchanged.", + advance=True, + ), + } diff --git a/runtime/node/builtin_nodes.py b/runtime/node/builtin_nodes.py index 2a2311156..f55ac848d 100755 --- a/runtime/node/builtin_nodes.py +++ b/runtime/node/builtin_nodes.py @@ -12,6 +12,7 @@ from entity.configs.node.literal import LiteralNodeConfig from entity.configs.node.python_runner import PythonRunnerConfig from entity.configs.node.loop_counter import LoopCounterConfig +from entity.configs.node.loop_timer import LoopTimerConfig from runtime.node.executor.agent_executor import AgentNodeExecutor from runtime.node.executor.human_executor import HumanNodeExecutor from runtime.node.executor.passthrough_executor import PassthroughNodeExecutor @@ -19,6 +20,7 @@ from runtime.node.executor.python_executor import PythonNodeExecutor from runtime.node.executor.subgraph_executor import SubgraphNodeExecutor from runtime.node.executor.loop_counter_executor import LoopCounterNodeExecutor +from runtime.node.executor.loop_timer_executor import LoopTimerNodeExecutor from runtime.node.registry import NodeCapabilities, register_node_type @@ -48,9 +50,10 @@ "subgraph", config_cls=SubgraphConfig, executor_cls=SubgraphNodeExecutor, - capabilities=NodeCapabilities( + capabilities=NodeCapabilities(), + executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor( + context, subgraphs or {} ), - executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(context, subgraphs or {}), summary="Embeds (through file path or inline config) and runs another named subgraph within the current workflow", ) @@ -69,8 +72,7 @@ "passthrough", config_cls=PassthroughConfig, executor_cls=PassthroughNodeExecutor, - capabilities=NodeCapabilities( - ), + capabilities=NodeCapabilities(), summary="Forwards prior node output downstream without modification", ) @@ -78,8 +80,7 @@ "literal", config_cls=LiteralNodeConfig, executor_cls=LiteralNodeExecutor, - capabilities=NodeCapabilities( - ), + capabilities=NodeCapabilities(), summary="Emits the configured text message every time it is triggered", ) @@ -91,6 +92,14 @@ summary="Blocks downstream edges until the configured iteration limit is reached, then emits a message to release the loop.", ) +register_node_type( + "loop_timer", + config_cls=LoopTimerConfig, + executor_cls=LoopTimerNodeExecutor, + capabilities=NodeCapabilities(), + summary="Blocks downstream edges until the configured time limit is reached, then emits a message to release the loop.", +) + # Register subgraph source types (file-based and inline config) register_subgraph_source( "config", diff --git a/runtime/node/executor/loop_timer_executor.py b/runtime/node/executor/loop_timer_executor.py new file mode 100644 index 000000000..d86a548b3 --- /dev/null +++ b/runtime/node/executor/loop_timer_executor.py @@ -0,0 +1,148 @@ +"""Loop timer guard node executor.""" + +import time +from typing import List, Dict, Any + +from entity.configs import Node +from entity.configs.node.loop_timer import LoopTimerConfig +from entity.messages import Message, MessageRole +from runtime.node.executor.base import NodeExecutor + + +class LoopTimerNodeExecutor(NodeExecutor): + """Track loop duration and emit output only after hitting the time limit. + + Supports two modes: + 1. Standard Mode (passthrough=False): Suppresses input until time limit, then emits message + 2. Terminal Gate Mode (passthrough=True): Acts as a sequential switch + - Before limit: Pass input through unchanged + - At limit: Emit configured message, suppress original input + - After limit: Transparent gate, pass all subsequent messages through + """ + + STATE_KEY = "loop_timer" + + def execute(self, node: Node, inputs: List[Message]) -> List[Message]: + config = node.as_config(LoopTimerConfig) + if config is None: + raise ValueError(f"Node {node.id} missing loop_timer configuration") + + state = self._get_state() + timer_state = state.setdefault(node.id, {}) + + # Initialize timer on first execution + current_time = time.time() + if "start_time" not in timer_state: + timer_state["start_time"] = current_time + timer_state["emitted"] = False + + start_time = timer_state["start_time"] + elapsed_time = current_time - start_time + + # Convert max_duration to seconds based on unit + max_duration_seconds = self._convert_to_seconds( + config.max_duration, config.duration_unit + ) + + # Check if time limit has been reached + limit_reached = elapsed_time >= max_duration_seconds + + # Terminal Gate Mode (passthrough=True) + if config.passthrough: + if not limit_reached: + # Before limit: pass input through unchanged + self.log_manager.debug( + f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s " + f"(passthrough mode: forwarding input)" + ) + return inputs + elif not timer_state["emitted"]: + # At limit: emit configured message, suppress original input + timer_state["emitted"] = True + if config.reset_on_emit: + timer_state["start_time"] = current_time + + content = ( + config.message + or f"Time limit reached ({config.max_duration} {config.duration_unit})" + ) + metadata = { + "loop_timer": { + "elapsed_time": elapsed_time, + "max_duration": config.max_duration, + "duration_unit": config.duration_unit, + "reset_on_emit": config.reset_on_emit, + "passthrough": True, + } + } + + self.log_manager.debug( + f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s " + f"(passthrough mode: emitting limit message)" + ) + + return [ + Message( + role=MessageRole.ASSISTANT, + content=content, + metadata=metadata, + ) + ] + else: + # After limit: transparent gate, pass all subsequent messages through + self.log_manager.debug( + f"LoopTimer {node.id}: {elapsed_time:.1f}s (passthrough mode: transparent gate)" + ) + return inputs + + # Standard Mode (passthrough=False) + if not limit_reached: + self.log_manager.debug( + f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s " + f"(suppress downstream)" + ) + return [] + + if config.reset_on_emit and not timer_state["emitted"]: + timer_state["start_time"] = current_time + + timer_state["emitted"] = True + + content = ( + config.message + or f"Time limit reached ({config.max_duration} {config.duration_unit})" + ) + metadata = { + "loop_timer": { + "elapsed_time": elapsed_time, + "max_duration": config.max_duration, + "duration_unit": config.duration_unit, + "reset_on_emit": config.reset_on_emit, + "passthrough": False, + } + } + + self.log_manager.debug( + f"LoopTimer {node.id}: {elapsed_time:.1f}s / {max_duration_seconds:.1f}s " + f"reached limit, releasing output" + ) + + return [ + Message( + role=MessageRole.ASSISTANT, + content=content, + metadata=metadata, + ) + ] + + def _get_state(self) -> Dict[str, Dict[str, Any]]: + return self.context.global_state.setdefault(self.STATE_KEY, {}) + + def _convert_to_seconds(self, duration: float, unit: str) -> float: + """Convert duration to seconds based on unit.""" + unit_multipliers = { + "seconds": 1.0, + "minutes": 60.0, + "hours": 3600.0, + } + return duration * unit_multipliers.get(unit, 1.0) diff --git a/yaml_instance/demo_loop_timer.yaml b/yaml_instance/demo_loop_timer.yaml new file mode 100644 index 000000000..9d4667596 --- /dev/null +++ b/yaml_instance/demo_loop_timer.yaml @@ -0,0 +1,82 @@ +version: 0.0.0 +graph: + start: + - Writer + end: + - Finalizer + id: loop_timer_demo + description: LoopTimer demo that releases output after 10 seconds of agent iterations. + is_majority_voting: false + log_level: INFO + nodes: + - id: Critic + type: agent + config: + name: ${MODEL_NAME} + provider: openai + role: You are a critic. Provide brief feedback (1 sentence) to improve the draft. + base_url: ${BASE_URL} + api_key: ${API_KEY} + params: {} + tooling: [] + thinking: null + memories: [] + retry: null + description: Reviews the draft and provides feedback for improvement. + context_window: 0 + log_output: true + - id: Writer + type: agent + config: + name: ${MODEL_NAME} + provider: openai + role: You are a technical writer. Generate a brief draft (1 sentence). + base_url: ${BASE_URL} + api_key: ${API_KEY} + params: {} + tooling: [] + thinking: null + memories: [] + retry: null + description: Drafts content based on feedback. + context_window: 0 + log_output: true + - id: Finalizer + type: literal + config: + content: Final summary released + role: assistant + description: Receives the release signal from Loop Gate and outputs the final statement. + context_window: 0 + log_output: true + - id: Loop Gate + type: loop_timer + config: + max_duration: 20 + duration_unit: seconds + reset_on_emit: true + message: Time limit reached - loop automatically terminated + passthrough: false + description: Tracks elapsed time, only granting passage after 10 seconds. + context_window: 0 + log_output: true + edges: + - from: Writer + to: Critic + - from: Critic + to: Writer + - from: Critic + to: Loop Gate + - from: Loop Gate + to: Writer + - from: Loop Gate + to: Finalizer + - from: Loop Gate + to: Writer + trigger: true + condition: 'true' + carry_data: true + keep_message: false + process: null +vars: + MODEL_NAME: qwen/qwen3-8b