diff --git a/README.md b/README.md index 3340372..15984ed 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,6 @@ A comprehensive skill for building Temporal applications in Python and TypeScrip This skill provides multi-language guidance for Temporal development, combining: - **Core concepts** shared across languages (determinism, patterns, versioning) - **Language-specific references** for Python and TypeScript -- **Operational scripts** for worker and workflow management - **AI/LLM integration patterns** for building durable AI applications ## Structure @@ -23,7 +22,7 @@ temporal-dev/ │ │ ├── troubleshooting.md # Decision trees, recovery procedures │ │ ├── error-reference.md # Common error types, workflow status │ │ ├── interactive-workflows.md # Testing signals, updates, queries -│ │ ├── tool-reference.md # Script options, worker management +│ │ ├── dev-management.md # Dev cycle & management of server and workers │ │ ├── logs.md # Log file locations, search patterns │ │ └── ai-integration.md # AI/LLM integration concepts │ ├── python/ # Python SDK references @@ -36,7 +35,7 @@ temporal-dev/ │ │ ├── data-handling.md # Pydantic, encryption │ │ ├── observability.md # Logging, metrics │ │ ├── versioning.md # Python patching API -│ │ ├── advanced-features.md # Continue-as-new, interceptors +│ │ ├── advanced-features.md # Continue-as-new, updates, schedules, and more │ │ └── ai-patterns.md # Python AI Cookbook patterns │ └── typescript/ # TypeScript SDK references │ ├── typescript.md # SDK overview, quick start @@ -46,22 +45,7 @@ temporal-dev/ │ ├── data-handling.md # Data converters │ ├── observability.md # Sinks, logging │ ├── versioning.md # TypeScript patching API -│ └── advanced-features.md # Cancellation scopes -├── scripts/ # Operational utilities -│ ├── ensure-server.sh # Start Temporal dev server -│ ├── ensure-worker.sh # Start worker for project -│ ├── list-workers.sh # List running workers -│ ├── kill-worker.sh # Stop specific worker -│ ├── kill-all-workers.sh # Stop ALL workers -│ ├── monitor-worker-health.sh # Check worker health -│ ├── list-recent-workflows.sh # Show recent executions -│ ├── get-workflow-result.sh # Get workflow output -│ ├── find-stalled-workflows.sh # Find stuck workflows -│ ├── analyze-workflow-error.sh # Diagnose failures -│ ├── bulk-cancel-workflows.sh # Cancel multiple workflows -│ ├── wait-for-workflow-status.sh # Poll workflow status -│ ├── wait-for-worker-ready.sh # Poll worker startup -│ └── find-project-workers.sh # Helper: find worker PIDs +│ └── advanced-features.md # Sinks, updates, schedules and more ``` ## Progressive Disclosure @@ -84,14 +68,6 @@ The skill uses progressive loading to manage context efficiently: - Language-specific gotchas - Testing patterns -## Content Sources - -This skill merges content from multiple sources: -- **Steve's temporal-dev skill** - Operational scripts, troubleshooting -- **Max's temporal-claude-skill** - Multi-SDK structure, AI integration -- **Mason's python-sdk skill** - Python deep-dive, sandbox, sync/async -- **Mason's typescript-sdk skill** - TypeScript patterns, V8 isolation - ## Trigger Phrases The skill activates on phrases like: diff --git a/SKILL.md b/SKILL.md index 1673557..1f84117 100644 --- a/SKILL.md +++ b/SKILL.md @@ -1,6 +1,6 @@ --- name: temporal-developer -description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "workflow replay", "activity 
timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. Provides multi-language guidance for Python and TypeScript with operational scripts. +description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "workflow replay", "activity timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. version: 1.0.0 --- @@ -118,36 +118,6 @@ To safely change workflow code while workflows are running: See `references/core/versioning.md` for concepts, language-specific files for implementation. -## Scripts (Operational) - -Available scripts in `scripts/` for worker and workflow management: - -### Server & Worker Lifecycle -| Script | Purpose | -|--------|---------| -| `ensure-server.sh` | Start Temporal dev server if not running | -| `ensure-worker.sh` | Start worker for project (kills existing first) | -| `list-workers.sh` | List running workers | -| `kill-worker.sh` | Stop a specific worker | -| `kill-all-workers.sh` | Stop ALL workers (cleanup) | -| `monitor-worker-health.sh` | Check worker health, uptime, recent errors | - -### Workflow Operations -| Script | Purpose | -|--------|---------| -| `list-recent-workflows.sh` | Show recent workflow executions | -| `get-workflow-result.sh` | Get output/result from completed workflow | -| `find-stalled-workflows.sh` | Find workflows not making progress | -| `analyze-workflow-error.sh` | Diagnose workflow failures | -| `bulk-cancel-workflows.sh` | Cancel multiple workflows by ID or pattern | - -### Utilities (used by other scripts) -| Script | Purpose | -|--------|---------| -| `wait-for-workflow-status.sh` | Poll until workflow reaches target status | -| `wait-for-worker-ready.sh` | Poll log file for worker startup | -| `find-project-workers.sh` | Helper to find worker PIDs for a project | - ## Additional Resources ### Core References (Language-Agnostic) @@ -158,7 +128,7 @@ Available scripts in `scripts/` for worker and workflow management: - **`references/core/error-reference.md`** - Common error types, workflow status reference - **`references/core/common-gotchas.md`** - Anti-patterns and common mistakes - **`references/core/interactive-workflows.md`** - Testing signals, updates, queries -- **`references/core/tool-reference.md`** - Script options and worker management details +- **`references/core/dev-management.md`** - Dev cycle & management of server and workers - **`references/core/logs.md`** - Log file locations and search patterns - **`references/core/ai-integration.md`** - AI/LLM integration patterns @@ -172,7 +142,7 @@ Available scripts in `scripts/` for worker and workflow management: - **`references/python/data-handling.md`** - Pydantic, encryption - **`references/python/observability.md`** - Logging, metrics, tracing - **`references/python/versioning.md`** - Python patching API -- **`references/python/advanced-features.md`** - Continue-as-new, interceptors +- **`references/python/advanced-features.md`** - Continue-as-new, updates, schedules, and 
more - **`references/python/ai-patterns.md`** - Python AI Cookbook patterns - **`references/python/gotchas.md`** - Python-specific anti-patterns @@ -184,7 +154,7 @@ Available scripts in `scripts/` for worker and workflow management: - **`references/typescript/data-handling.md`** - Data converters - **`references/typescript/observability.md`** - Sinks, logging - **`references/typescript/versioning.md`** - TypeScript patching API -- **`references/typescript/advanced-features.md`** - Cancellation scopes, interceptors +- **`references/typescript/advanced-features.md`** - Sinks, updates, schedules and more - **`references/typescript/gotchas.md`** - TypeScript-specific anti-patterns ## Feedback diff --git a/references/core/ai-integration.md b/references/core/ai-integration.md index acf79f3..77c8a36 100644 --- a/references/core/ai-integration.md +++ b/references/core/ai-integration.md @@ -4,7 +4,9 @@ Temporal provides durable execution for AI/LLM applications, handling retries, rate limits, and long-running operations automatically. These patterns apply across languages, with Python being the most mature for AI integration. -For Python-specific implementation details and code examples, see `references/python/ai-patterns.md`. +For Python-specific implementation details and code examples, see `references/python/ai-patterns.md`. Temporal's Python SDK also provides pre-built integrations with several LLM and agent SDKs, which can be leveraged to create agentic workflows with minimal effort (when working in Python). + +The remainder of this document describes general principles to follow when building AI/LLM applications in Temporal, particularly when building from scratch rather than with a pre-built integration. ## Why Temporal for AI? @@ -19,28 +21,24 @@ For Python-specific implementation details and code examples, see `references/py ## Core Patterns -### Pattern 1: Generic LLM Activity - -Create flexible, reusable activities for LLM calls: +### Pattern 1: Activities Should Wrap LLM Calls -``` -Activity: call_llm_generic( - model: string, - system_instructions: string, - user_input: string, - tools?: list, - response_format?: schema -) -> response -``` +- activity: call_llm + - inputs: + - model_id -> internally the activity can route to different models, so we don't need one activity per unique model. + - prompt / chat history + - tools + - etc. + - returns the model response, as a typed structured output **Benefits**: - Single activity handles multiple use cases - Consistent retry handling - Centralized configuration -### Pattern 2: Activity-Based Separation +### Pattern 2: Non-deterministic / Heavy Tools in Activities -Isolate each operation in its own activity: +Tools that are non-deterministic and/or perform heavy actions (file system access, calling external APIs, etc.) should be placed in activities: ``` Workflow: @@ -55,55 +53,32 @@ Workflow: - Easier testing and mocking - Failure isolation -### Pattern 3: Centralized Retry Management +### Pattern 3: Tools that Mutate Agent State Can Be in the Workflow Directly -**Critical**: Disable retries in LLM client libraries, let Temporal handle retries. +Generally, agent state is in bijection with workflow state. Thus, tools that mutate agent state and are deterministic (like TODO tools, which just update a hash map) typically belong in the workflow code rather than an activity. 
``` -LLM Client Config: - max_retries = 0 ← Disable client retries - -Activity Retry Policy: - initial_interval = 1s - backoff_coefficient = 2.0 - maximum_attempts = 5 - maximum_interval = 60s +Workflow: + ├── Activity: call_llm (tool selection: todos_write tool) + ├── Write new TODOs to workflow state (not in activity) + └── Activity: call_llm (continuing agent flow...) ``` +### Pattern 4: Centralized Retry Management + +Disable retries in LLM client libraries, let Temporal handle retries. + +- LLM Client Config: + - max_retries = 0 ← Disable client retries at the LLM client level + +Use either the default activity retry policy, or customize it as needed for the situation. + **Why**: - Temporal retries are durable (survive crashes) - Single retry configuration point - Better visibility into retry attempts - Consistent backoff behavior -### Pattern 4: Tool-Calling Agent - -Three-phase workflow for LLM agents with tools: - -``` -┌─────────────────────────────────────────────┐ -│ Phase 1: Tool Selection │ -│ Activity: Present tools to LLM │ -│ LLM returns: tool_name, arguments │ -└─────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────┐ -│ Phase 2: Tool Execution │ -│ Activity: Execute selected tool │ -│ (Separate activity per tool type) │ -└─────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────┐ -│ Phase 3: Result Interpretation │ -│ Activity: Send results back to LLM │ -│ LLM returns: final response or next tool │ -└─────────────────────────────────────────────┘ - │ - ▼ - Loop until LLM returns final answer -``` ### Pattern 5: Multi-Agent Orchestration @@ -127,21 +102,6 @@ Deep Research Example: **Key Pattern**: Use parallel execution with `return_exceptions=True` to continue with partial results when some searches fail. -### Pattern 6: Structured Outputs - -Define schemas for LLM responses: - -``` -Input: Raw LLM prompt -Schema: { action: string, confidence: float, reasoning: string } -Output: Validated, typed response -``` - -**Benefits**: -- Type safety -- Automatic validation -- Easier downstream processing - ## Timeout Recommendations | Operation Type | Recommended Timeout | @@ -165,27 +125,14 @@ Output: Validated, typed response Parse rate limit info from API responses: -``` -Response Headers: - Retry-After: 30 - X-RateLimit-Remaining: 0 - -Activity: - If rate limited: - Raise retryable error with retry_after hint - Temporal handles the delay -``` - -### Retry Policy Configuration +- Response Headers: + - Retry-After: 30 + - X-RateLimit-Remaining: 0 -``` -Retry Policy: - initial_interval: 1s (or from Retry-After header) - backoff_coefficient: 2.0 - maximum_interval: 60s - maximum_attempts: 10 - non_retryable_errors: [InvalidAPIKey, InvalidInput] -``` +- Activity: + - If rate limited: + - Raise retryable error with retry_after hint + - Temporal handles the delay ## Error Handling @@ -209,15 +156,13 @@ Retry Policy: 4. **Use structured outputs** - For type safety and validation 5. **Handle partial failures** - Continue with available results 6. **Monitor costs** - Track LLM calls at activity level -7. **Version prompts** - Track prompt changes in code -8. **Test with mocks** - Mock LLM responses in tests +7. 
**Test with mocks** - Mock LLM responses in tests ## Observability -- **Activity duration**: Track LLM latency -- **Retry counts**: Monitor rate limiting -- **Token usage**: Log in activity output -- **Cost attribution**: Tag workflows with cost centers +See `references/python/observability.md` (or the equivalent file for the language you are working in) for documentation on observability in Temporal. It is generally recommended to add observability for: +- Token usage, via activity logging +- Anything else that helps track LLM usage and debug agentic flows, in moderation. ## Language-Specific Resources diff --git a/references/core/common-gotchas.md b/references/core/common-gotchas.md index 7d1ee23..7d13eb3 100644 --- a/references/core/common-gotchas.md +++ b/references/core/common-gotchas.md @@ -2,9 +2,7 @@ Common mistakes and anti-patterns in Temporal development. Learning from these saves significant debugging time. -## Idempotency Issues - -### Non-Idempotent Activities +## Non-Idempotent Activities **The Problem**: Activities may execute more than once due to retries or Worker failures. If an activity calls an external service without an idempotency key, you may charge a customer twice, send duplicate emails, or create duplicate records. @@ -14,38 +12,30 @@ Common mistakes and anti-patterns in Temporal development. Learning from these s **The Fix**: Always use idempotency keys when calling external services. Use the workflow ID, activity ID, or a domain-specific identifier (like order ID) as the key. -### Local Activities - -Local Activities skip the task queue for lower latency, but they're still subject to retries. The same idempotency rules apply. - -## Replay Safety Violations +**Note:** Local Activities skip the task queue for lower latency, but they're still subject to retries. The same idempotency rules apply. -### Side Effects in Workflow Code +## Side Effects & Non-Determinism in Workflow Code -**The Problem**: Code in workflow functions runs on first execution AND on every replay. Any side effect (logging, notifications, metrics) will happen multiple times. +**The Problem**: Code in workflow functions runs on first execution AND on every replay. Any side effect (logging, notifications, metrics, etc.) will happen multiple times, and non-deterministic code (I/O, current time, random numbers, threading, etc.) won't replay correctly. **Symptoms**: +- Non-determinism errors +- Sandbox violations, depending on SDK language - Duplicate log entries - Multiple notifications for the same event - Inflated metrics **The Fix**: -- Use the SDK's replay-aware logger (only logs on first execution) -- Put all side effects in Activities - -### Non-Deterministic Time +- Use Temporal's replay-aware managed side effects for common, non-business-logic cases: + - Temporal workflow logging + - Temporal date time (`workflow.now()` in Python, `Date.now()` is auto-replaced in TypeScript) + - Temporal UUID generation + - Temporal random number generation +- Put all other side effects in Activities -**The Problem**: Using system time (`datetime.now()`, `Date.now()`) in workflow code returns different values on replay, causing non-determinism errors. +See `references/core/determinism.md` for more info. -**Symptoms**: -- Non-determinism errors mentioning time-based decisions -- Workflows that worked once but fail on replay - -**The Fix**: Use the SDK's deterministic time function (`workflow.now()` in Python, `Date.now()` is auto-replaced in TypeScript). 
- -## Worker Management Issues - -### Multiple Workers with Different Code +## Multiple Workers with Different Code **The Problem**: If Worker A runs part of a workflow with code v1, then Worker B (with code v2) picks it up, replay may produce different Commands. @@ -55,70 +45,45 @@ Local Activities skip the task queue for lower latency, but they're still subjec **The Fix**: - Use Worker Versioning for production deployments +- Use patching APIs - During development: kill old workers before starting new ones - Ensure all workers run identical code -### Stale Workflows During Development - -**The Problem**: Workflows started with old code continue running after you change the code. +**Note:** Workflows started with old code continue running after you change the code, which can then induce the above issues. During development (NOT production), you may want to terminate stale workflows (`temporal workflow terminate --workflow-id `). -**Symptoms**: -- Workflows behave unexpectedly after code changes -- Non-determinism errors on previously-working workflows - -**The Fix**: -- Terminate stale workflows: `temporal workflow terminate --workflow-id ` -- Use `find-stalled-workflows.sh` to detect stuck workflows -- In production, use versioning for backward compatibility - -## Workflow Design Anti-Patterns - -### The Mega Workflow - -**The Problem**: Putting too much logic in a single workflow. - -**Issues**: -- Hard to test and maintain -- Event history grows unbounded -- Single point of failure -- Difficult to reason about - -**The Fix**: -- Keep workflows focused on a single responsibility -- Use Child Workflows for sub-processes -- Use Continue-as-New for long-running workflows +See `references/core/versioning.md` for more info. -### Failing Too Quickly +## Failing Activities Too Quickly -**The Problem**: Using aggressive retry policies that give up too easily. +**The Problem**: Using aggressive activity retry policies that give up too easily. **Symptoms**: - Workflows failing on transient errors - Unnecessary workflow failures during brief outages -**The Fix**: Use appropriate retry policies. Let Temporal handle transient failures with exponential backoff. Reserve `maximum_attempts=1` for truly non-retryable operations. +**The Fix**: Use appropriate activity retry policies. Let Temporal handle transient failures with exponential backoff. Reserve `maximum_attempts=1` for truly non-retryable operations. -## Query Handler Mistakes +## Query Handler & Update Validator Mistakes -### Modifying State in Queries +### Modifying State in Queries & Update Validators -**The Problem**: Queries are read-only. Modifying state in a query handler causes non-determinism on replay because queries don't generate history events. +**The Problem**: Queries and update validators are read-only. Modifying state causes non-determinism on replay, and must strictly be avoided. **Symptoms**: - State inconsistencies after workflow replay - Non-determinism errors -**The Fix**: Queries must only read state. Use Updates for operations that need to modify state AND return a result. +**The Fix**: Queries and update validators must only read state. Use Updates for operations that need to modify state AND return a result. -### Blocking in Queries +### Blocking in Queries & Update Validators -**The Problem**: Queries must return immediately. They cannot await activities, child workflows, timers, or conditions. +**The Problem**: Queries and update validators must return immediately. 
They cannot await activities, child workflows, timers, or conditions. **Symptoms**: -- Query timeouts +- Query / update validator timeouts - Deadlocks -**The Fix**: Queries return current state only. Use Signals or Updates to trigger async operations. +**The Fix**: Queries and update validators must only look at current state. Use Signals or Updates to trigger async operations. ### Query vs Signal vs Update diff --git a/references/core/determinism.md b/references/core/determinism.md index 9a606d1..0daeea7 100644 --- a/references/core/determinism.md +++ b/references/core/determinism.md @@ -8,7 +8,7 @@ Temporal workflows must be deterministic because of **history replay** - the mec ### The Replay Mechanism -When a Worker needs to restore workflow state (after crash, cache eviction, or continuing after a long timer), it **re-executes the workflow code from the beginning**. But instead of re-running activities, it uses results stored in the Event History. +When a Worker needs to restore workflow state (after crash, cache eviction, or continuing after a long timer), it **re-executes the workflow code from the beginning**. But instead of re-running external actions, it uses results stored in the Event History. ``` Initial Execution: @@ -22,7 +22,7 @@ Replay (Recovery): ### Commands and Events -Every workflow operation generates a Command that becomes an Event: +Every workflow operation generates a Command that becomes an Event; here are some examples: | Workflow Code | Command Generated | Event Stored | |--------------|-------------------|--------------| @@ -95,6 +95,24 @@ Math.random() // Returns seeded PRNG value new Date() // Deterministic ``` +### Go `workflowcheck` static analyzer +The Go SDK provides a workflowcheck CLI tool that: +- Statically analyzes registered Workflow Definitions and their call graph +- Flags common sources of non-determinism (e.g., time.Now, time.Sleep, goroutines, channels, map iteration, global math/rand, stdio) +- Helps catch invalid constructs early in development, but cannot detect all issues (e.g., global var mutation, some reflection) + +```bash +# Install +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest + +# Run from your module root to scan all packages +workflowcheck ./... + +# Optional: configure overrides / skips in workflowcheck.config.yaml +# (e.g., mark a function as deterministic or skip files) +workflowcheck -config workflowcheck.config.yaml ./... +``` + ## Detecting Non-Determinism ### During Execution diff --git a/references/core/dev-management.md b/references/core/dev-management.md new file mode 100644 index 0000000..01faed0 --- /dev/null +++ b/references/core/dev-management.md @@ -0,0 +1,26 @@ +# Development Server and Worker Management + +## Server Management + +Before starting workers or workflows, you MUST start a local dev server using the Temporal CLI: + +```bash +temporal server start-dev # Start this in the background. +``` + +It is perfectly OK for this process to be shared across multiple projects / left running as you develop your Temporal code. + +## Worker Management Details + +### Starting Workers + +How you start a worker is project-dependent, but generally Temporal code should have a program entrypoint which starts a worker. If your project doesn't, you should define it. + +When you need a new worker, you should start it in the background (and preferably have it log somewhere you can check), and then remember its PID so you can kill / clean it up later. 
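A minimal sketch of such an entrypoint in Python (the workflow, activity, and task-queue names are placeholders for whatever your project actually defines):

```python
# worker.py - hypothetical minimal worker entrypoint
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from my_project.workflows import MyWorkflow    # placeholder import
from my_project.activities import my_activity  # placeholder import


async def main() -> None:
    # Connect to the local dev server started above.
    client = await Client.connect("localhost:7233", namespace="default")

    # Register workflows and activities on a task queue and run until interrupted.
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Run it in the background with output redirected to a log file (for example `python worker.py > worker.log 2>&1 &`); the shell stores the PID in `$!` so you can kill it later.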
+ +**Best practice**: As far as local development goes, run only ONE worker instance with the latest code. Don't keep stale workers (running old code) around. + + +### Cleanup + +**Always kill workers when done.** Don't leave workers running. diff --git a/references/core/error-reference.md b/references/core/error-reference.md index b39be4a..141fd4f 100644 --- a/references/core/error-reference.md +++ b/references/core/error-reference.md @@ -1,16 +1,19 @@ # Common Error Types Reference -| Error Type | Where to Find | What Happened | Recovery | -|------------|---------------|---------------|----------| -| **Non-determinism** | `WorkflowTaskFailed` in history | Replay doesn't match history | Analyze error first. **If accidental**: fix code to match history → restart worker. **If intentional v2 change**: terminate → start fresh workflow. | -| **Workflow code bug** | `WorkflowTaskFailed` in history | Bug in workflow logic | Fix code → Restart worker → Workflow auto-resumes | -| **Missing workflow** | Worker logs | Workflow not registered | Add to worker.py → Restart worker | -| **Missing activity** | Worker logs | Activity not registered | Add to worker.py → Restart worker | -| **Activity bug** | `ActivityTaskFailed` in history | Bug in activity code | Fix code → Restart worker → Auto-retries | -| **Activity retries** | `ActivityTaskFailed` (count >2) | Repeated failures | Fix code → Restart worker → Auto-retries | -| **Sandbox violation** | Worker logs | Bad imports in workflow | Fix workflow.py imports → Restart worker | -| **Task queue mismatch** | Workflow never starts | Different queues in starter/worker | Align task queue names | -| **Timeout** | Status = TIMED_OUT | Operation too slow | Increase timeout config | +| Error Type | Error identifier (if any) | Where to Find | What Happened | Recovery | Link to additional info (if any) +|------------|---------------|---------------|---------------|----------|----------| +| **Non-determinism** | TMPRL1100 | `WorkflowTaskFailed` in history | Replay doesn't match history | Analyze error first. **If accidental**: fix code to match history → restart worker. **If intentional v2 change**: terminate → start fresh workflow. | https://github.com/temporalio/rules/blob/main/rules/TMPRL1100.md | +| **Deadlock** | TMPRL1101 | `WorkflowTaskFailed` in history, worker logs | Workflow blocked too long (deadlock detected) | Remove blocking operations from workflow code (no I/O, no sleep, no threading locks). Use Temporal primitives instead. | https://github.com/temporalio/rules/blob/main/rules/TMPRL1101.md | +| **Unfinished handlers** | TMPRL1102 | `WorkflowTaskFailed` in history | Workflow completed while update/signal handlers still running | Ensure all handlers complete before workflow finishes. Use `workflow.wait_condition()` to wait for handler completion. | https://github.com/temporalio/rules/blob/main/rules/TMPRL1102.md | +| **Payload overflow** | TMPRL1103 | `WorkflowTaskFailed` or `ActivityTaskFailed` in history | Payload size limit exceeded (default 2MB) | Reduce payload size. Use external storage (S3, database) for large data and pass references instead. 
| https://github.com/temporalio/rules/blob/main/rules/TMPRL1103.md | +| **Workflow code bug** | | `WorkflowTaskFailed` in history | Bug in workflow logic | Fix code → Restart worker → Workflow auto-resumes | | +| **Missing workflow** | | Worker logs | Workflow not registered | Add to worker.py → Restart worker | | +| **Missing activity** | | Worker logs | Activity not registered | Add to worker.py → Restart worker | | +| **Activity bug** | | `ActivityTaskFailed` in history | Bug in activity code | Fix code → Restart worker → Auto-retries | | +| **Activity retries** | | `ActivityTaskFailed` (count >2) | Repeated failures | Fix code → Restart worker → Auto-retries | | +| **Sandbox violation** | | Worker logs | Bad imports in workflow | Fix workflow.py imports → Restart worker | | +| **Task queue mismatch** | | Workflow never starts | Different queues in starter/worker | Align task queue names | | +| **Timeout** | | Status = TIMED_OUT | Operation too slow | Increase timeout config | | ## Workflow Status Reference diff --git a/references/core/interactive-workflows.md b/references/core/interactive-workflows.md index f1bafe3..3b02028 100644 --- a/references/core/interactive-workflows.md +++ b/references/core/interactive-workflows.md @@ -1,6 +1,6 @@ # Interactive Workflows -Interactive workflows pause and wait for external input (signals or updates). +Interactive workflows are workflows that use Temporal features such as signals or updates to pause and wait for external input. When testing and debugging these types of workflows, you can send them input via the Temporal CLI. ## Signals @@ -12,9 +12,6 @@ temporal workflow signal \ --workflow-id <workflow-id> \ --name "signal_name" \ --input '{"key": "value"}' - -# Or via interact script (if available) -uv run interact --workflow-id <workflow-id> --signal-name "signal_name" --data '{"key": "value"}' ## Updates @@ -23,7 +20,7 @@ Request-response style interaction (returns a value). ```bash # Send update to workflow -temporal workflow update \ +temporal workflow update execute \ --workflow-id <workflow-id> \ --name "update_name" \ --input '{"approved": true}' @@ -40,14 +37,13 @@ temporal workflow query \ --workflow-id <workflow-id> \ --name "get_status" ``` -## Testing Interactive Workflows +## Typical Steps for Testing Interactive Workflows ```bash -./scripts/ensure-worker.sh -uv run starter # Get workflow_id -./scripts/wait-for-workflow-status.sh --workflow-id $workflow_id --status RUNNING -uv run interact --workflow-id $workflow_id --signal-name "approval" --data '{"approved": true}' -./scripts/wait-for-workflow-status.sh --workflow-id $workflow_id --status COMPLETED -./scripts/get-workflow-result.sh --workflow-id $workflow_id -./scripts/kill-worker.sh # CLEANUP +# 1. Start the worker (command is project-dependent) +# 2. Start the workflow (command is project-dependent). The starter should print the workflow ID; if it doesn't, modify it to do so. +temporal workflow signal --workflow-id <workflow-id> --name "signal_name" --input '{"key": "value"}' # 3. Send it interactive events, e.g. a signal. +# 4. Wait for workflow to complete (use Temporal CLI to check status) +# 5. Read the workflow result using the Temporal CLI +# 6. Clean up the worker process if needed. ``` diff --git a/references/core/patterns.md b/references/core/patterns.md index 2ecc726..198786b 100644 --- a/references/core/patterns.md +++ b/references/core/patterns.md @@ -2,7 +2,7 @@ ## Overview -Common patterns for building robust Temporal workflows. For language-specific implementations, see the Python or TypeScript references. +Common patterns for building robust Temporal workflows. 
See the language-specific references for the language you are working in. ## Signals @@ -30,6 +30,9 @@ Client Workflow │◀──── (no response) ──────│ ``` +**Note:** A related but distinct pattern to signals is async activity completion. This is an advanced feature, which you may consider if the external system that would deliver the signal is unreliable and might fail to Signal, or +you want the external process to Heartbeat or receive Cancellation. If this may be the case, look at language-specific advanced features for your SDK language (e.g. references/python/advanced-features.md). + ## Queries **Purpose**: Read workflow state synchronously without modifying it. @@ -45,6 +48,7 @@ Client Workflow - Read-only - must not modify state - Not recorded in history - Executes on the worker, not persisted +- Can run even on completed workflows **Example Flow**: ``` @@ -68,7 +72,7 @@ Client Workflow **Characteristics**: - Synchronous - caller waits for completion - Can mutate state AND return values -- Supports validators to reject invalid updates +- Supports validators to reject invalid updates before they are even persisted into history - Recorded in history **Example Flow**: @@ -82,12 +86,9 @@ Client Workflow ## Child Workflows -**Purpose**: Break complex workflows into smaller, reusable pieces. - **When to Use**: - Prevent history from growing too large - Isolate failure domains (child can fail without failing parent) -- Reuse workflow logic across multiple parents - Different retry policies for different parts **Characteristics**: @@ -101,6 +102,8 @@ Client Workflow - `ABANDON` - Child continues running independently - `REQUEST_CANCEL` - Cancellation requested but not forced +**Note:** You do not need to use child workflows simply to break complex logic down into smaller pieces. Standard programming abstractions within a workflow can already be used for that. + ## Continue-as-New **Purpose**: Prevent unbounded history growth by "restarting" with fresh history. @@ -166,7 +169,7 @@ On failure at step 3: - Reducing total workflow duration **Patterns**: -- `Promise.all()` / `asyncio.gather()` - Wait for all +- `Promise` / `asyncio` - Use traditional concurrency helpers (e.g. wait for all, wait for first, etc.) - Partial failure handling - Continue with successful results ## Entity Workflow Pattern @@ -208,22 +211,122 @@ Entity Workflow (user-123) **Characteristics**: - Timers are durable (persisted in history) - Can be cancelled -- Combine with cancellation scopes for timeouts -## Polling Pattern +## Polling Patterns -**Purpose**: Repeatedly check external state until condition met. +### Frequent Polling + +**Purpose**: Frequently (once per second or faster) repeatedly check external state until a condition is met. **Implementation**: + ``` +# Inside Activity (polling_activity): while not condition_met: - result = await check_activity() + result = await call_external_api() if result.done: break + activity.heartbeat("Invoking activity") await sleep(poll_interval) + + +# In workflow code: +await workflow.execute_activity( + polling_activity, + PollingActivityInput(...), + start_to_close_timeout=timedelta(seconds=60), + heartbeat_timeout=timedelta(seconds=2), +) +``` + +To ensure that polling_activity is restarted in a timely manner, we make sure that it heartbeats on every iteration. 
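For concreteness, a minimal Python sketch of the same loop (the external call, its result type, and the input dataclass are placeholders, not part of the Temporal API):

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow


@dataclass
class PollResult:
    done: bool
    value: str = ""


@dataclass
class PollingActivityInput:
    target_id: str  # hypothetical identifier for the thing being polled


async def call_external_api(target_id: str) -> PollResult:
    # Placeholder for the real external check (HTTP call, DB query, etc.).
    ...


@activity.defn
async def polling_activity(request: PollingActivityInput) -> str:
    while True:
        result = await call_external_api(request.target_id)
        if result.done:
            return result.value
        # Heartbeat on every iteration so a crashed or stuck attempt is detected and retried quickly.
        activity.heartbeat("still polling")
        await asyncio.sleep(1)


@workflow.defn
class PollingWorkflow:
    @workflow.run
    async def run(self, request: PollingActivityInput) -> str:
        return await workflow.execute_activity(
            polling_activity,
            request,
            start_to_close_timeout=timedelta(seconds=60),
            heartbeat_timeout=timedelta(seconds=2),
        )
```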
Note that heartbeating only works if we set the heartbeat_timeout to a shorter value than the Activity's start_to_close_timeout. + +**Advantage:** Because the polling loop is inside the activity, this does not pollute the workflow history. + +### Infrequent Polling + +**Purpose**: Infrequently (once per minute or slower) repeatedly poll an external service. + +**Implementation**: + +Define an Activity that fails (raises an exception) exactly when the poll has not yet completed. + +The polling loop is accomplished via activity retries, by setting the following Retry options: +- backoff_coefficient: to 1 +- initial_interval: to the polling interval (e.g. 60 seconds) + +This will enable the Activity to be retried exactly at the set interval. + +**Advantage:** Individual Activity retries are not recorded in Workflow History, so this approach can poll for a very long time without affecting the history size. + +## Idempotency Patterns + +**Purpose**: Ensure activities can be safely retried and replayed without causing duplicate side effects. + +**Why It Matters**: Temporal may re-execute activities during retries (on failure) or replay (on worker restart). Without idempotency, this can cause duplicate charges, duplicate emails, duplicate database entries, etc. + +### Using Idempotency Keys + +Pass a unique identifier to external services so they can detect and deduplicate repeated requests: + +``` +Activity: charge_payment(order_id, amount) + │ + └── Call payment API with: + amount: $100 + idempotency_key: "order-{order_id}" + │ + └── Payment provider deduplicates based on key + (second call with same key returns original result) +``` + +**Good idempotency key sources**: +- Workflow ID (unique per workflow execution) +- Business identifier (order ID, transaction ID) +- Workflow ID + activity ID (stable across retries of the same activity) + +### Check-Before-Act Pattern + +Query the external system's state before making changes: + +``` +Activity: send_welcome_email(user_id) + │ + ├── Check: Has welcome email been sent for user_id? + │ │ + │ ├── YES: Return early (already done) + │ │ + │ └── NO: Send email, mark as sent +``` + +### Designing Idempotent Activities + +1. **Use unique identifiers** as idempotency keys with external APIs +2. **Check before acting**: Query current state before making changes +3. **Make operations repeatable**: Ensure calling twice produces the same result +4. **Record outcomes**: Store transaction IDs or results for verification +5. **Leverage external system features**: Many APIs (Stripe, AWS, etc.) have built-in idempotency key support + +### Tracking State in Workflows + +For complex multi-step operations, track completion status in workflow state: + +``` +Workflow State: + payment_completed: false + shipment_created: false + +Run: + if not payment_completed: + charge_payment(...) + payment_completed = true + + if not shipment_created: + create_shipment(...) + shipment_created = true ``` -**Best Practice**: Use exponential backoff for polling intervals. +This ensures that on replay, already-completed steps are skipped. 
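As a concrete illustration of the idempotency-key approach, a minimal Python sketch (the payment client and its `idempotency_key` parameter are assumptions about an external API, not Temporal features):

```python
from dataclasses import dataclass

from temporalio import activity


@dataclass
class ChargeInput:
    order_id: str
    amount_cents: int


@activity.defn
async def charge_payment(request: ChargeInput) -> str:
    info = activity.info()
    # A stable key: the same workflow + activity + order always produces the same key,
    # so the provider can deduplicate retried or replayed calls.
    idempotency_key = f"{info.workflow_id}:{info.activity_id}:{request.order_id}"
    receipt = await payment_client.charge(  # hypothetical external client
        amount_cents=request.amount_cents,
        idempotency_key=idempotency_key,
    )
    return receipt.id
```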
## Choosing Between Patterns @@ -237,3 +340,4 @@ while not condition_met: | Rollback on failure | Saga | | Process items concurrently | Parallel Execution | | Long-lived stateful entity | Entity Workflow | +| Safe retries/replays | Idempotency | diff --git a/references/core/tool-reference.md b/references/core/tool-reference.md deleted file mode 100644 index 4702172..0000000 --- a/references/core/tool-reference.md +++ /dev/null @@ -1,84 +0,0 @@ -# Tool Reference - -## Lifecycle Scripts - -| Tool | Description | Key Options | -|------|-------------|-------------| -| `ensure-server.sh` | Start dev server if not running | - | -| `ensure-worker.sh` | Kill old workers, start fresh one | Uses `$TEMPORAL_WORKER_CMD` | -| `kill-worker.sh` | Kill current project's worker | - | -| `kill-all-workers.sh` | Kill all workers | `--include-server` | -| `list-workers.sh` | List running workers | - | - -## Monitoring Scripts - -| Tool | Description | Key Options | -|------|-------------|-------------| -| `list-recent-workflows.sh` | Show recent executions | `--minutes N` (default: 5) | -| `find-stalled-workflows.sh` | Detect stalled workflows | `--query "..."` | -| `monitor-worker-health.sh` | Check worker status | - | -| `wait-for-workflow-status.sh` | Block until status | `--workflow-id`, `--status`, `--timeout` | - -## Debugging Scripts - -| Tool | Description | Key Options | -|------|-------------|-------------| -| `analyze-workflow-error.sh` | Extract errors from history | `--workflow-id`, `--run-id` | -| `get-workflow-result.sh` | Get workflow output | `--workflow-id`, `--raw` | -| `bulk-cancel-workflows.sh` | Mass cancellation | `--pattern "..."` | - -## Worker Management Details - -### The Golden Rule - -**Ensure no old workers are running.** Stale workers with outdated code cause: -- Non-determinism errors (history mismatch) -- Executing old buggy code -- Confusing behavior - -**Best practice**: Run only ONE worker instance with the latest code. - -### Starting Workers - -```bash -# PREFERRED: Smart restart (kills old, starts fresh) -./scripts/ensure-worker.sh -``` - -This command: -1. Finds ALL existing workers for the project -2. Kills them -3. Starts a new worker with fresh code -4. Waits for worker to be ready - -### Verifying Workers - -```bash -# List all running workers -./scripts/list-workers.sh - -# Check specific worker health -./scripts/monitor-worker-health.sh - -# View worker logs -tail -f $CLAUDE_TEMPORAL_LOG_DIR/worker-$(basename "$(pwd)").log -``` - -**What to look for in logs**: -- `Worker started, listening on task queue: ...` → Worker is ready -- `Worker process died during startup` → Startup failure, check logs for error - -### Cleanup (REQUIRED) - -**Always kill workers when done.** Don't leave workers running. - -```bash -# Kill current project's worker -./scripts/kill-worker.sh - -# Kill ALL workers (full cleanup) -./scripts/kill-all-workers.sh - -# Kill all workers AND server -./scripts/kill-all-workers.sh --include-server -``` diff --git a/references/core/troubleshooting.md b/references/core/troubleshooting.md index c931c9f..5aea3a9 100644 --- a/references/core/troubleshooting.md +++ b/references/core/troubleshooting.md @@ -30,7 +30,7 @@ Workflow stuck in RUNNING? ├─▶ Is a worker running? │ │ │ ├─▶ NO: Start a worker -│ │ └─▶ scripts/ensure-worker.sh +│ │ └─▶ See references/core/dev-management.md │ │ │ └─▶ YES: Is it on the correct task queue? │ │ @@ -41,15 +41,24 @@ Workflow stuck in RUNNING? │ ├─▶ NondeterminismError in logs? 
│ │ └─▶ Go to: "Non-Determinism" section │ │ -│ └─▶ No errors? +│ ├─▶ Check history for task failures +│ │ └─▶ Run: `temporal workflow show --workflow-id <workflow-id>` +│ │ │ +│ │ ├─▶ WorkflowTaskFailed event? +│ │ │ └─▶ Check error type in event details +│ │ │ └─▶ Go to relevant section in error-reference.md +│ │ │ +│ │ └─▶ ActivityTaskFailed event? +│ │ └─▶ Go to: "Activity Keeps Retrying" section +│ │ +│ └─▶ No errors in logs or history? │ └─▶ Check if workflow is waiting for signal/timer ``` ### Common Causes 1. **No worker running** - - Check: `scripts/list-workers.sh` - - Fix: `scripts/ensure-worker.sh ` + - See references/core/dev-management.md 2. **Worker on wrong task queue** - Check: Worker logs for task queue name @@ -76,14 +85,20 @@ NondeterminismError? │ ├─▶ Was code intentionally changed? │ │ -│ ├─▶ YES: Use patching API -│ │ └─▶ See: references/core/versioning.md +│ ├─▶ YES: Do you need to support in-flight workflows? +│ │ │ +│ │ ├─▶ YES (production): Use patching API +│ │ │ └─▶ See: references/core/versioning.md +│ │ │ +│ │ └─▶ NO (local dev/testing): Terminate or reset workflow +│ │ └─▶ `temporal workflow terminate --workflow-id <workflow-id>` +│ │ └─▶ Then start fresh with new code │ │ │ └─▶ NO: Accidental change │ │ │ ├─▶ Can you identify the change? │ │ │ -│ │ ├─▶ YES: Revert and restart worker +│ │ ├─▶ YES: Revert and restart worker. Note: this doesn't always work if the workflow has progressed past the change (it may have taken other code paths), so you may need to reset the workflow. │ │ │ │ │ └─▶ NO: Compare current code to expected history │ │ └─▶ Check: Activity names, order, parameters @@ -91,24 +106,24 @@ NondeterminismError? ### Common Causes -1. **Changed activity order** +1. **Changed call order** ``` # Before # After (BREAKS) await activity_a await activity_b await activity_b await activity_a ``` -2. **Changed activity name** +2. **Changed call name** ``` # Before # After (BREAKS) await process_order(...) await handle_order(...) ``` -3. **Added/removed activity call** +3. **Added/removed call** - Adding new activity mid-workflow - Removing activity that was previously called -4. **Using non-deterministic values** +4. **Using non-deterministic code** - `datetime.now()` in workflow (use `workflow.now()`) - `random.random()` in workflow (use `workflow.random()`) @@ -177,7 +192,7 @@ Timeout error? ├─▶ Which timeout? │ │ │ ├─▶ Workflow timeout -│ │ └─▶ Increase timeout or optimize workflow +│ │ └─▶ Increase timeout or optimize workflow. Better yet, consider removing the workflow timeout, as it is generally discouraged unless *necessary* for your use case. │ │ │ ├─▶ ScheduleToCloseTimeout │ │ └─▶ Activity taking too long overall (including retries) @@ -255,7 +270,7 @@ Worker won't start? 
│ ├─▶ Connection error │ └─▶ Check Temporal server is running -│ └─▶ scripts/ensure-server.sh +│ └─▶ `temporal server start-dev` (start in background, see references/core/dev-management.md) │ ├─▶ Registration error │ └─▶ Check workflow/activity definitions are valid diff --git a/references/python/advanced-features.md b/references/python/advanced-features.md index 3fffa77..0857236 100644 --- a/references/python/advanced-features.md +++ b/references/python/advanced-features.md @@ -13,7 +13,7 @@ class BatchProcessingWorkflow: # Process next batch state = await workflow.execute_activity( process_batch, state, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) # Check history size and continue-as-new if needed @@ -122,113 +122,13 @@ await schedule.trigger() # Run immediately await schedule.delete() ``` -## Interceptors - -Interceptors allow cross-cutting concerns like logging, metrics, and auth. - -### Creating a Custom Activity Interceptor - -The interceptor pattern uses a chain of interceptors. You create an `Interceptor` class that returns specialized inbound interceptors for activities and workflows. - -```python -from temporalio.worker import ( - Interceptor, - ActivityInboundInterceptor, - ExecuteActivityInput, -) -from typing import Any - -class LoggingActivityInboundInterceptor(ActivityInboundInterceptor): - async def execute_activity(self, input: ExecuteActivityInput) -> Any: - activity.logger.info(f"Activity starting: {input.fn.__name__}") - try: - # Delegate to next interceptor in chain - result = await self.next.execute_activity(input) - activity.logger.info(f"Activity completed: {input.fn.__name__}") - return result - except Exception as e: - activity.logger.error(f"Activity failed: {e}") - raise - -class LoggingInterceptor(Interceptor): - def intercept_activity( - self, - next: ActivityInboundInterceptor, - ) -> ActivityInboundInterceptor: - # Return our interceptor wrapping the next one - return LoggingActivityInboundInterceptor(next) - -# Apply to worker -worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - activities=[my_activity], - interceptors=[LoggingInterceptor()], -) -``` - -### Creating a Custom Workflow Interceptor - -```python -from temporalio.worker import ( - Interceptor, - WorkflowInboundInterceptor, - WorkflowInterceptorClassInput, - ExecuteWorkflowInput, -) - -class LoggingWorkflowInboundInterceptor(WorkflowInboundInterceptor): - async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: - workflow.logger.info(f"Workflow starting: {input.type}") - try: - result = await self.next.execute_workflow(input) - workflow.logger.info(f"Workflow completed: {input.type}") - return result - except Exception as e: - workflow.logger.error(f"Workflow failed: {e}") - raise - -class LoggingInterceptor(Interceptor): - def workflow_interceptor_class( - self, - input: WorkflowInterceptorClassInput, - ) -> type[WorkflowInboundInterceptor] | None: - return LoggingWorkflowInboundInterceptor -``` - -## Dynamic Workflows and Activities - -Handle workflows/activities not known at compile time. 
- -### Dynamic Workflow Handler - -```python -@workflow.defn(dynamic=True) -class DynamicWorkflow: - @workflow.run - async def run(self, args: Sequence[RawValue]) -> Any: - workflow_type = workflow.info().workflow_type - # Route based on type - if workflow_type == "order-workflow": - return await self._handle_order(args) - elif workflow_type == "refund-workflow": - return await self._handle_refund(args) -``` - -### Dynamic Activity Handler - -```python -@activity.defn(dynamic=True) -async def dynamic_activity(args: Sequence[RawValue]) -> Any: - activity_type = activity.info().activity_type - # Handle based on type - ... -``` - ## Async Activity Completion For activities that complete asynchronously (e.g., human tasks, external callbacks). +If you configure a heartbeat_timeout on this activity, the external completer is responsible for sending heartbeats via the async handle. +If you do NOT set a heartbeat_timeout, no heartbeats are required. + +**Note:** If the external system that completes the asynchronous action can reliably be trusted to do the task and Signal back with the result, and it doesn't need to Heartbeat or receive Cancellation, then consider using **signals** instead. ```python from temporalio import activity @@ -242,74 +142,29 @@ async def request_approval(request_id: str) -> None: # Store task token for later completion (e.g., in database) await store_task_token(request_id, task_token) - # Raise to indicate async completion + # Mark this activity as waiting for external completion activity.raise_complete_async() # Later, complete the activity from another process async def complete_approval(request_id: str, approved: bool): - client = await Client.connect("localhost:7233") + client = await Client.connect("localhost:7233", namespace="default") task_token = await get_task_token(request_id) + handle = client.get_async_activity_handle(task_token=task_token) + + # Optional: if a heartbeat_timeout was set, you can periodically: + # await handle.heartbeat(progress_details) + if approved: - await client.get_async_activity_handle(task_token).complete("approved") + await handle.complete("approved") else: - await client.get_async_activity_handle(task_token).fail( - ApplicationError("Rejected") - ) + # You can also fail or report cancellation via the handle + await handle.fail(ApplicationError("Rejected")) ``` ## Sandbox Customization -The Python SDK runs workflows in a sandbox to ensure determinism. You can customize sandbox restrictions when needed. - -### Passing Through Modules - -If you need to use modules that are blocked by the sandbox: - -```python -from temporalio.worker import Worker -from temporalio.worker.workflow_sandbox import SandboxRestrictions - -# Allow specific modules through the sandbox -restrictions = SandboxRestrictions.default.with_passthrough_modules("my_module") - -worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - workflow_runner=SandboxedWorkflowRunner( - restrictions=restrictions, - ), -) -``` - -### Passing Through All Modules (Use with Caution) - -```python -# Disable module restrictions entirely - use only if you trust all code -restrictions = SandboxRestrictions.default.with_passthrough_all_modules() -``` - -### Temporary Passthrough in Workflow Code - -```python -@workflow.run -async def run(self) -> str: - # Temporarily disable sandbox restrictions for imports - with workflow.unsafe.imports_passed_through(): - import some_restricted_module - # Use the module... 
-``` - -### Customizing Invalid Module Members - -```python -# Allow specific members that are normally blocked -restrictions = SandboxRestrictions.default -restrictions = restrictions.with_invalid_module_member_children( - "datetime", {"datetime": {"now"}} # Block datetime.datetime.now -) -``` +The Python SDK runs workflows in a sandbox to help you ensure determinism. You can customize sandbox restrictions when needed. See `references/python/sandbox.md` ## Gevent Compatibility Warning @@ -345,7 +200,9 @@ worker = Worker( ## Workflow Init Decorator -Use `@workflow.init` to run initialization code when a workflow is first created (not on replay). +Use `@workflow.init` to run initialization code when a workflow is first created. + +**Purpose:** Execute some setup code before signal/update happens or run is invoked. ```python @workflow.defn @@ -364,7 +221,12 @@ class MyWorkflow: ## Workflow Failure Exception Types -Control which exceptions cause workflow task failures vs workflow failures: +Control which exceptions cause workflow task failures vs workflow failures. + +- Special case: if you include temporalio.workflow.NondeterminismError (or a superclass), non-determinism errors will fail the workflow instead of leaving it in a retrying state +- **Tip for testing:** Set to `[Exception]` in tests so any unhandled exception fails the workflow immediately rather than retrying the workflow task forever. This surfaces bugs faster. + +### Per-Workflow Configuration ```python @workflow.defn( @@ -376,3 +238,15 @@ class MyWorkflow: async def run(self) -> str: raise ValueError("This fails the workflow, not just the task") ``` + +### Worker-Level Configuration + +```python +worker = Worker( + client, + task_queue="my-queue", + workflows=[MyWorkflow], + workflow_failure_exception_types=[ValueError, CustomBusinessError], +) +``` + diff --git a/references/python/ai-patterns.md b/references/python/ai-patterns.md index 9a3ef62..32a5e13 100644 --- a/references/python/ai-patterns.md +++ b/references/python/ai-patterns.md @@ -14,6 +14,7 @@ from temporalio.contrib.pydantic import pydantic_data_converter client = await Client.connect( "localhost:7233", + namespace="default", data_converter=pydantic_data_converter, ) ``` @@ -109,10 +110,6 @@ class LLMWorkflow: ), start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy( - initial_interval=timedelta(seconds=1), - backoff_coefficient=2.0, - maximum_interval=timedelta(seconds=60), - maximum_attempts=5, non_retryable_error_types=["InvalidAPIKeyError"], ), ) diff --git a/references/python/data-handling.md b/references/python/data-handling.md index c4e16a5..d6a40d2 100644 --- a/references/python/data-handling.md +++ b/references/python/data-handling.md @@ -16,9 +16,10 @@ The default converter handles: Use Pydantic models for validated, typed data. 
+In your workflow definition, just use input and result types that subclass `pydantic.BaseModel`: + ```python from pydantic import BaseModel -from temporalio.contrib.pydantic import pydantic_data_converter class OrderInput(BaseModel): order_id: str @@ -41,38 +42,27 @@ class OrderWorkflow: status="completed", tracking_number="TRK123", ) +``` +And when you configure the client, pass the `pydantic_data_converter`: + +```python +from temporalio.contrib.pydantic import pydantic_data_converter # Configure client with Pydantic support client = await Client.connect( "localhost:7233", + namespace="default", data_converter=pydantic_data_converter, ) ``` -## Custom Data Converter - -Create custom converters for special serialization needs. - -```python -from temporalio.converter import ( - DataConverter, - PayloadConverter, - DefaultPayloadConverter, -) - -class CustomPayloadConverter(PayloadConverter): - # Implement encoding_payload_converters and decoding_payload_converters - pass +## Custom Data Conversion -custom_converter = DataConverter( - payload_converter_class=CustomPayloadConverter, -) +Usually the easiest way to do this is via implementing an EncodingPayloadConverter and CompositePayloadConverter. See: +- https://raw.githubusercontent.com/temporalio/samples-python/refs/heads/main/custom_converter/shared.py +- https://raw.githubusercontent.com/temporalio/samples-python/refs/heads/main/custom_converter/starter.py -client = await Client.connect( - "localhost:7233", - data_converter=custom_converter, -) -``` +for an extended example. ## Payload Encryption @@ -92,7 +82,8 @@ class EncryptionCodec(PayloadCodec): return [ Payload( metadata={"encoding": b"binary/encrypted"}, - data=self._fernet.encrypt(p.SerializeToString()), + # Since encryption uses C extensions that give up the GIL, we can avoid blocking the async event loop here. + data=await asyncio.to_thread(self._fernet.encrypt, p.SerializeToString()), ) for p in payloads ] @@ -101,7 +92,7 @@ class EncryptionCodec(PayloadCodec): result = [] for p in payloads: if p.metadata.get("encoding") == b"binary/encrypted": - decrypted = self._fernet.decrypt(p.data) + decrypted = await asyncio.to_thread(self._fernet.decrypt, p.data) decoded = Payload() decoded.ParseFromString(decrypted) result.append(decoded) @@ -112,6 +103,7 @@ class EncryptionCodec(PayloadCodec): # Apply encryption codec client = await Client.connect( "localhost:7233", + namespace="default", data_converter=DataConverter( payload_codec=EncryptionCodec(encryption_key), ), @@ -120,35 +112,66 @@ client = await Client.connect( ## Search Attributes -Custom searchable fields for workflow visibility. +Custom searchable fields for workflow visibility. 
These can be created at workflow start: ```python -from temporalio.common import SearchAttributes, SearchAttributeKey +from temporalio.common import ( + SearchAttributeKey, + SearchAttributePair, + TypedSearchAttributes, +) +from datetime import datetime +from datetime import timezone -# Define typed keys ORDER_ID = SearchAttributeKey.for_keyword("OrderId") ORDER_STATUS = SearchAttributeKey.for_keyword("OrderStatus") ORDER_TOTAL = SearchAttributeKey.for_float("OrderTotal") CREATED_AT = SearchAttributeKey.for_datetime("CreatedAt") -# Set at workflow start -await client.execute_workflow( +# At workflow start +handle = await client.start_workflow( OrderWorkflow.run, order, id=f"order-{order.id}", task_queue="orders", - search_attributes=SearchAttributes.from_pairs([ - (ORDER_ID, order.id), - (ORDER_STATUS, "pending"), - (ORDER_TOTAL, order.total), - (CREATED_AT, datetime.now(timezone.utc)), + search_attributes=TypedSearchAttributes([ + SearchAttributePair(ORDER_ID, order.id), + SearchAttributePair(ORDER_STATUS, "pending"), + SearchAttributePair(ORDER_TOTAL, order.total), + SearchAttributePair(CREATED_AT, datetime.now(timezone.utc)), ]), ) +``` + +Or upserted during workflow execution: -# Upsert from within workflow -workflow.upsert_search_attributes([ - (ORDER_STATUS, "completed"), -]) +```python +from temporalio import workflow +from temporalio.common import SearchAttributeKey, SearchAttributePair, TypedSearchAttributes + +ORDER_STATUS = SearchAttributeKey.for_keyword("OrderStatus") + +@workflow.defn +class OrderWorkflow: + @workflow.run + async def run(self, order: Order) -> str: + # ... process order ... + + # Update search attribute + workflow.upsert_search_attributes(TypedSearchAttributes([ + SearchAttributePair(ORDER_STATUS, "completed"), + ])) + return "done" +``` + +### Querying Workflows by Search Attributes + +```python +# List workflows using search attributes +async for workflow in client.list_workflows( + 'OrderStatus = "processing" OR OrderStatus = "pending"' +): + print(f"Workflow {workflow.id} is still processing") ``` ## Workflow Memo @@ -167,14 +190,15 @@ await client.execute_workflow( "notes": "Priority customer", }, ) +``` +```python # Read memo from workflow @workflow.defn class OrderWorkflow: @workflow.run async def run(self, order: Order) -> str: - memo = workflow.memo() - notes = memo.get("notes", "") + notes: str = workflow.memo_value("notes", type_hint=str) ... ``` diff --git a/references/python/determinism.md b/references/python/determinism.md index 90e5856..7027fd7 100644 --- a/references/python/determinism.md +++ b/references/python/determinism.md @@ -8,46 +8,26 @@ The Python SDK runs workflows in a sandbox that provides automatic protection ag Temporal achieves durability through **history replay**. Understanding this mechanism is key to writing correct Workflow code. -### How Replay Works - -1. **Initial Execution**: When your Workflow runs for the first time, the SDK records Commands (like "schedule activity") to the Event History stored by Temporal Server. - -2. **Recovery/Continuation**: When a Worker restarts, loses connectivity, or picks up a Workflow Task, it must restore the Workflow's state by replaying the code from the beginning. - -3. **Command Matching**: During replay, the SDK re-executes your Workflow code but doesn't actually run Activities again. Instead, it compares the Commands your code generates against the Events in history. If there's a match, it uses the stored result. - -4. 
**Non-determinism Detection**: If your code generates different Commands than what's in history (e.g., different Activity name, different order), the SDK raises a `NondeterminismError`. - -### Example: Why datetime.now() Breaks Replay +## Forbidden Operations -```python -# BAD - Non-deterministic -@workflow.defn -class BadWorkflow: - @workflow.run - async def run(self) -> str: - import datetime - if datetime.datetime.now().hour < 12: # Different value on replay! - await workflow.execute_activity(morning_activity, ...) - else: - await workflow.execute_activity(afternoon_activity, ...) -``` +- Direct I/O (network, filesystem) +- Threading operations +- `subprocess` calls +- Global mutable state modification +- `time.sleep()` (use `asyncio.sleep()`) +- and so on -If this runs at 11:59 AM initially and replays at 12:01 PM, it will try to schedule a different Activity, causing `NondeterminismError`. +## Safe Builtin Alternatives to Common Non Deterministic Things -```python -# GOOD - Deterministic -@workflow.defn -class GoodWorkflow: - @workflow.run - async def run(self) -> str: - if workflow.now().hour < 12: # Consistent during replay - await workflow.execute_activity(morning_activity, ...) - else: - await workflow.execute_activity(afternoon_activity, ...) -``` +| Forbidden | Safe Alternative | +|-----------|------------------| +| `datetime.now()` | `workflow.now()` | +| `datetime.utcnow()` | `workflow.now()` | +| `random.random()` | `rng = workflow.new_random() ; rng.randint(1, 100)` | +| `uuid.uuid4()` | `workflow.uuid4()` | +| `time.time()` | `workflow.now().timestamp()` | -### Testing Replay Compatibility +## Testing Replay Compatibility Use the `Replayer` class to verify your code changes are compatible with existing histories: @@ -73,64 +53,7 @@ The sandbox: - Restricts non-deterministic library calls via proxy objects - Passes through standard library with restrictions -## Safe Alternatives - -| Forbidden | Safe Alternative | -|-----------|------------------| -| `datetime.now()` | `workflow.now()` | -| `datetime.utcnow()` | `workflow.now()` | -| `random.random()` | `workflow.random().random()` | -| `random.randint()` | `workflow.random().randint()` | -| `uuid.uuid4()` | `workflow.uuid4()` | -| `time.time()` | `workflow.now().timestamp()` | - -## Pass-Through Pattern - -For third-party libraries that need to bypass sandbox restrictions: - -```python -with workflow.unsafe.imports_passed_through(): - import pydantic - from my_module import my_activity -``` - -## Disabling Sandbox - -```python -# Per-workflow -@workflow.defn(sandboxed=False) -class UnsandboxedWorkflow: - pass - -# Per-block -with workflow.unsafe.sandbox_unrestricted(): - # Unrestricted code - pass - -# Globally (worker level) -from temporalio.worker import UnsandboxedWorkflowRunner -Worker(..., workflow_runner=UnsandboxedWorkflowRunner()) -``` - -## Forbidden Operations - -- Direct I/O (network, filesystem) -- Threading operations -- `subprocess` calls -- Global mutable state modification -- `time.sleep()` (use `asyncio.sleep()`) - -## Commands and Events - -Understanding the relationship between your code and the Event History: - -| Workflow Code | Command Generated | Event Created | -|--------------|-------------------|---------------| -| `workflow.execute_activity()` | ScheduleActivityTask | ActivityTaskScheduled | -| `asyncio.sleep()` / `workflow.sleep()` | StartTimer | TimerStarted | -| `workflow.execute_child_workflow()` | StartChildWorkflowExecution | ChildWorkflowExecutionStarted | -| `workflow.continue_as_new()` | 
ContinueAsNewWorkflowExecution | WorkflowExecutionContinuedAsNew | -| Return from `@workflow.run` | CompleteWorkflowExecution | WorkflowExecutionCompleted | +See more info at `references/python/sandbox.md` ## Best Practices diff --git a/references/python/error-handling.md b/references/python/error-handling.md index 627e58c..6e7f9b3 100644 --- a/references/python/error-handling.md +++ b/references/python/error-handling.md @@ -2,11 +2,12 @@ ## Overview -The Python SDK uses `ApplicationError` for application-specific errors and provides comprehensive retry policy configuration. +The Python SDK uses `ApplicationError` for application-specific errors and provides comprehensive retry policy configuration. Generally, the following information about errors and retryability applies across activities, child workflows and Nexus operations. ## Application Errors ```python +from temporalio import activity from temporalio.exceptions import ApplicationError @activity.defn @@ -21,125 +22,114 @@ async def validate_order(order: Order) -> None: ## Non-Retryable Errors ```python -raise ApplicationError( - "Permanent failure - invalid credit card", - type="PaymentError", - non_retryable=True, # Will not retry -) +from dataclasses import dataclass +from temporalio import activity +from temporalio.exceptions import ApplicationError + +@dataclass +class ChargeCardInput: + card_number: str + amount: float + +@activity.defn +async def charge_card(input: ChargeCardInput) -> str: + if not is_valid_card(input.card_number): + raise ApplicationError( + "Permanent failure - invalid credit card", + type="PaymentError", + non_retryable=True, # Will not retry activity + ) + return await process_payment(input.card_number, input.amount) ``` ## Handling Activity Errors ```python -from temporalio.exceptions import ActivityError +from datetime import timedelta +from temporalio import workflow +from temporalio.exceptions import ActivityError, ApplicationError -@workflow.run -async def run(self) -> str: - try: - return await workflow.execute_activity( - risky_activity, - schedule_to_close_timeout=timedelta(minutes=5), - ) - except ActivityError as e: - workflow.logger.error(f"Activity failed: {e}") - # Handle or re-raise - raise ApplicationError("Workflow failed due to activity error") +@workflow.defn +class MyWorkflow: + @workflow.run + async def run(self) -> str: + try: + return await workflow.execute_activity( + risky_activity, + start_to_close_timeout=timedelta(minutes=5), + ) + except ActivityError as e: + workflow.logger.error(f"Activity failed: {e}") + # Handle or re-raise + raise ApplicationError("Workflow failed due to activity error") ``` ## Retry Policy Configuration ```python +from datetime import timedelta +from temporalio import workflow from temporalio.common import RetryPolicy -result = await workflow.execute_activity( - my_activity, - schedule_to_close_timeout=timedelta(minutes=10), - retry_policy=RetryPolicy( - initial_interval=timedelta(seconds=1), - backoff_coefficient=2.0, - maximum_interval=timedelta(minutes=1), - maximum_attempts=5, - non_retryable_error_types=["ValidationError", "PaymentError"], - ), -) +@workflow.defn +class MyWorkflow: + @workflow.run + async def run(self) -> str: + result = await workflow.execute_activity( + my_activity, + start_to_close_timeout=timedelta(minutes=10), + retry_policy=RetryPolicy( + maximum_interval=timedelta(minutes=1), + maximum_attempts=5, + non_retryable_error_types=["ValidationError", "PaymentError"], + ), + ) + return result ``` ## Timeout Configuration ```python 
-await workflow.execute_activity( - my_activity, - start_to_close_timeout=timedelta(minutes=5), # Single attempt - schedule_to_close_timeout=timedelta(minutes=30), # Including retries - heartbeat_timeout=timedelta(seconds=30), # Between heartbeats -) -``` - -## Workflow Failure +from datetime import timedelta +from temporalio import workflow -```python -@workflow.run -async def run(self) -> str: - if some_condition: - raise ApplicationError( - "Cannot process order", - type="BusinessError", - non_retryable=True, +@workflow.defn +class MyWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + my_activity, + start_to_close_timeout=timedelta(minutes=5), # Single attempt + schedule_to_close_timeout=timedelta(minutes=30), # Including retries + heartbeat_timeout=timedelta(seconds=30), # Between heartbeats ) - return "success" ``` -## Idempotency Patterns - -When Activities interact with external systems, making them idempotent ensures correctness during retries and replay. - -### Using Workflow IDs as Idempotency Keys +## Workflow Failure ```python -@activity.defn -async def charge_payment(order_id: str, amount: float) -> str: - # Use order_id as idempotency key with payment provider - result = await payment_api.charge( - amount=amount, - idempotency_key=f"order-{order_id}", # Prevents duplicate charges - ) - return result.transaction_id -``` - -### Tracking Operation Status in Workflow State +from temporalio import workflow +from temporalio.exceptions import ApplicationError -```python @workflow.defn -class OrderWorkflow: - def __init__(self): - self._payment_completed = False - self._transaction_id: str | None = None - +class MyWorkflow: @workflow.run - async def run(self, order: Order) -> str: - if not self._payment_completed: - self._transaction_id = await workflow.execute_activity( - charge_payment, order.id, order.total, - schedule_to_close_timeout=timedelta(minutes=5), + async def run(self) -> str: + if some_condition: + raise ApplicationError( + "Cannot process order", + type="BusinessError", ) - self._payment_completed = True - - # Continue with order processing... - return self._transaction_id + return "success" ``` -### Designing Idempotent Activities - -1. **Use unique identifiers** as idempotency keys (workflow ID, activity ID, or business ID) -2. **Check before acting**: Query external system state before making changes -3. **Make operations repeatable**: Ensure calling twice produces the same result -4. **Record outcomes**: Store transaction IDs or results for verification +**Note:** Do not use `non_retryable=` with `ApplicationError` inside a workflow (as opposed to an activity). ## Best Practices 1. Use specific error types for different failure modes 2. Mark permanent failures as non-retryable -3. Configure appropriate retry policies per activity +3. Configure appropriate retry policies 4. Log errors before re-raising 5. Use `ActivityError` to catch activity failures in workflows -6. Design activities to be idempotent for safe retries +6. Design code to be idempotent for safe retries (see more at `references/core/patterns.md`) diff --git a/references/python/gotchas.md b/references/python/gotchas.md index ceec619..1a919c1 100644 --- a/references/python/gotchas.md +++ b/references/python/gotchas.md @@ -5,18 +5,23 @@ Python-specific mistakes and anti-patterns.
See also [Common Gotchas](../core/co ## Idempotency ```python +@dataclass +class ChargePaymentInput: + order_id: str + amount: float + # BAD - May charge customer multiple times on retry @activity.defn -async def charge_payment(order_id: str, amount: float) -> str: - return await payment_api.charge(customer_id, amount) +async def charge_payment(input: ChargePaymentInput) -> str: + return await payment_api.charge(input.customer_id, input.amount) # GOOD - Safe for retries @activity.defn -async def charge_payment(order_id: str, amount: float) -> str: +async def charge_payment(input: ChargePaymentInput) -> str: return await payment_api.charge( - customer_id, - amount, - idempotency_key=f"order-{order_id}" + input.customer_id, + input.amount, + idempotency_key=f"order-{input.order_id}" ) ``` @@ -291,23 +296,20 @@ async def call_api(): # BAD - Gives up too easily result = await workflow.execute_activity( flaky_api_call, - schedule_to_close_timeout=timedelta(seconds=30), + start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=1), ) # GOOD - Resilient to transient failures result = await workflow.execute_activity( flaky_api_call, - schedule_to_close_timeout=timedelta(minutes=10), - retry_policy=RetryPolicy( - initial_interval=timedelta(seconds=1), - maximum_interval=timedelta(minutes=1), - backoff_coefficient=2.0, - maximum_attempts=10, - ), + start_to_close_timeout=timedelta(minutes=10), + retry_policy=RetryPolicy(maximum_attempts=10), ) ``` +Generally, prefer to use the default RetryPolicy. + ## Heartbeating ### Forgetting to Heartbeat Long Activities diff --git a/references/python/observability.md b/references/python/observability.md index fcba076..26296c3 100644 --- a/references/python/observability.md +++ b/references/python/observability.md @@ -19,7 +19,7 @@ class MyWorkflow: result = await workflow.execute_activity( my_activity, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) workflow.logger.info("Activity completed", extra={"result": result}) @@ -50,21 +50,16 @@ Activity logger includes: - Workflow ID and run ID - Attempt number (for retries) -### Custom Logger Configuration +### Customizing Logger Configuration ```python import logging -# Configure a custom handler -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" -)) - -# Apply to Temporal's logger -temporal_logger = logging.getLogger("temporalio") -temporal_logger.addHandler(handler) -temporal_logger.setLevel(logging.INFO) +# Applies to temporalio.workflow.logger and temporalio.activity.logger, as Temporal inherits the default logger +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) ``` ## Metrics @@ -75,17 +70,19 @@ temporal_logger.setLevel(logging.INFO) from temporalio.client import Client from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig -# Configure Prometheus metrics endpoint +# Create a custom runtime runtime = Runtime( telemetry=TelemetryConfig( - metrics=PrometheusConfig(bind_address="0.0.0.0:9090") + metrics=PrometheusConfig(bind_address="0.0.0.0:9000") ) ) -client = await Client.connect( - "localhost:7233", - runtime=runtime, -) +# Set it as the global default BEFORE any Client/Worker is created +# Do this only ONCE. +Runtime.set_default(runtime, error_if_already_set=True) +# error_if_already_set can be False if you want to overwrite an existing default without raising. 
+ +# ...elsewhere, client = ... as usual ``` ### Key SDK Metrics @@ -95,92 +92,10 @@ client = await Client.connect( - `temporal_activity_execution_latency` - Activity execution time - `temporal_workflow_task_replay_latency` - Replay duration -## Tracing - -### OpenTelemetry Integration - -```python -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from temporalio.contrib.opentelemetry import TracingInterceptor - -# Set up OpenTelemetry -provider = TracerProvider() -trace.set_tracer_provider(provider) - -# Create tracing interceptor -tracing_interceptor = TracingInterceptor() - -# Apply to client and worker -client = await Client.connect( - "localhost:7233", - interceptors=[tracing_interceptor], -) - -worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - activities=[my_activity], - interceptors=[tracing_interceptor], -) -``` ## Search Attributes (Visibility) -### Setting Search Attributes at Start - -```python -from temporalio.common import SearchAttributes, SearchAttributeKey - -# Define typed search attribute keys -ORDER_ID = SearchAttributeKey.for_keyword("OrderId") -CUSTOMER_TYPE = SearchAttributeKey.for_keyword("CustomerType") -ORDER_TOTAL = SearchAttributeKey.for_float("OrderTotal") - -# Start workflow with search attributes -await client.execute_workflow( - OrderWorkflow.run, - order, - id=f"order-{order.id}", - task_queue="orders", - search_attributes=SearchAttributes.from_pairs([ - (ORDER_ID, order.id), - (CUSTOMER_TYPE, order.customer_type), - (ORDER_TOTAL, order.total), - ]), -) -``` - -### Upserting Search Attributes from Workflow - -```python -@workflow.defn -class OrderWorkflow: - @workflow.run - async def run(self, order: Order) -> str: - # Update status as workflow progresses - workflow.upsert_search_attributes([ - (ORDER_STATUS, "processing"), - ]) - - await workflow.execute_activity(process_order, order, ...) - - workflow.upsert_search_attributes([ - (ORDER_STATUS, "completed"), - ]) - return "done" -``` - -### Querying Workflows by Search Attributes - -```python -# List workflows using search attributes -async for workflow in client.list_workflows( - 'OrderStatus = "processing" AND CustomerType = "premium"' -): - print(f"Workflow {workflow.id} is still processing") -``` +See the Search Attributes section of `references/python/data-handling.md` ## Best Practices @@ -188,4 +103,3 @@ async for workflow in client.list_workflows( 2. Don't use print() in workflows - it will produce duplicate output on replay 3. Configure metrics for production monitoring 4. Use Search Attributes for business-level visibility -5. 
Add tracing for distributed debugging diff --git a/references/python/patterns.md b/references/python/patterns.md index bc4f3fc..6bdbba3 100644 --- a/references/python/patterns.md +++ b/references/python/patterns.md @@ -86,7 +86,7 @@ class StatusWorkflow: self._progress = i await workflow.execute_activity( process_item, i, - schedule_to_close_timeout=timedelta(minutes=1) + start_to_close_timeout=timedelta(minutes=1) ) self._status = "completed" return "done" @@ -166,7 +166,7 @@ async def run(self, items: list[str]) -> list[str]: tasks = [ workflow.execute_activity( process_item, item, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) ) for item in items ] @@ -232,25 +232,25 @@ async def run(self, order: Order) -> str: try: await workflow.execute_activity( reserve_inventory, order, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) ) compensations.append(lambda: workflow.execute_activity( release_inventory, order, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) )) await workflow.execute_activity( charge_payment, order, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) ) compensations.append(lambda: workflow.execute_activity( refund_payment, order, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) )) await workflow.execute_activity( ship_order, order, - schedule_to_close_timeout=timedelta(minutes=5) + start_to_close_timeout=timedelta(minutes=5) ) return "Order completed" @@ -279,7 +279,7 @@ async def run(self) -> str: try: await workflow.execute_activity( long_running_activity, - schedule_to_close_timeout=timedelta(hours=1), + start_to_close_timeout=timedelta(hours=1), ) return "completed" except asyncio.CancelledError: @@ -288,7 +288,7 @@ async def run(self) -> str: # Cleanup activities still run even after cancellation await workflow.execute_activity( cleanup_activity, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) raise # Re-raise to mark workflow as cancelled ``` @@ -375,13 +375,13 @@ async def run(self) -> str: # New implementation greeting = await workflow.execute_activity( new_greet_activity, - schedule_to_close_timeout=timedelta(minutes=1) + start_to_close_timeout=timedelta(minutes=1) ) else: # Old implementation (for replay) greeting = await workflow.execute_activity( old_greet_activity, - schedule_to_close_timeout=timedelta(minutes=1) + start_to_close_timeout=timedelta(minutes=1) ) return greeting @@ -415,15 +415,13 @@ async def run(self) -> str: - **High-frequency calls** - When task queue overhead is significant - **Low-latency requirements** - When you can't afford task queue round-trip -**Note:** Local activities are experimental in Python SDK. 
- ```python @workflow.run async def run(self) -> str: result = await workflow.execute_local_activity( quick_lookup, "key", - schedule_to_close_timeout=timedelta(seconds=5), + start_to_close_timeout=timedelta(seconds=5), ) return result ``` @@ -448,6 +446,7 @@ class OrderWorkflow: # Client setup with Pydantic support client = await Client.connect( "localhost:7233", + namespace="default", data_converter=pydantic_data_converter, ) ``` diff --git a/references/python/python.md b/references/python/python.md index b6c296f..edfb8df 100644 --- a/references/python/python.md +++ b/references/python/python.md @@ -28,7 +28,7 @@ class GreetingWorkflow: @workflow.run async def run(self, name: str) -> str: return await workflow.execute_activity( - greet, name, schedule_to_close_timeout=timedelta(seconds=30) + greet, name, start_to_close_timeout=timedelta(seconds=30) ) ``` @@ -42,7 +42,7 @@ from activities.greet import greet from workflows.greeting import GreetingWorkflow async def main(): - client = await Client.connect("localhost:7233") + client = await Client.connect("localhost:7233", namespace="default") async with Worker(client, task_queue="greeting-queue", workflows=[GreetingWorkflow], activities=[greet]): result = await client.execute_workflow( @@ -171,6 +171,6 @@ with workflow.unsafe.imports_passed_through(): - **`testing.md`** - WorkflowEnvironment, time-skipping, activity mocking - **`patterns.md`** - Signals, queries, child workflows, saga pattern - **`observability.md`** - Logging, metrics, tracing, Search Attributes -- **`advanced-features.md`** - Continue-as-new, schedules, updates, interceptors +- **`advanced-features.md`** - Continue-as-new, updates, schedules, and more - **`data-handling.md`** - Data converters, Pydantic, payload encryption - **`versioning.md`** - Patching API, workflow type versioning, Worker Versioning diff --git a/references/python/sandbox.md b/references/python/sandbox.md index 5ebd79b..f0db704 100644 --- a/references/python/sandbox.md +++ b/references/python/sandbox.md @@ -2,7 +2,7 @@ ## Overview -The Python SDK runs workflows in a sandbox that provides automatic protection against non-deterministic operations. This is unique to Python - TypeScript uses V8 isolation with automatic replacements instead. +The Python SDK runs workflows in a sandbox that provides automatic protection against non-deterministic operations. This is unique to the Python SDK. 
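For a concrete picture of what this protection means in practice, here is a minimal sketch of a workflow written to stay inside the sandbox's rules. It uses the deterministic `workflow.*` helpers in place of their stdlib equivalents; the `activities` module and `compose_greeting` activity are hypothetical stand-ins.

```python
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    # Hypothetical activity import; pass-through keeps it sandbox-friendly.
    from activities import compose_greeting


@workflow.defn
class SandboxFriendlyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # workflow.uuid4() and workflow.now() replace uuid.uuid4() and
        # datetime.now(), which the sandbox treats as non-deterministic.
        request_id = str(workflow.uuid4())
        started_at = workflow.now().isoformat()
        greeting = await workflow.execute_activity(
            compose_greeting,
            name,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return f"{greeting} (request {request_id}, started {started_at})"
```

Anything that cannot be expressed this way (real I/O, external randomness, and so on) belongs in an activity, as the rest of this reference reiterates.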
## How the Sandbox Works @@ -12,18 +12,15 @@ The sandbox: - Passes through standard library with restrictions - Reloads workflow files on each execution -## Safe Alternatives +## Forbidden Operations + +These operations will fail in the sandbox: -| Forbidden | Safe Alternative | -|-----------|------------------| -| `datetime.now()` | `workflow.now()` | -| `datetime.utcnow()` | `workflow.now()` | -| `random.random()` | `workflow.random().random()` | -| `random.randint()` | `workflow.random().randint()` | -| `uuid.uuid4()` | `workflow.uuid4()` | -| `time.time()` | `workflow.now().timestamp()` | -| `asyncio.wait()` | `workflow.wait()` (deterministic ordering) | -| `asyncio.as_completed()` | `workflow.as_completed()` | +- **Direct I/O**: Network calls, file reads/writes +- **Threading**: `threading` module operations +- **Subprocess**: `subprocess` calls +- **Global state**: Modifying mutable global variables +- **Blocking sleep**: `time.sleep()` (use `asyncio.sleep()`) ## Pass-Through Pattern @@ -42,6 +39,9 @@ with workflow.unsafe.imports_passed_through(): - Serialization libraries - Type definitions - Any library that doesn't do I/O or non-deterministic operations +- Performance, as many non-passthrough imports can be slower + +**Note:** The imports, even when using `imports_passed_through`, should all be at the top of the file. Runtime imports are an anti-pattern. ## Importing Activities @@ -62,30 +62,17 @@ class OrderWorkflow: await workflow.execute_activity( process_payment, order_id, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) return await workflow.execute_activity( ship_order, order_id, - schedule_to_close_timeout=timedelta(minutes=10), + start_to_close_timeout=timedelta(minutes=10), ) ``` ## Disabling the Sandbox -### Per-Workflow - -```python -@workflow.defn(sandboxed=False) -class UnsandboxedWorkflow: - @workflow.run - async def run(self) -> str: - # No sandbox protection - use with caution - return "result" -``` - -### Per-Block - ```python @workflow.defn class MyWorkflow: @@ -97,29 +84,100 @@ class MyWorkflow: return "result" ``` -### Globally (Worker Level) +- Per‑block escape hatch from runtime restrictions; imports unchanged. +- Use when: You need to call something the sandbox would normally block (e.g., a restricted stdlib call) in a very small, controlled section. +- **IMPORTANT:** Use it sparingly; you lose determinism checks inside the block +- Genuinely non-deterministic code still *MUST* go into activities. + +## Customizing Invalid Module Members + +`invalid_module_members` includes modules that cannot be accessed. + +Checks are compared against the fully qualified path to the item. 
```python -from temporalio.worker import Worker, UnsandboxedWorkflowRunner +import dataclasses +from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxMatcher, + SandboxRestrictions, +) + +# Example 1: Remove a restriction on datetime.date.today(): +restrictions = dataclasses.replace( + SandboxRestrictions.default, + invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted( + "datetime", "date", "today", + ), +) + +# Example 2: Restrict the datetime.date class from being used +restrictions = dataclasses.replace( + SandboxRestrictions.default, + invalid_module_members=SandboxRestrictions.invalid_module_members_default | SandboxMatcher( + children={"datetime": SandboxMatcher(use={"date"})}, + ), +) worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - activities=[my_activity], - workflow_runner=UnsandboxedWorkflowRunner(), + ..., + workflow_runner=SandboxedWorkflowRunner(restrictions=restrictions), ) ``` -## Forbidden Operations +## Import Notification Policy -These operations will fail or cause non-determinism in the sandbox: +Control warnings/errors for sandbox import issues. Recommended for catching potential problems: -- **Direct I/O**: Network calls, file reads/writes -- **Threading**: `threading` module operations -- **Subprocess**: `subprocess` calls -- **Global state**: Modifying mutable global variables -- **Blocking sleep**: `time.sleep()` (use `asyncio.sleep()`) +```python +from temporalio import workflow +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions + +restrictions = SandboxRestrictions.default.with_import_notification_policy( + workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT + | workflow.SandboxImportNotificationPolicy.WARN_ON_UNINTENTIONAL_PASSTHROUGH +) + +worker = Worker( + ..., + workflow_runner=SandboxedWorkflowRunner(restrictions=restrictions), +) +``` + +- `WARN_ON_DYNAMIC_IMPORT` (default) - warns on imports after initial workflow load +- `WARN_ON_UNINTENTIONAL_PASSTHROUGH` - warns when modules are imported into sandbox without explicit passthrough (not default, but highly recommended for catching missing passthroughs) +- `RAISE_ON_UNINTENTIONAL_PASSTHROUGH` - raise instead of warn + +Override per-import with the context manager: + +```python +with workflow.unsafe.sandbox_import_notification_policy( + workflow.SandboxImportNotificationPolicy.SILENT +): + import pydantic # No warning for this import +``` + +## Disable Lazy sys.modules Passthrough + +By default, passthrough modules are lazily added to the sandbox's `sys.modules` when accessed. To require explicit imports: + +```python +import dataclasses +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions + +restrictions = dataclasses.replace( + SandboxRestrictions.default, + disable_lazy_sys_module_passthrough=True, +) + +worker = Worker( + ..., + workflow_runner=SandboxedWorkflowRunner(restrictions=restrictions), +) +``` + +When `True`, passthrough modules must be explicitly imported to appear in the sandbox's `sys.modules`. ## File Organization @@ -148,6 +206,7 @@ Error: Cannot import 'pydantic' in sandbox ``` **Fix**: Use pass-through: + ```python with workflow.unsafe.imports_passed_through(): import pydantic @@ -165,12 +224,6 @@ result = some_library.cached_operation() # Cache changes between replays **Fix**: Move to activity or use pass-through with caution. 
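A minimal sketch of the move-to-an-activity fix, with `cached_lookup` standing in for the hypothetical `some_library.cached_operation()` above:

```python
import time
from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def cached_lookup() -> float:
    # Stand-in for some_library.cached_operation(): any call whose result can
    # change between invocations belongs in an activity, not in workflow code.
    return time.time()


@workflow.defn
class LookupWorkflow:
    @workflow.run
    async def run(self) -> float:
        # The activity result is recorded in the event history, so a replay
        # reuses the stored value instead of re-running the lookup.
        return await workflow.execute_activity(
            cached_lookup,
            start_to_close_timeout=timedelta(seconds=30),
        )
```

If the value must stay fixed for the life of the workflow, keep it in workflow state after the first call instead of re-invoking the activity.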
-### Slow Worker Startup - -Large workflow files slow down worker initialization because they're reloaded frequently. - -**Fix**: Keep workflow files minimal, move logic to activities. - ## Best Practices 1. **Separate workflow and activity files** for performance diff --git a/references/python/sync-vs-async.md b/references/python/sync-vs-async.md index ba6b831..7875582 100644 --- a/references/python/sync-vs-async.md +++ b/references/python/sync-vs-async.md @@ -28,7 +28,7 @@ The Python async event loop runs in a single thread. When any task runs, no othe ### Synchronous Activities -- Run in the `activity_executor` (thread pool by default) +- Run in the `activity_executor`, which you must provide - Protected from accidentally blocking the global event loop - Multiple activities run in parallel via OS thread scheduling - Thread pool provides preemptive switching between tasks @@ -142,22 +142,10 @@ If experiencing sporadic bugs, hangs, or timeouts: ### Multi-Core Usage -For CPU-bound work or true parallelism: +For CPU-bound work and multi-core usage: -- Use multiple worker processes -- Or use `ProcessPoolExecutor` for synchronous activities - -```python -from concurrent.futures import ProcessPoolExecutor - -with ProcessPoolExecutor(max_workers=4) as executor: - worker = Worker( - client, - task_queue="cpu-intensive-queue", - activities=[cpu_bound_activity], - activity_executor=executor, - ) -``` +- Prefer multiple worker processes and/or threaded synchronous activities. +- Use ProcessPoolExecutor for synchronous activities only if you understand and accept the extra complexity and different cancellation semantics. ### Separate Workers for Workflows vs Activities @@ -185,7 +173,7 @@ def greet_in_spanish(name: str) -> str: return response.text async def main(): - client = await Client.connect("localhost:7233") + client = await Client.connect("localhost:7233", namespace="default") with ThreadPoolExecutor(max_workers=100) as executor: worker = Worker( @@ -219,7 +207,7 @@ class TranslateActivities: return await response.text() async def main(): - client = await Client.connect("localhost:7233") + client = await Client.connect("localhost:7233", namespace="default") async with aiohttp.ClientSession() as session: activities = TranslateActivities(session) diff --git a/references/python/versioning.md b/references/python/versioning.md index 69600a2..0a359f6 100644 --- a/references/python/versioning.md +++ b/references/python/versioning.md @@ -25,13 +25,13 @@ class ShippingWorkflow: # New code path await workflow.execute_activity( send_email, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) else: # Old code path (for replay of existing workflows) await workflow.execute_activity( send_fax, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` @@ -56,14 +56,14 @@ class OrderWorkflow: await workflow.execute_activity( check_fraud, order, - schedule_to_close_timeout=timedelta(minutes=2), + start_to_close_timeout=timedelta(minutes=2), ) # Original payment logic runs for both paths return await workflow.execute_activity( process_payment, order, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` @@ -82,13 +82,13 @@ class OrderWorkflow: await workflow.execute_activity( check_fraud, order, - schedule_to_close_timeout=timedelta(minutes=2), + start_to_close_timeout=timedelta(minutes=2), ) return await workflow.execute_activity( process_payment, order, - 
schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` @@ -104,13 +104,13 @@ class OrderWorkflow: await workflow.execute_activity( check_fraud, order, - schedule_to_close_timeout=timedelta(minutes=2), + start_to_close_timeout=timedelta(minutes=2), ) return await workflow.execute_activity( process_payment, order, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` @@ -127,19 +127,19 @@ class NotificationWorkflow: # Latest: SMS notifications await workflow.execute_activity( send_sms, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) elif workflow.patched("use-email"): # Intermediate: Email notifications await workflow.execute_activity( send_email, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) else: # Original: Fax notifications await workflow.execute_activity( send_fax, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` @@ -274,7 +274,7 @@ class StableWorkflow: # This workflow will always run on its assigned version return await workflow.execute_activity( process_order, - schedule_to_close_timeout=timedelta(minutes=5), + start_to_close_timeout=timedelta(minutes=5), ) ``` diff --git a/references/typescript/advanced-features.md b/references/typescript/advanced-features.md index 9739a94..188d692 100644 --- a/references/typescript/advanced-features.md +++ b/references/typescript/advanced-features.md @@ -272,47 +272,6 @@ await handle.trigger(); // Run immediately await handle.delete(); ``` -## Interceptors - -Interceptors allow cross-cutting concerns like logging, metrics, and auth. - -### Creating a Custom Interceptor - -```typescript -import { - ActivityInboundCallsInterceptor, - ActivityExecuteInput, - Next, -} from '@temporalio/worker'; - -class LoggingActivityInterceptor implements ActivityInboundCallsInterceptor { - async execute( - input: ActivityExecuteInput, - next: Next - ): Promise { - console.log(`Activity starting: ${input.activity.name}`); - try { - const result = await next(input); - console.log(`Activity completed: ${input.activity.name}`); - return result; - } catch (err) { - console.error(`Activity failed: ${input.activity.name}`, err); - throw err; - } - } -} - -// Apply to worker -const worker = await Worker.create({ - workflowsPath: require.resolve('./workflows'), - activities, - taskQueue: 'my-queue', - interceptors: { - activity: [() => new LoggingActivityInterceptor()], - }, -}); -``` - ## Sinks Sinks allow workflows to emit events for side effects (logging, metrics). @@ -403,36 +362,12 @@ export async function workflowWithCancellation(): Promise { } ``` -## Dynamic Workflows and Activities - -Handle workflows/activities not known at compile time. - -```typescript -// Dynamic workflow registration -import { proxyActivities } from '@temporalio/workflow'; - -export async function dynamicWorkflow( - workflowType: string, - args: unknown[] -): Promise { - switch (workflowType) { - case 'order': - return handleOrderWorkflow(args); - case 'refund': - return handleRefundWorkflow(args); - default: - throw new Error(`Unknown workflow type: ${workflowType}`); - } -} -``` - ## Best Practices 1. Use continue-as-new for long-running workflows to prevent history growth 2. Prefer updates over signals when you need a response 3. Use sinks with `callDuringReplay: false` for logging 4. 
Use CancellationScope.nonCancellable for critical cleanup operations -5. Configure interceptors for cross-cutting concerns like tracing -6. Use `ActivityCancellationType.WAIT_CANCELLATION_COMPLETED` when cleanup is important -7. Store progress in heartbeat details for resumable long-running activities -8. Use Nexus for cross-namespace service communication +5. Use `ActivityCancellationType.WAIT_CANCELLATION_COMPLETED` when cleanup is important +6. Store progress in heartbeat details for resumable long-running activities +7. Use Nexus for cross-namespace service communication diff --git a/references/typescript/typescript.md b/references/typescript/typescript.md index 663d01f..961f910 100644 --- a/references/typescript/typescript.md +++ b/references/typescript/typescript.md @@ -116,6 +116,6 @@ See `determinism.md` for detailed rules. - **`testing.md`** - TestWorkflowEnvironment, time-skipping - **`patterns.md`** - Signals, queries, cancellation scopes - **`observability.md`** - Replay-aware logging, metrics, OpenTelemetry, debugging -- **`advanced-features.md`** - Interceptors, sinks, updates, schedules +- **`advanced-features.md`** - Sinks, updates, schedules and more - **`data-handling.md`** - Search attributes, workflow memo, data converters - **`versioning.md`** - Patching API, workflow type versioning, Worker Versioning diff --git a/scripts/analyze-workflow-error.sh b/scripts/analyze-workflow-error.sh deleted file mode 100755 index c348227..0000000 --- a/scripts/analyze-workflow-error.sh +++ /dev/null @@ -1,225 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Get the directory where this script is located -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: analyze-workflow-error.sh --workflow-id id [options] - -Parse workflow history to extract error details and provide recommendations. - -Options: - --workflow-id workflow ID to analyze, required - --run-id specific workflow run ID (optional) - -h, --help show this help -USAGE -} - -workflow_id="" -run_id="" - -while [[ $# -gt 0 ]]; do - case "$1" in - --workflow-id) workflow_id="${2-}"; shift 2 ;; - --run-id) run_id="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if [[ -z "$workflow_id" ]]; then - echo "workflow-id is required" >&2 - usage - exit 1 -fi - -if ! command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -# Build temporal command -DESCRIBE_CMD=("$TEMPORAL_CLI" "workflow" "describe" "--workflow-id" "$workflow_id" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") -SHOW_CMD=("$TEMPORAL_CLI" "workflow" "show" "--workflow-id" "$workflow_id" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") - -if [[ -n "$run_id" ]]; then - DESCRIBE_CMD+=("--run-id" "$run_id") - SHOW_CMD+=("--run-id" "$run_id") -fi - -echo "=== Workflow Error Analysis ===" -echo "Workflow ID: $workflow_id" -if [[ -n "$run_id" ]]; then - echo "Run ID: $run_id" -fi -echo "" - -# Get workflow description -if ! describe_output=$("${DESCRIBE_CMD[@]}" 2>&1); then - echo "❌ Failed to describe workflow" >&2 - echo "$describe_output" >&2 - exit 1 -fi - -# Get workflow history -if ! 
show_output=$("${SHOW_CMD[@]}" 2>&1); then - echo "❌ Failed to get workflow history" >&2 - echo "$show_output" >&2 - exit 1 -fi - -# Extract status -status=$(echo "$describe_output" | grep -E "^\s*Status:" | awk '{print $2}' | tr -d ' ' || echo "UNKNOWN") -echo "Current Status: $status" -echo "" - -# Analyze different error types -workflow_task_failures=$(echo "$show_output" | grep -c "WorkflowTaskFailed" || echo "0") -activity_task_failures=$(echo "$show_output" | grep -c "ActivityTaskFailed" || echo "0") -workflow_exec_failed=$(echo "$show_output" | grep -c "WorkflowExecutionFailed" || echo "0") - -# Report findings -if [[ "$workflow_task_failures" -gt 0 ]]; then - echo "=== WorkflowTaskFailed Detected ===" - echo "Attempts: $workflow_task_failures" - echo "" - - # Extract error details - echo "Error Details:" - echo "$show_output" | grep -A 10 "WorkflowTaskFailed" | head -n 15 - echo "" - - echo "=== Diagnosis ===" - echo "Error Type: WorkflowTaskFailed" - echo "" - echo "Common Causes:" - echo " 1. Workflow type not registered with worker" - echo " 2. Worker missing workflow definition" - echo " 3. Workflow code has syntax errors" - echo " 4. Worker not running or not polling correct task queue" - echo "" - echo "=== Recommended Actions ===" - echo "1. Check if worker is running:" - echo " $SCRIPT_DIR/list-workers.sh" - echo "" - echo "2. Verify workflow is registered in worker.py:" - echo " - Check workflows=[YourWorkflow] in Worker() constructor" - echo "" - echo "3. Restart worker with updated code:" - echo " $SCRIPT_DIR/ensure-worker.sh" - echo "" - echo "4. Check worker logs for errors:" - echo " tail -f \$CLAUDE_TEMPORAL_LOG_DIR/worker-\$(basename \"\$(pwd)\").log" - -elif [[ "$activity_task_failures" -gt 0 ]]; then - echo "=== ActivityTaskFailed Detected ===" - echo "Attempts: $activity_task_failures" - echo "" - - # Extract error details - echo "Error Details:" - echo "$show_output" | grep -A 10 "ActivityTaskFailed" | head -n 20 - echo "" - - echo "=== Diagnosis ===" - echo "Error Type: ActivityTaskFailed" - echo "" - echo "Common Causes:" - echo " 1. Activity code threw an exception" - echo " 2. Activity type not registered with worker" - echo " 3. Activity code has bugs" - echo " 4. External dependency failure (API, database, etc.)" - echo "" - echo "=== Recommended Actions ===" - echo "1. Check activity logs for stack traces:" - echo " tail -f \$CLAUDE_TEMPORAL_LOG_DIR/worker-\$(basename \"\$(pwd)\").log" - echo "" - echo "2. Verify activity is registered in worker.py:" - echo " - Check activities=[your_activity] in Worker() constructor" - echo "" - echo "3. Review activity code for errors" - echo "" - echo "4. If activity code is fixed, restart worker:" - echo " $SCRIPT_DIR/ensure-worker.sh" - echo "" - echo "5. Consider adjusting retry policy if transient failure" - -elif [[ "$workflow_exec_failed" -gt 0 ]]; then - echo "=== WorkflowExecutionFailed Detected ===" - echo "" - - # Extract error details - echo "Error Details:" - echo "$show_output" | grep -A 20 "WorkflowExecutionFailed" | head -n 25 - echo "" - - echo "=== Diagnosis ===" - echo "Error Type: WorkflowExecutionFailed" - echo "" - echo "Common Causes:" - echo " 1. Workflow business logic error" - echo " 2. Unhandled exception in workflow code" - echo " 3. Workflow determinism violation" - echo "" - echo "=== Recommended Actions ===" - echo "1. Review workflow code for logic errors" - echo "" - echo "2. 
Check for non-deterministic code:" - echo " - Random number generation" - echo " - System time calls" - echo " - Threading/concurrency" - echo "" - echo "3. Review full workflow history:" - echo " temporal workflow show --workflow-id $workflow_id" - echo "" - echo "4. After fixing code, restart worker:" - echo " $SCRIPT_DIR/ensure-worker.sh" - -elif [[ "$status" == "TIMED_OUT" ]]; then - echo "=== Workflow Timeout ===" - echo "" - echo "The workflow exceeded its timeout limit." - echo "" - echo "=== Recommended Actions ===" - echo "1. Review workflow timeout settings in starter code" - echo "" - echo "2. Check if activities are taking too long:" - echo " - Review activity timeout settings" - echo " - Check activity logs for performance issues" - echo "" - echo "3. Consider increasing timeouts if operations legitimately take longer" - -elif [[ "$status" == "RUNNING" ]]; then - echo "=== Workflow Still Running ===" - echo "" - echo "The workflow appears to be running normally." - echo "" - echo "To monitor progress:" - echo " temporal workflow show --workflow-id $workflow_id" - echo "" - echo "To wait for completion:" - echo " $SCRIPT_DIR/wait-for-workflow-status.sh --workflow-id $workflow_id --status COMPLETED" - -elif [[ "$status" == "COMPLETED" ]]; then - echo "=== Workflow Completed Successfully ===" - echo "" - echo "No errors detected. Workflow completed normally." - -else - echo "=== Status: $status ===" - echo "" - echo "Review full workflow details:" - echo " temporal workflow describe --workflow-id $workflow_id" - echo " temporal workflow show --workflow-id $workflow_id" -fi - -echo "" -echo "=== Additional Resources ===" -echo "Web UI: http://localhost:8233/namespaces/$CLAUDE_TEMPORAL_NAMESPACE/workflows/$workflow_id" diff --git a/scripts/bulk-cancel-workflows.sh b/scripts/bulk-cancel-workflows.sh deleted file mode 100755 index c6fa27e..0000000 --- a/scripts/bulk-cancel-workflows.sh +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: bulk-cancel-workflows.sh [options] - -Cancel multiple workflows. - -Options: - --workflow-ids file containing workflow IDs (one per line), required unless --pattern - --pattern cancel workflows matching pattern (regex) - --reason cancellation reason (default: "Bulk cancellation") - -h, --help show this help - -Examples: - # Cancel workflows from file - ./bulk-cancel-workflows.sh --workflow-ids stalled.txt - - # Cancel workflows matching pattern - ./bulk-cancel-workflows.sh --pattern "test-.*" - - # Cancel with custom reason - ./bulk-cancel-workflows.sh --workflow-ids stalled.txt --reason "Cleaning up test workflows" -USAGE -} - -workflow_ids_file="" -pattern="" -reason="Bulk cancellation" - -while [[ $# -gt 0 ]]; do - case "$1" in - --workflow-ids) workflow_ids_file="${2-}"; shift 2 ;; - --pattern) pattern="${2-}"; shift 2 ;; - --reason) reason="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if [[ -z "$workflow_ids_file" && -z "$pattern" ]]; then - echo "Either --workflow-ids or --pattern is required" >&2 - usage - exit 1 -fi - -if ! 
command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -# Collect workflow IDs -workflow_ids=() - -if [[ -n "$workflow_ids_file" ]]; then - if [[ ! -f "$workflow_ids_file" ]]; then - echo "File not found: $workflow_ids_file" >&2 - exit 1 - fi - - # Read workflow IDs from file - while IFS= read -r line; do - # Skip empty lines and comments - [[ -z "$line" || "$line" =~ ^[[:space:]]*# ]] && continue - # Trim whitespace - line=$(echo "$line" | xargs) - workflow_ids+=("$line") - done < "$workflow_ids_file" -fi - -if [[ -n "$pattern" ]]; then - echo "Finding workflows matching pattern: $pattern" - - # List workflows and filter by pattern - LIST_CMD=("$TEMPORAL_CLI" "workflow" "list" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") - - if workflow_list=$("${LIST_CMD[@]}" 2>&1); then - # Parse workflow IDs from list and filter by pattern - while IFS= read -r wf_id; do - [[ -z "$wf_id" ]] && continue - if echo "$wf_id" | grep -E "$pattern" >/dev/null 2>&1; then - workflow_ids+=("$wf_id") - fi - done < <(echo "$workflow_list" | awk 'NR>1 && $1 != "" {print $1}' | grep -v "^-") - else - echo "Failed to list workflows" >&2 - echo "$workflow_list" >&2 - exit 1 - fi -fi - -# Check if we have any workflow IDs -if [[ "${#workflow_ids[@]}" -eq 0 ]]; then - echo "No workflows to cancel" - exit 0 -fi - -echo "Found ${#workflow_ids[@]} workflow(s) to cancel" -echo "Reason: $reason" -echo "" - -# Confirm with user -read -p "Continue with cancellation? (y/N) " -n 1 -r -echo -if [[ ! $REPLY =~ ^[Yy]$ ]]; then - echo "Cancellation aborted" - exit 0 -fi - -echo "" -echo "Canceling workflows..." -echo "" - -success_count=0 -failed_count=0 - -# Cancel each workflow -for workflow_id in "${workflow_ids[@]}"; do - echo -n "Canceling: $workflow_id ... " - - if "$TEMPORAL_CLI" workflow cancel \ - --workflow-id "$workflow_id" \ - --address "$TEMPORAL_ADDRESS" \ - --namespace "$CLAUDE_TEMPORAL_NAMESPACE" \ - --reason "$reason" \ - >/dev/null 2>&1; then - echo "✓" - success_count=$((success_count + 1)) - else - echo "❌ (may already be canceled or not exist)" - failed_count=$((failed_count + 1)) - fi -done - -echo "" -echo "=== Summary ===" -echo "Successfully canceled: $success_count" -echo "Failed: $failed_count" -echo "Total: ${#workflow_ids[@]}" diff --git a/scripts/ensure-server.sh b/scripts/ensure-server.sh deleted file mode 100755 index 1627909..0000000 --- a/scripts/ensure-server.sh +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" -CLAUDE_TEMPORAL_LOG_DIR="${CLAUDE_TEMPORAL_LOG_DIR:-${TMPDIR:-/tmp}/claude-temporal-logs}" -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" - -# Create directories if they don't exist -mkdir -p "$CLAUDE_TEMPORAL_PID_DIR" -mkdir -p "$CLAUDE_TEMPORAL_LOG_DIR" - -PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/server.pid" -LOG_FILE="$CLAUDE_TEMPORAL_LOG_DIR/server.log" - -# Check if temporal CLI is installed -if ! 
command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "❌ Temporal CLI not found: $TEMPORAL_CLI" >&2 - echo "Install temporal CLI:" >&2 - echo " macOS: brew install temporal" >&2 - echo " Linux: https://github.com/temporalio/cli/releases" >&2 - exit 1 -fi - -# Function to check if server is responding -check_server_connectivity() { - # Try to list namespaces as a connectivity test - if "$TEMPORAL_CLI" operator namespace list --address "$TEMPORAL_ADDRESS" >/dev/null 2>&1; then - return 0 - fi - return 1 -} - -# Check if server is already running -if check_server_connectivity; then - echo "✓ Temporal server already running at $TEMPORAL_ADDRESS" - exit 0 -fi - -# Check if we have a PID file from previous run -if [[ -f "$PID_FILE" ]]; then - OLD_PID=$(cat "$PID_FILE") - if kill -0 "$OLD_PID" 2>/dev/null; then - # Process exists but not responding - might be starting up - echo "⏳ Server process exists (PID: $OLD_PID), checking connectivity..." - sleep 2 - if check_server_connectivity; then - echo "✓ Temporal server ready at $TEMPORAL_ADDRESS" - exit 0 - fi - echo "⚠️ Server process exists but not responding, killing and restarting..." - kill -9 "$OLD_PID" 2>/dev/null || true - fi - rm -f "$PID_FILE" -fi - -# Start server in background -echo "🚀 Starting Temporal dev server..." -"$TEMPORAL_CLI" server start-dev > "$LOG_FILE" 2>&1 & -SERVER_PID=$! -echo "$SERVER_PID" > "$PID_FILE" - -echo "⏳ Waiting for server to be ready..." - -# Wait up to 30 seconds for server to become ready -TIMEOUT=30 -ELAPSED=0 -INTERVAL=1 - -while (( ELAPSED < TIMEOUT )); do - if check_server_connectivity; then - echo "✓ Temporal server ready at $TEMPORAL_ADDRESS (PID: $SERVER_PID)" - echo "" - echo "Web UI: http://localhost:8233" - echo "gRPC: $TEMPORAL_ADDRESS" - echo "" - echo "Server logs: $LOG_FILE" - echo "Server PID file: $PID_FILE" - exit 0 - fi - - # Check if process died - if ! kill -0 "$SERVER_PID" 2>/dev/null; then - echo "❌ Server process died during startup" >&2 - echo "Check logs: $LOG_FILE" >&2 - rm -f "$PID_FILE" - exit 2 - fi - - sleep "$INTERVAL" - ELAPSED=$((ELAPSED + INTERVAL)) -done - -echo "❌ Server startup timeout after ${TIMEOUT}s" >&2 -echo "Server might still be starting. Check logs: $LOG_FILE" >&2 -echo "Server PID: $SERVER_PID" >&2 -exit 2 diff --git a/scripts/ensure-worker.sh b/scripts/ensure-worker.sh deleted file mode 100755 index 5b9265b..0000000 --- a/scripts/ensure-worker.sh +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Get the directory where this script is located -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" -CLAUDE_TEMPORAL_LOG_DIR="${CLAUDE_TEMPORAL_LOG_DIR:-${TMPDIR:-/tmp}/claude-temporal-logs}" -CLAUDE_TEMPORAL_PROJECT_DIR="${CLAUDE_TEMPORAL_PROJECT_DIR:-$(pwd)}" -CLAUDE_TEMPORAL_PROJECT_NAME="${CLAUDE_TEMPORAL_PROJECT_NAME:-$(basename "$CLAUDE_TEMPORAL_PROJECT_DIR")}" -TEMPORAL_WORKER_CMD="${TEMPORAL_WORKER_CMD:-uv run worker}" - -# Create directories if they don't exist -mkdir -p "$CLAUDE_TEMPORAL_PID_DIR" -mkdir -p "$CLAUDE_TEMPORAL_LOG_DIR" - -PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/worker-$CLAUDE_TEMPORAL_PROJECT_NAME.pid" -LOG_FILE="$CLAUDE_TEMPORAL_LOG_DIR/worker-$CLAUDE_TEMPORAL_PROJECT_NAME.log" - -# Always kill any existing workers (both tracked and orphaned) -# This ensures we don't accumulate orphaned processes -echo "🔍 Checking for existing workers..." 
- -# Use the helper function to find all workers -source "$SCRIPT_DIR/find-project-workers.sh" -existing_workers=$(find_project_workers "$CLAUDE_TEMPORAL_PROJECT_DIR" 2>/dev/null || true) - -if [[ -n "$existing_workers" ]]; then - worker_count=$(echo "$existing_workers" | wc -l | tr -d ' ') - echo "Found $worker_count existing worker(s), stopping them..." - - if "$SCRIPT_DIR/kill-worker.sh" 2>&1; then - echo "✓ Existing workers stopped" - else - # kill-worker.sh will have printed error messages - echo "⚠️ Some workers may not have been stopped, continuing anyway..." - fi -elif [[ -f "$PID_FILE" ]]; then - # PID file exists but no workers found - clean up stale PID file - echo "Removing stale PID file..." - rm -f "$PID_FILE" -fi - -# Clear old log file -> "$LOG_FILE" - -# Start worker in background -echo "🚀 Starting worker for project: $CLAUDE_TEMPORAL_PROJECT_NAME" -echo "Command: $TEMPORAL_WORKER_CMD" - -# Start worker, redirect output to log file -eval "$TEMPORAL_WORKER_CMD" > "$LOG_FILE" 2>&1 & -WORKER_PID=$! - -# Save PID -echo "$WORKER_PID" > "$PID_FILE" - -echo "Worker PID: $WORKER_PID" -echo "Log file: $LOG_FILE" - -# Wait for worker to be ready (simple approach: wait and check if still running) -echo "⏳ Waiting for worker to be ready..." - -# Wait 10 seconds for worker to initialize -sleep 10 - -# Check if process is still running -if ! kill -0 "$WORKER_PID" 2>/dev/null; then - echo "❌ Worker process died during startup" >&2 - echo "Last 20 lines of log:" >&2 - tail -n 20 "$LOG_FILE" >&2 || true - rm -f "$PID_FILE" - exit 1 -fi - -# Check if log file has content (worker is producing output) -if [[ -f "$LOG_FILE" ]] && [[ -s "$LOG_FILE" ]]; then - echo "✓ Worker ready (PID: $WORKER_PID)" - echo "" - echo "To monitor worker logs:" - echo " tail -f $LOG_FILE" - echo "" - echo "To check worker health:" - echo " $SCRIPT_DIR/monitor-worker-health.sh" - exit 0 -else - echo "⚠️ Worker is running but no logs detected" >&2 - echo "Check logs: $LOG_FILE" >&2 - exit 2 -fi diff --git a/scripts/find-project-workers.sh b/scripts/find-project-workers.sh deleted file mode 100755 index 8b8bf7b..0000000 --- a/scripts/find-project-workers.sh +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env bash -# Helper function to find all worker processes for a specific project -# This can be sourced by other scripts or run directly - -# Usage: find_project_workers PROJECT_DIR -# Returns: PIDs of all worker processes for the project (one per line) -find_project_workers() { - local project_dir="$1" - - # Normalize the project directory path (resolve symlinks, remove trailing slash) - project_dir="$(cd "$project_dir" 2>/dev/null && pwd)" || { - echo "Error: Invalid project directory: $project_dir" >&2 - return 1 - } - - # Find all processes where: - # 1. Command contains the project directory path - # 2. 
Command contains "worker" (either .venv/bin/worker or "uv run worker") - # We need to be specific to avoid killing unrelated processes - - # Strategy: Find both parent "uv run worker" processes and child Python worker processes - # We'll use the project directory in the path as the key identifier - - local pids=() - - # Use ps to get all processes with their commands - if [[ "$(uname)" == "Darwin" ]]; then - # macOS - find Python workers - while IFS= read -r pid; do - [[ -n "$pid" ]] && pids+=("$pid") - done < <(ps ax -o pid,command | grep -E "\.venv/bin/(python[0-9.]*|worker)" | grep -E "${project_dir}" | grep -v grep | awk '{print $1}') - - # Also find "uv run worker" processes in this directory - while IFS= read -r pid; do - [[ -n "$pid" ]] && pids+=("$pid") - done < <(ps ax -o pid,command | grep "uv run worker" | grep -v grep | awk -v dir="$project_dir" '{ - # Check if process is running from the project directory by checking cwd - cmd = "lsof -a -p " $1 " -d cwd -Fn 2>/dev/null | grep ^n | cut -c2-" - cmd | getline cwd - close(cmd) - if (index(cwd, dir) > 0) print $1 - }') - else - # Linux - find Python workers - while IFS= read -r pid; do - [[ -n "$pid" ]] && pids+=("$pid") - done < <(ps ax -o pid,cmd | grep -E "\.venv/bin/(python[0-9.]*|worker)" | grep -E "${project_dir}" | grep -v grep | awk '{print $1}') - - # Also find "uv run worker" processes in this directory - while IFS= read -r pid; do - [[ -n "$pid" ]] && pids+=("$pid") - done < <(ps ax -o pid,cmd | grep "uv run worker" | grep -v grep | awk -v dir="$project_dir" '{ - # Check if process is running from the project directory - cmd = "readlink -f /proc/" $1 "/cwd 2>/dev/null" - cmd | getline cwd - close(cmd) - if (index(cwd, dir) > 0) print $1 - }') - fi - - # Print unique PIDs - printf "%s\n" "${pids[@]}" | sort -u -} - -# If script is executed directly (not sourced), run the function -if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then - if [[ $# -eq 0 ]]; then - # Default to current directory - find_project_workers "$(pwd)" - else - find_project_workers "$1" - fi -fi diff --git a/scripts/find-stalled-workflows.sh b/scripts/find-stalled-workflows.sh deleted file mode 100755 index be57cd5..0000000 --- a/scripts/find-stalled-workflows.sh +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: find-stalled-workflows.sh [options] - -Detect workflows with systematic issues (e.g., workflow task failures). - -Options: - --query filter workflows by query (optional) - -h, --help show this help -USAGE -} - -query="" - -while [[ $# -gt 0 ]]; do - case "$1" in - --query) query="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if ! command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -if ! command -v jq >/dev/null 2>&1; then - echo "⚠️ jq not found. Install jq for better output formatting." >&2 - echo "This script will continue with basic text parsing..." 
>&2 -fi - -# Build list command - only look for RUNNING workflows since stalled workflows must be running -LIST_CMD=("$TEMPORAL_CLI" "workflow" "list" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") - -if [[ -n "$query" ]]; then - # Append user query to running filter - LIST_CMD+=("--query" "ExecutionStatus='Running' AND ($query)") -else - # Default: only find running workflows - LIST_CMD+=("--query" "ExecutionStatus='Running'") -fi - -echo "Scanning for stalled workflows..." -echo "" - -# Get list of running workflows -if ! workflow_list=$("${LIST_CMD[@]}" 2>&1); then - echo "Failed to list workflows" >&2 - echo "$workflow_list" >&2 - exit 1 -fi - -# Parse workflow IDs from list -# The output format is: Status WorkflowId Type StartTime -# WorkflowId is in column 2 -workflow_ids=$(echo "$workflow_list" | awk 'NR>1 {print $2}' | grep -v "^-" | grep -v "^$" || true) - -if [[ -z "$workflow_ids" ]]; then - echo "No workflows found" - exit 0 -fi - -# Print header -printf "%-40s %-35s %-10s\n" "WORKFLOW_ID" "ERROR_TYPE" "ATTEMPTS" -printf "%-40s %-35s %-10s\n" "----------------------------------------" "-----------------------------------" "----------" - -found_stalled=false - -# Check each workflow for errors -while IFS= read -r workflow_id; do - [[ -z "$workflow_id" ]] && continue - - # Get workflow event history using 'show' to see failure events - if show_output=$("$TEMPORAL_CLI" workflow show --workflow-id "$workflow_id" --address "$TEMPORAL_ADDRESS" --namespace "$CLAUDE_TEMPORAL_NAMESPACE" 2>/dev/null); then - - # Check for workflow task failures - workflow_task_failures=$(echo "$show_output" | grep -c "WorkflowTaskFailed" 2>/dev/null || echo "0") - workflow_task_failures=$(echo "$workflow_task_failures" | tr -d '\n' | tr -d ' ') - activity_task_failures=$(echo "$show_output" | grep -c "ActivityTaskFailed" 2>/dev/null || echo "0") - activity_task_failures=$(echo "$activity_task_failures" | tr -d '\n' | tr -d ' ') - - # Report if significant failures found - if [[ "$workflow_task_failures" -gt 0 ]]; then - found_stalled=true - # Truncate long workflow IDs for display - display_id=$(echo "$workflow_id" | cut -c1-40) - printf "%-40s %-35s %-10s\n" "$display_id" "WorkflowTaskFailed" "$workflow_task_failures" - elif [[ "$activity_task_failures" -gt 2 ]]; then - # Only report activity failures if they're excessive (>2) - found_stalled=true - display_id=$(echo "$workflow_id" | cut -c1-40) - printf "%-40s %-35s %-10s\n" "$display_id" "ActivityTaskFailed" "$activity_task_failures" - fi - fi -done <<< "$workflow_ids" - -echo "" - -if [[ "$found_stalled" == false ]]; then - echo "No stalled workflows detected" -else - echo "Found stalled workflows. 
To investigate:" - echo " ./tools/analyze-workflow-error.sh --workflow-id " - echo "" - echo "To cancel all stalled workflows:" - echo " ./tools/find-stalled-workflows.sh | awk 'NR>2 {print \$1}' > stalled.txt" - echo " ./tools/bulk-cancel-workflows.sh --workflow-ids stalled.txt" -fi diff --git a/scripts/get-workflow-result.sh b/scripts/get-workflow-result.sh deleted file mode 100755 index 05783a4..0000000 --- a/scripts/get-workflow-result.sh +++ /dev/null @@ -1,202 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: get-workflow-result.sh --workflow-id [options] - -Get the result/output from a completed workflow execution. - -Options: - --workflow-id Workflow ID to query (required) - --run-id Specific run ID (optional) - --raw Output raw JSON result only - -h, --help Show this help - -Examples: - # Get workflow result with formatted output - ./tools/get-workflow-result.sh --workflow-id my-workflow-123 - - # Get raw JSON result only - ./tools/get-workflow-result.sh --workflow-id my-workflow-123 --raw - - # Get result for specific run - ./tools/get-workflow-result.sh --workflow-id my-workflow-123 --run-id abc-def-ghi - -Output: - - Workflow status (COMPLETED, FAILED, etc.) - - Workflow result/output (if completed successfully) - - Failure message (if failed) - - Termination reason (if terminated) -USAGE -} - -workflow_id="" -run_id="" -raw_mode=false - -while [[ $# -gt 0 ]]; do - case "$1" in - --workflow-id) workflow_id="${2-}"; shift 2 ;; - --run-id) run_id="${2-}"; shift 2 ;; - --raw) raw_mode=true; shift ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if [[ -z "$workflow_id" ]]; then - echo "Error: --workflow-id is required" >&2 - usage - exit 1 -fi - -if ! command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -# Build describe command -DESCRIBE_CMD=("$TEMPORAL_CLI" "workflow" "describe" "--workflow-id" "$workflow_id" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") - -if [[ -n "$run_id" ]]; then - DESCRIBE_CMD+=("--run-id" "$run_id") -fi - -# Get workflow details -if ! describe_output=$("${DESCRIBE_CMD[@]}" 2>&1); then - echo "Failed to describe workflow: $workflow_id" >&2 - echo "$describe_output" >&2 - exit 1 -fi - -# Extract workflow status -status=$(echo "$describe_output" | grep -i "Status:" | head -n1 | awk '{print $2}' || echo "UNKNOWN") - -if [[ "$raw_mode" == true ]]; then - # Raw mode: just output the result payload - # Use 'temporal workflow show' to get execution history with result - if ! 
show_output=$("$TEMPORAL_CLI" workflow show --workflow-id "$workflow_id" --address "$TEMPORAL_ADDRESS" --namespace "$CLAUDE_TEMPORAL_NAMESPACE" 2>&1); then - echo "Failed to get workflow result" >&2 - exit 1 - fi - - # Extract result from WorkflowExecutionCompleted event - echo "$show_output" | grep -A 10 "WorkflowExecutionCompleted" | grep -E "result|Result" || echo "{}" - exit 0 -fi - -# Formatted output -echo "════════════════════════════════════════════════════════════" -echo "Workflow: $workflow_id" -echo "Status: $status" -echo "════════════════════════════════════════════════════════════" -echo "" - -case "$status" in - COMPLETED) - echo "✅ Workflow completed successfully" - echo "" - echo "Result:" - echo "────────────────────────────────────────────────────────────" - - # Get workflow result using 'show' command - if show_output=$("$TEMPORAL_CLI" workflow show --workflow-id "$workflow_id" --address "$TEMPORAL_ADDRESS" --namespace "$CLAUDE_TEMPORAL_NAMESPACE" 2>/dev/null); then - # Extract result from WorkflowExecutionCompleted event - result=$(echo "$show_output" | grep -A 20 "WorkflowExecutionCompleted" | grep -E "result|Result" || echo "") - - if [[ -n "$result" ]]; then - echo "$result" - else - echo "(No result payload - workflow may return None/void)" - fi - else - echo "(Unable to extract result)" - fi - ;; - - FAILED) - echo "❌ Workflow failed" - echo "" - echo "Failure details:" - echo "────────────────────────────────────────────────────────────" - - # Extract failure message - failure=$(echo "$describe_output" | grep -A 5 "Failure:" || echo "") - if [[ -n "$failure" ]]; then - echo "$failure" - else - echo "(No failure details available)" - fi - - echo "" - echo "To analyze error:" - echo " ./tools/analyze-workflow-error.sh --workflow-id $workflow_id" - ;; - - CANCELED) - echo "🚫 Workflow was canceled" - echo "" - - # Try to extract cancellation reason - cancel_info=$(echo "$describe_output" | grep -i "cancel" || echo "") - if [[ -n "$cancel_info" ]]; then - echo "Cancellation info:" - echo "$cancel_info" - fi - ;; - - TERMINATED) - echo "⛔ Workflow was terminated" - echo "" - - # Extract termination reason - term_reason=$(echo "$describe_output" | grep -i "reason:" | head -n1 || echo "") - if [[ -n "$term_reason" ]]; then - echo "Termination reason:" - echo "$term_reason" - fi - ;; - - TIMED_OUT) - echo "⏱️ Workflow timed out" - echo "" - - timeout_info=$(echo "$describe_output" | grep -i "timeout" || echo "") - if [[ -n "$timeout_info" ]]; then - echo "Timeout info:" - echo "$timeout_info" - fi - ;; - - RUNNING) - echo "🏃 Workflow is still running" - echo "" - echo "Cannot get result for running workflow." 
- echo "" - echo "To wait for completion:" - echo " ./tools/wait-for-workflow-status.sh --workflow-id $workflow_id --status COMPLETED" - exit 1 - ;; - - *) - echo "Status: $status" - echo "" - echo "Full workflow details:" - echo "$describe_output" - ;; -esac - -echo "" -echo "════════════════════════════════════════════════════════════" -echo "" -echo "To view full workflow history:" -echo " temporal workflow show --workflow-id $workflow_id" -echo "" -echo "To view in Web UI:" -echo " http://localhost:8233/namespaces/$CLAUDE_TEMPORAL_NAMESPACE/workflows/$workflow_id" diff --git a/scripts/kill-all-workers.sh b/scripts/kill-all-workers.sh deleted file mode 100755 index c3a619f..0000000 --- a/scripts/kill-all-workers.sh +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" - -# Graceful shutdown timeout (seconds) -GRACEFUL_TIMEOUT=5 - -usage() { - cat <<'USAGE' -Usage: kill-all-workers.sh [options] - -Kill all tracked workers across all projects. - -Options: - -p, --project kill only specific project worker - --include-server also kill temporal dev server - -h, --help show this help -USAGE -} - -specific_project="" -include_server=false - -while [[ $# -gt 0 ]]; do - case "$1" in - -p|--project) specific_project="${2-}"; shift 2 ;; - --include-server) include_server=true; shift ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -# Check if PID directory exists -if [[ ! -d "$CLAUDE_TEMPORAL_PID_DIR" ]]; then - echo "No PID directory found: $CLAUDE_TEMPORAL_PID_DIR" - exit 0 -fi - -# Function to kill a process gracefully then forcefully -kill_process() { - local pid=$1 - local name=$2 - - if ! kill -0 "$pid" 2>/dev/null; then - echo "$name (PID $pid): already dead" - return 0 - fi - - # Attempt graceful shutdown - kill -TERM "$pid" 2>/dev/null || true - - # Wait for graceful shutdown - local elapsed=0 - while (( elapsed < GRACEFUL_TIMEOUT )); do - if ! 
kill -0 "$pid" 2>/dev/null; then - echo "$name (PID $pid): stopped gracefully ✓" - return 0 - fi - sleep 1 - elapsed=$((elapsed + 1)) - done - - # Force kill if still running - if kill -0 "$pid" 2>/dev/null; then - kill -9 "$pid" 2>/dev/null || true - sleep 1 - - if kill -0 "$pid" 2>/dev/null; then - echo "$name (PID $pid): failed to kill ❌" >&2 - return 1 - fi - echo "$name (PID $pid): force killed ✓" - fi - - return 0 -} - -killed_count=0 - -# Kill specific project worker if requested -if [[ -n "$specific_project" ]]; then - PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/worker-$specific_project.pid" - if [[ -f "$PID_FILE" ]]; then - WORKER_PID=$(cat "$PID_FILE") - if kill_process "$WORKER_PID" "worker-$specific_project"; then - rm -f "$PID_FILE" - killed_count=$((killed_count + 1)) - fi - else - echo "No worker found for project: $specific_project" - exit 1 - fi -else - # Kill all workers - shopt -s nullglob - PID_FILES=("$CLAUDE_TEMPORAL_PID_DIR"/worker-*.pid) - shopt -u nullglob - - for pid_file in "${PID_FILES[@]}"; do - # Extract project name from filename - filename=$(basename "$pid_file") - project="${filename#worker-}" - project="${project%.pid}" - - # Read PID - worker_pid=$(cat "$pid_file") - - if kill_process "$worker_pid" "worker-$project"; then - rm -f "$pid_file" - killed_count=$((killed_count + 1)) - fi - done -fi - -# Kill server if requested -if [[ "$include_server" == true ]]; then - SERVER_PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/server.pid" - if [[ -f "$SERVER_PID_FILE" ]]; then - SERVER_PID=$(cat "$SERVER_PID_FILE") - if kill_process "$SERVER_PID" "server"; then - rm -f "$SERVER_PID_FILE" - killed_count=$((killed_count + 1)) - fi - fi -fi - -if [[ "$killed_count" -eq 0 ]]; then - echo "No processes to kill" -else - echo "" - echo "Total: $killed_count process(es) killed" -fi diff --git a/scripts/kill-worker.sh b/scripts/kill-worker.sh deleted file mode 100755 index 596acdb..0000000 --- a/scripts/kill-worker.sh +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Get the directory where this script is located -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -# Source the helper function to find project workers -source "$SCRIPT_DIR/find-project-workers.sh" - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" -CLAUDE_TEMPORAL_PROJECT_NAME="${CLAUDE_TEMPORAL_PROJECT_NAME:-$(basename "$(pwd)")}" -CLAUDE_TEMPORAL_PROJECT_DIR="${CLAUDE_TEMPORAL_PROJECT_DIR:-$(pwd)}" - -PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/worker-$CLAUDE_TEMPORAL_PROJECT_NAME.pid" - -# Graceful shutdown timeout (seconds) -GRACEFUL_TIMEOUT=5 - -# Find ALL workers for this project (both tracked and orphaned) -echo "🔍 Finding all workers for project: $CLAUDE_TEMPORAL_PROJECT_NAME" - -# Collect all PIDs -worker_pids=() - -# Add PID from file if it exists -if [[ -f "$PID_FILE" ]]; then - TRACKED_PID=$(cat "$PID_FILE") - if kill -0 "$TRACKED_PID" 2>/dev/null; then - worker_pids+=("$TRACKED_PID") - fi -fi - -# Find all workers for this project using the helper function -while IFS= read -r pid; do - [[ -n "$pid" ]] && worker_pids+=("$pid") -done < <(find_project_workers "$CLAUDE_TEMPORAL_PROJECT_DIR" 2>/dev/null || true) - -# Remove duplicates -worker_pids=($(printf "%s\n" "${worker_pids[@]}" | sort -u)) - -if [[ ${#worker_pids[@]} -eq 0 ]]; then - echo "No workers running for project: $CLAUDE_TEMPORAL_PROJECT_NAME" - rm -f "$PID_FILE" - exit 1 -fi - -echo "Found ${#worker_pids[@]} worker process(es): ${worker_pids[*]}" 
- -# Attempt graceful shutdown of all workers -echo "⏳ Attempting graceful shutdown..." -for pid in "${worker_pids[@]}"; do - kill -TERM "$pid" 2>/dev/null || true -done - -# Wait for graceful shutdown -ELAPSED=0 -while (( ELAPSED < GRACEFUL_TIMEOUT )); do - all_dead=true - for pid in "${worker_pids[@]}"; do - if kill -0 "$pid" 2>/dev/null; then - all_dead=false - break - fi - done - - if [[ "$all_dead" == true ]]; then - echo "✓ All workers stopped gracefully" - rm -f "$PID_FILE" - exit 0 - fi - - sleep 1 - ELAPSED=$((ELAPSED + 1)) -done - -# Force kill any still running -still_running=() -for pid in "${worker_pids[@]}"; do - if kill -0 "$pid" 2>/dev/null; then - still_running+=("$pid") - fi -done - -if [[ ${#still_running[@]} -gt 0 ]]; then - echo "⚠️ ${#still_running[@]} process(es) still running after ${GRACEFUL_TIMEOUT}s, forcing kill..." - for pid in "${still_running[@]}"; do - kill -9 "$pid" 2>/dev/null || true - done - sleep 1 - - # Verify all are dead - failed_pids=() - for pid in "${still_running[@]}"; do - if kill -0 "$pid" 2>/dev/null; then - failed_pids+=("$pid") - fi - done - - if [[ ${#failed_pids[@]} -gt 0 ]]; then - echo "❌ Failed to kill worker process(es): ${failed_pids[*]}" >&2 - exit 1 - fi -fi - -echo "✓ All workers killed (${#worker_pids[@]} process(es))" -rm -f "$PID_FILE" -exit 0 diff --git a/scripts/list-recent-workflows.sh b/scripts/list-recent-workflows.sh deleted file mode 100755 index b20801f..0000000 --- a/scripts/list-recent-workflows.sh +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: list-recent-workflows.sh [options] - -List recently completed/terminated workflows within a time window. - -Options: - --minutes Look back N minutes (default: 5) - --status Filter by status: COMPLETED, FAILED, CANCELED, TERMINATED, TIMED_OUT (optional) - --workflow-type Filter by workflow type (optional) - -h, --help Show this help - -Examples: - # List all workflows from last 5 minutes - ./tools/list-recent-workflows.sh - - # List failed workflows from last 10 minutes - ./tools/list-recent-workflows.sh --minutes 10 --status FAILED - - # List completed workflows of specific type from last 2 minutes - ./tools/list-recent-workflows.sh --minutes 2 --status COMPLETED --workflow-type MyWorkflow - -Output format: - WORKFLOW_ID STATUS WORKFLOW_TYPE CLOSE_TIME -USAGE -} - -minutes=5 -status="" -workflow_type="" - -while [[ $# -gt 0 ]]; do - case "$1" in - --minutes) minutes="${2-}"; shift 2 ;; - --status) status="${2-}"; shift 2 ;; - --workflow-type) workflow_type="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if ! 
command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -# Validate status if provided -if [[ -n "$status" ]]; then - case "$status" in - COMPLETED|FAILED|CANCELED|TERMINATED|TIMED_OUT) ;; - *) echo "Invalid status: $status" >&2; usage; exit 1 ;; - esac -fi - -# Calculate time threshold (minutes ago) -if [[ "$OSTYPE" == "darwin"* ]]; then - # macOS - time_threshold=$(date -u -v-"${minutes}M" +"%Y-%m-%dT%H:%M:%SZ") -else - # Linux - time_threshold=$(date -u -d "$minutes minutes ago" +"%Y-%m-%dT%H:%M:%SZ") -fi - -# Build query -query="CloseTime > \"$time_threshold\"" - -if [[ -n "$status" ]]; then - query="$query AND ExecutionStatus = \"$status\"" -fi - -if [[ -n "$workflow_type" ]]; then - query="$query AND WorkflowType = \"$workflow_type\"" -fi - -echo "Searching workflows from last $minutes minute(s)..." -echo "Query: $query" -echo "" - -# Execute list command -if ! workflow_list=$("$TEMPORAL_CLI" workflow list \ - --address "$TEMPORAL_ADDRESS" \ - --namespace "$CLAUDE_TEMPORAL_NAMESPACE" \ - --query "$query" 2>&1); then - echo "Failed to list workflows" >&2 - echo "$workflow_list" >&2 - exit 1 -fi - -# Check if any workflows found -if echo "$workflow_list" | grep -q "No workflows found"; then - echo "No workflows found in the last $minutes minute(s)" - exit 0 -fi - -# Parse and display results -echo "$workflow_list" | head -n 50 - -# Count results -workflow_count=$(echo "$workflow_list" | awk 'NR>1 && $1 != "" && $1 !~ /^-+$/ {print $1}' | wc -l | tr -d ' ') - -echo "" -echo "Found $workflow_count workflow(s)" -echo "" -echo "To get workflow result:" -echo " ./tools/get-workflow-result.sh --workflow-id " diff --git a/scripts/list-workers.sh b/scripts/list-workers.sh deleted file mode 100755 index ea75f4c..0000000 --- a/scripts/list-workers.sh +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Get the directory where this script is located -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" - -# Source the helper function to find project workers -source "$SCRIPT_DIR/find-project-workers.sh" - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" - -# Check if PID directory exists -if [[ ! 
-d "$CLAUDE_TEMPORAL_PID_DIR" ]]; then - echo "No PID directory found: $CLAUDE_TEMPORAL_PID_DIR" - exit 0 -fi - -# Function to get process uptime -get_uptime() { - local pid=$1 - if [[ "$(uname)" == "Darwin" ]]; then - # macOS - local start_time=$(ps -o lstart= -p "$pid" 2>/dev/null | xargs -I{} date -j -f "%c" "{}" "+%s" 2>/dev/null || echo "0") - else - # Linux - local start_time=$(ps -o etimes= -p "$pid" 2>/dev/null | tr -d ' ' || echo "0") - fi - - if [[ "$start_time" == "0" ]]; then - echo "-" - return - fi - - local now=$(date +%s) - local elapsed=$((now - start_time)) - - # For Linux, etimes already gives elapsed seconds - if [[ "$(uname)" != "Darwin" ]]; then - elapsed=$start_time - fi - - local hours=$((elapsed / 3600)) - local minutes=$(((elapsed % 3600) / 60)) - local seconds=$((elapsed % 60)) - - if (( hours > 0 )); then - printf "%dh %dm" "$hours" "$minutes" - elif (( minutes > 0 )); then - printf "%dm %ds" "$minutes" "$seconds" - else - printf "%ds" "$seconds" - fi -} - -# Function to get process command -get_command() { - local pid=$1 - ps -o command= -p "$pid" 2>/dev/null | cut -c1-50 || echo "-" -} - -# Print header -printf "%-20s %-8s %-10s %-10s %-10s %s\n" "PROJECT" "PID" "STATUS" "TRACKED" "UPTIME" "COMMAND" -printf "%-20s %-8s %-10s %-10s %-10s %s\n" "--------------------" "--------" "----------" "----------" "----------" "-----" - -# Find all PID files -found_any=false - -# Track all PIDs we've seen to detect orphans later -declare -A tracked_pids - -# List server if exists -SERVER_PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/server.pid" -if [[ -f "$SERVER_PID_FILE" ]]; then - found_any=true - SERVER_PID=$(cat "$SERVER_PID_FILE") - tracked_pids[$SERVER_PID]=1 - if kill -0 "$SERVER_PID" 2>/dev/null; then - uptime=$(get_uptime "$SERVER_PID") - command=$(get_command "$SERVER_PID") - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "server" "$SERVER_PID" "running" "yes" "$uptime" "$command" - else - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "server" "$SERVER_PID" "dead" "yes" "-" "-" - fi -fi - -# List all worker PID files -shopt -s nullglob -PID_FILES=("$CLAUDE_TEMPORAL_PID_DIR"/worker-*.pid) -shopt -u nullglob - -# Store project directories for orphan detection -declare -A project_dirs - -for pid_file in "${PID_FILES[@]}"; do - found_any=true - # Extract project name from filename - filename=$(basename "$pid_file") - project="${filename#worker-}" - project="${project%.pid}" - - # Read PID - worker_pid=$(cat "$pid_file") - tracked_pids[$worker_pid]=1 - - # Check if process is running - if kill -0 "$worker_pid" 2>/dev/null; then - uptime=$(get_uptime "$worker_pid") - command=$(get_command "$worker_pid") - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$worker_pid" "running" "yes" "$uptime" "$command" - - # Try to determine project directory from the command - # Look for project directory in the command path - if [[ "$command" =~ ([^[:space:]]+/${project})[/[:space:]] ]]; then - project_dir="${BASH_REMATCH[1]}" - project_dirs[$project]="$project_dir" - fi - else - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$worker_pid" "dead" "yes" "-" "-" - fi -done - -# Now check for orphaned workers for each project we know about -for project in "${!project_dirs[@]}"; do - project_dir="${project_dirs[$project]}" - - # Find all workers for this project - while IFS= read -r pid; do - [[ -z "$pid" ]] && continue - - # Skip if we already tracked this PID - if [[ -n "${tracked_pids[$pid]:-}" ]]; then - continue - fi - - # This is an orphaned worker - found_any=true - if kill -0 "$pid" 
2>/dev/null; then - uptime=$(get_uptime "$pid") - command=$(get_command "$pid") - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$pid" "running" "ORPHAN" "$uptime" "$command" - tracked_pids[$pid]=1 - fi - done < <(find_project_workers "$project_dir" 2>/dev/null || true) -done - -# Also scan for workers that have no PID file at all (completely orphaned) -# Find all Python worker processes and group by project -if [[ "$(uname)" == "Darwin" ]]; then - # macOS - while IFS= read -r line; do - [[ -z "$line" ]] && continue - - pid=$(echo "$line" | awk '{print $1}') - command=$(echo "$line" | cut -d' ' -f2-) - - # Skip if already tracked - if [[ -n "${tracked_pids[$pid]:-}" ]]; then - continue - fi - - # Extract project name from path - if [[ "$command" =~ /([^/]+)/\.venv/bin/ ]]; then - project="${BASH_REMATCH[1]}" - found_any=true - uptime=$(get_uptime "$pid") - cmd_display=$(echo "$command" | cut -c1-50) - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$pid" "running" "ORPHAN" "$uptime" "$cmd_display" - tracked_pids[$pid]=1 - fi - done < <(ps ax -o pid,command | grep -E "\.venv/bin/(python[0-9.]*|worker)" | grep -v grep) - - # Also check for "uv run worker" processes - while IFS= read -r line; do - [[ -z "$line" ]] && continue - - pid=$(echo "$line" | awk '{print $1}') - - # Skip if already tracked - if [[ -n "${tracked_pids[$pid]:-}" ]]; then - continue - fi - - # Get the working directory for this process - cwd=$(lsof -a -p "$pid" -d cwd -Fn 2>/dev/null | grep ^n | cut -c2- || echo "") - if [[ -n "$cwd" ]]; then - project=$(basename "$cwd") - found_any=true - uptime=$(get_uptime "$pid") - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$pid" "running" "ORPHAN" "$uptime" "uv run worker" - tracked_pids[$pid]=1 - fi - done < <(ps ax -o pid,command | grep "uv run worker" | grep -v grep | awk '{print $1}') -else - # Linux - similar logic using /proc - while IFS= read -r line; do - [[ -z "$line" ]] && continue - - pid=$(echo "$line" | awk '{print $1}') - command=$(echo "$line" | cut -d' ' -f2-) - - # Skip if already tracked - if [[ -n "${tracked_pids[$pid]:-}" ]]; then - continue - fi - - # Extract project name from path - if [[ "$command" =~ /([^/]+)/\.venv/bin/ ]]; then - project="${BASH_REMATCH[1]}" - found_any=true - uptime=$(get_uptime "$pid") - cmd_display=$(echo "$command" | cut -c1-50) - printf "%-20s %-8s %-10s %-10s %-10s %s\n" "$project" "$pid" "running" "ORPHAN" "$uptime" "$cmd_display" - tracked_pids[$pid]=1 - fi - done < <(ps ax -o pid,cmd | grep -E "\.venv/bin/(python[0-9.]*|worker)" | grep -v grep) -fi - -if [[ "$found_any" == false ]]; then - echo "No workers or server found" -fi diff --git a/scripts/monitor-worker-health.sh b/scripts/monitor-worker-health.sh deleted file mode 100755 index 6c15a82..0000000 --- a/scripts/monitor-worker-health.sh +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -CLAUDE_TEMPORAL_PID_DIR="${CLAUDE_TEMPORAL_PID_DIR:-${TMPDIR:-/tmp}/claude-temporal-pids}" -CLAUDE_TEMPORAL_LOG_DIR="${CLAUDE_TEMPORAL_LOG_DIR:-${TMPDIR:-/tmp}/claude-temporal-logs}" -CLAUDE_TEMPORAL_PROJECT_NAME="${CLAUDE_TEMPORAL_PROJECT_NAME:-$(basename "$(pwd)")}" - -PID_FILE="$CLAUDE_TEMPORAL_PID_DIR/worker-$CLAUDE_TEMPORAL_PROJECT_NAME.pid" -LOG_FILE="$CLAUDE_TEMPORAL_LOG_DIR/worker-$CLAUDE_TEMPORAL_PROJECT_NAME.log" - -# Function to get process uptime -get_uptime() { - local pid=$1 - if [[ "$(uname)" == "Darwin" ]]; then - # macOS - local start_time=$(ps -o lstart= -p "$pid" 2>/dev/null | xargs -I{} 
date -j -f "%c" "{}" "+%s" 2>/dev/null || echo "0") - else - # Linux - local start_time=$(ps -o etimes= -p "$pid" 2>/dev/null | tr -d ' ' || echo "0") - fi - - if [[ "$start_time" == "0" ]]; then - echo "unknown" - return - fi - - local now=$(date +%s) - local elapsed=$((now - start_time)) - - # For Linux, etimes already gives elapsed seconds - if [[ "$(uname)" != "Darwin" ]]; then - elapsed=$start_time - fi - - local hours=$((elapsed / 3600)) - local minutes=$(((elapsed % 3600) / 60)) - local seconds=$((elapsed % 60)) - - if (( hours > 0 )); then - printf "%dh %dm %ds" "$hours" "$minutes" "$seconds" - elif (( minutes > 0 )); then - printf "%dm %ds" "$minutes" "$seconds" - else - printf "%ds" "$seconds" - fi -} - -echo "=== Worker Health Check ===" -echo "Project: $CLAUDE_TEMPORAL_PROJECT_NAME" -echo "" - -# Check if PID file exists -if [[ ! -f "$PID_FILE" ]]; then - echo "Worker Status: NOT RUNNING" - echo "No PID file found: $PID_FILE" - exit 1 -fi - -# Read PID -WORKER_PID=$(cat "$PID_FILE") - -# Check if process is alive -if ! kill -0 "$WORKER_PID" 2>/dev/null; then - echo "Worker Status: DEAD" - echo "PID file exists but process is not running" - echo "PID: $WORKER_PID (stale)" - echo "" - echo "To clean up and restart:" - echo " rm -f $PID_FILE" - echo " ./tools/ensure-worker.sh" - exit 1 -fi - -# Process is alive -echo "Worker Status: RUNNING" -echo "PID: $WORKER_PID" -echo "Uptime: $(get_uptime "$WORKER_PID")" -echo "" - -# Check log file -if [[ -f "$LOG_FILE" ]]; then - echo "Log file: $LOG_FILE" - echo "Log size: $(wc -c < "$LOG_FILE" | tr -d ' ') bytes" - echo "" - - # Check for recent errors in logs (last 50 lines) - if tail -n 50 "$LOG_FILE" | grep -iE "(error|exception|fatal|traceback)" >/dev/null 2>&1; then - echo "⚠️ Recent errors found in logs (last 50 lines):" - echo "" - tail -n 50 "$LOG_FILE" | grep -iE "(error|exception|fatal)" | tail -n 10 - echo "" - echo "Full logs: $LOG_FILE" - exit 1 - fi - - # Show last log entry - echo "Last log entry:" - tail -n 1 "$LOG_FILE" 2>/dev/null || echo "(empty log)" - echo "" - - echo "✓ Worker appears healthy" - echo "" - echo "To view logs:" - echo " tail -f $LOG_FILE" -else - echo "⚠️ Log file not found: $LOG_FILE" - echo "" - echo "Worker is running but no logs found" - exit 1 -fi diff --git a/scripts/wait-for-worker-ready.sh b/scripts/wait-for-worker-ready.sh deleted file mode 100755 index 17fbadf..0000000 --- a/scripts/wait-for-worker-ready.sh +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -usage() { - cat <<'USAGE' -Usage: wait-for-worker-ready.sh --log-file file [options] - -Poll worker log file for startup confirmation. - -Options: - --log-file path to worker log file, required - -p, --pattern regex pattern to look for (default: "Worker started") - -F, --fixed treat pattern as a fixed string (grep -F) - -T, --timeout seconds to wait (integer, default: 30) - -i, --interval poll interval in seconds (default: 0.5) - -h, --help show this help -USAGE -} - -log_file="" -pattern="Worker started" -grep_flag="-E" -timeout=30 -interval=0.5 - -while [[ $# -gt 0 ]]; do - case "$1" in - --log-file) log_file="${2-}"; shift 2 ;; - -p|--pattern) pattern="${2-}"; shift 2 ;; - -F|--fixed) grep_flag="-F"; shift ;; - -T|--timeout) timeout="${2-}"; shift 2 ;; - -i|--interval) interval="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if [[ -z "$log_file" ]]; then - echo "log-file is required" >&2 - usage - exit 1 -fi - -if ! 
[[ "$timeout" =~ ^[0-9]+$ ]]; then - echo "timeout must be an integer number of seconds" >&2 - exit 1 -fi - -# End time in epoch seconds -start_epoch=$(date +%s) -deadline=$((start_epoch + timeout)) - -while true; do - # Check if log file exists and has content - if [[ -f "$log_file" ]]; then - log_content="$(cat "$log_file" 2>/dev/null || true)" - - if [[ -n "$log_content" ]] && printf '%s\n' "$log_content" | grep $grep_flag -- "$pattern" >/dev/null 2>&1; then - exit 0 - fi - fi - - now=$(date +%s) - if (( now >= deadline )); then - echo "Timed out after ${timeout}s waiting for pattern: $pattern" >&2 - if [[ -f "$log_file" ]]; then - echo "Last content from $log_file:" >&2 - tail -n 50 "$log_file" >&2 || true - else - echo "Log file not found: $log_file" >&2 - fi - exit 1 - fi - - sleep "$interval" -done diff --git a/scripts/wait-for-workflow-status.sh b/scripts/wait-for-workflow-status.sh deleted file mode 100755 index 5a3bd54..0000000 --- a/scripts/wait-for-workflow-status.sh +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Environment variables with defaults -TEMPORAL_CLI="${TEMPORAL_CLI:-temporal}" -TEMPORAL_ADDRESS="${TEMPORAL_ADDRESS:-localhost:7233}" -CLAUDE_TEMPORAL_NAMESPACE="${CLAUDE_TEMPORAL_NAMESPACE:-default}" - -usage() { - cat <<'USAGE' -Usage: wait-for-workflow-status.sh --workflow-id id --status status [options] - -Poll workflow for specific status. - -Options: - --workflow-id workflow ID to monitor, required - --status status to wait for, required - (RUNNING, COMPLETED, FAILED, CANCELED, TERMINATED, TIMED_OUT) - --run-id specific workflow run ID (optional) - -T, --timeout seconds to wait (integer, default: 300) - -i, --interval poll interval in seconds (default: 2) - -h, --help show this help -USAGE -} - -workflow_id="" -run_id="" -target_status="" -timeout=300 -interval=2 - -while [[ $# -gt 0 ]]; do - case "$1" in - --workflow-id) workflow_id="${2-}"; shift 2 ;; - --run-id) run_id="${2-}"; shift 2 ;; - --status) target_status="${2-}"; shift 2 ;; - -T|--timeout) timeout="${2-}"; shift 2 ;; - -i|--interval) interval="${2-}"; shift 2 ;; - -h|--help) usage; exit 0 ;; - *) echo "Unknown option: $1" >&2; usage; exit 1 ;; - esac -done - -if [[ -z "$workflow_id" || -z "$target_status" ]]; then - echo "workflow-id and status are required" >&2 - usage - exit 1 -fi - -if ! [[ "$timeout" =~ ^[0-9]+$ ]]; then - echo "timeout must be an integer number of seconds" >&2 - exit 1 -fi - -if ! 
command -v "$TEMPORAL_CLI" >/dev/null 2>&1; then - echo "Temporal CLI not found: $TEMPORAL_CLI" >&2 - exit 1 -fi - -# Build temporal command -TEMPORAL_CMD=("$TEMPORAL_CLI" "workflow" "describe" "--workflow-id" "$workflow_id" "--address" "$TEMPORAL_ADDRESS" "--namespace" "$CLAUDE_TEMPORAL_NAMESPACE") - -if [[ -n "$run_id" ]]; then - TEMPORAL_CMD+=("--run-id" "$run_id") -fi - -# Normalize target status to uppercase -target_status=$(echo "$target_status" | tr '[:lower:]' '[:upper:]') - -# End time in epoch seconds -start_epoch=$(date +%s) -deadline=$((start_epoch + timeout)) - -echo "Polling workflow: $workflow_id" -echo "Target status: $target_status" -echo "Timeout: ${timeout}s" -echo "" - -while true; do - # Query workflow status - if output=$("${TEMPORAL_CMD[@]}" 2>&1); then - # Extract status from output - # The output includes a line like: " Status COMPLETED" - if current_status=$(echo "$output" | grep -E "^\s*Status\s" | awk '{print $2}' | tr -d ' '); then - echo "Current status: $current_status ($(date '+%H:%M:%S'))" - - if [[ "$current_status" == "$target_status" ]]; then - echo "" - echo "✓ Workflow reached status: $target_status" - exit 0 - fi - - # Check if workflow reached a terminal state different from target - case "$current_status" in - COMPLETED|FAILED|CANCELED|TERMINATED|TIMED_OUT) - if [[ "$current_status" != "$target_status" ]]; then - echo "" - echo "⚠️ Workflow reached terminal status: $current_status (expected: $target_status)" - exit 1 - fi - ;; - esac - else - echo "⚠️ Could not parse workflow status from output" >&2 - fi - else - echo "⚠️ Failed to query workflow (it may not exist yet)" >&2 - fi - - now=$(date +%s) - if (( now >= deadline )); then - echo "" - echo "❌ Timeout after ${timeout}s waiting for status: $target_status" >&2 - exit 1 - fi - - sleep "$interval" -done