Skip to content

Commit 10908fe

Browse files
committed
feat: initial release of temporal-parseable python plugin v0.1.0
0 parents  commit 10908fe

20 files changed

Lines changed: 2335 additions & 0 deletions

.github/workflows/ci.yaml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [main]
6+
tags: ["v*"]
7+
pull_request:
8+
branches: [main]
9+
10+
jobs:
11+
test:
12+
name: Test (Python ${{ matrix.python-version }})
13+
runs-on: ubuntu-latest
14+
strategy:
15+
matrix:
16+
python-version: ["3.9", "3.10", "3.11", "3.12"]
17+
18+
steps:
19+
- uses: actions/checkout@v4
20+
21+
- uses: actions/setup-python@v5
22+
with:
23+
python-version: ${{ matrix.python-version }}
24+
25+
- name: Install dependencies
26+
run: pip install -e ".[dev]"
27+
28+
- name: Lint
29+
run: ruff check src/ tests/
30+
31+
- name: Type check
32+
run: mypy src/temporal_parseable
33+
34+
- name: Test
35+
run: pytest --tb=short -q
36+
37+
publish:
38+
name: Publish to PyPI
39+
needs: test
40+
runs-on: ubuntu-latest
41+
if: startsWith(github.ref, 'refs/tags/v')
42+
environment: pypi
43+
44+
steps:
45+
- uses: actions/checkout@v4
46+
47+
- uses: actions/setup-python@v5
48+
with:
49+
python-version: "3.11"
50+
51+
- name: Install build tools
52+
run: pip install build twine
53+
54+
- name: Build
55+
run: python -m build
56+
57+
- name: Publish to PyPI
58+
env:
59+
TWINE_USERNAME: __token__
60+
TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
61+
run: twine upload dist/*

.gitignore

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Python
2+
__pycache__/
3+
*.py[cod]
4+
*.pyo
5+
*.pyd
6+
.Python
7+
*.egg
8+
*.egg-info/
9+
dist/
10+
build/
11+
.eggs/
12+
*.whl
13+
14+
# Virtual environments
15+
.venv/
16+
venv/
17+
env/
18+
19+
# Testing
20+
.pytest_cache/
21+
.coverage
22+
htmlcov/
23+
.tox/
24+
25+
# Type checking
26+
.mypy_cache/
27+
.ruff_cache/
28+
29+
# IDE
30+
.vscode/
31+
.idea/
32+
*.swp
33+
*.swo
34+
35+
# macOS
36+
.DS_Store
37+
38+
# Distribution
39+
dist/
40+
*.tar.gz

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Changelog
2+
3+
All notable changes to `temporal-parseable` are documented here.
4+
5+
## [0.1.0] - 2026-06-03
6+
7+
### Added
8+
- `ParseablePlugin` — drop-in Temporal plugin wiring logs and traces to Parseable
9+
- `ParseableConfig` — dataclass config with full `PARSEABLE_*` env-var support
10+
- `ParseableActivityInterceptor` — emits `started / completed / failed` records per activity execution, including `attempt` and `duration_ms`
11+
- `ParseableWorkflowInboundInterceptor` — emits records for `execute_workflow`, `handle_signal`, `handle_query`, `handle_update`
12+
- `ParseableWorkflowOutboundInterceptor` — emits records for `start_child_workflow`, `signal_external_workflow`, `signal_child_workflow`, `continue_as_new`
13+
- `workflow_event()` — replay-safe helper for emitting custom domain events from workflow code
14+
- `SanitizingSpanExporter` — flattens non-primitive OTel span attributes to prevent Parseable `400` rejections
15+
- Correct `X-P-Log-Source` headers: `otel-logs` for logs pipeline, `otel-traces` for traces pipeline
16+
- Sandbox passthrough support — `temporal_parseable` marked as passthrough so the Temporal workflow sandbox does not attempt to import OTel/requests inside the isolate
17+
- Full `pyproject.toml` with hatchling build backend, pinned OTel 1.x deps, and `dev` extras
18+
- `examples/` with `worker.py`, `client.py`, `workflows.py` covering all interceptor paths
19+
- `tests/` with `test_config.py`, `test_sanitizing_exporter.py`, `test_interceptors.py`
20+
- `.github/workflows/ci.yml` — pytest on push/PR, auto-publish to PyPI on `v*` tag

README.md

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# temporal-parseable
2+
3+
Temporal plugin that ships workflow and activity execution events to [Parseable](https://parseable.com) as OpenTelemetry structured logs and traces.
4+
5+
```
6+
┌──────────────┐ OTLP/HTTP ┌──────────────┐
7+
│ Temporal │ ──────────────── │ Parseable │
8+
│ Worker │ logs + traces │ │
9+
│ + Plugin │ │ temporal-* │
10+
└──────────────┘ └──────────────┘
11+
```
12+
13+
Two streams in Parseable:
14+
15+
- **`temporal-logs`** — flat queryable records: workflow/activity start, complete, fail, retry, duration, signals, queries, updates, child workflows, continue-as-new, and custom domain events
16+
- **`temporal-traces`** — OTel waterfall traces across workflow and activity boundaries
17+
18+
## Installation
19+
20+
```bash
21+
pip install temporal-parseable
22+
```
23+
24+
## Quick start
25+
26+
```python
27+
from temporalio.client import Client
28+
from temporalio.worker import Worker
29+
from temporal_parseable import ParseablePlugin, ParseableConfig
30+
31+
config = ParseableConfig(
32+
service_name="my-worker",
33+
endpoint="https://parseable.example.com",
34+
username="admin",
35+
password="secret",
36+
)
37+
plugin = ParseablePlugin(config)
38+
39+
client = await Client.connect("localhost:7233", plugins=[plugin])
40+
41+
async with Worker(
42+
client,
43+
task_queue="my-queue",
44+
workflows=[MyWorkflow],
45+
activities=[my_activity],
46+
plugins=[plugin],
47+
):
48+
await asyncio.Event().wait()
49+
```
50+
51+
## Configuration
52+
53+
All settings fall back to environment variables with the `PARSEABLE_` prefix:
54+
55+
| Argument | Environment variable | Default |
56+
|---|---|---|
57+
| `endpoint` | `PARSEABLE_URL` | `http://localhost:8000` |
58+
| `username` | `PARSEABLE_USERNAME` | `admin` |
59+
| `password` | `PARSEABLE_PASSWORD` | `admin` |
60+
| `service_name` | `PARSEABLE_SERVICE_NAME` | `temporal-worker` |
61+
| `logs.stream` | `PARSEABLE_LOGS_STREAM` | `temporal-logs` |
62+
| `traces.stream` | `PARSEABLE_TRACES_STREAM` | `temporal-traces` |
63+
64+
Pass `logs=None` or `traces=None` to disable either pipeline.
65+
66+
## Custom domain events
67+
68+
Emit replay-safe domain events from inside workflow code:
69+
70+
```python
71+
from temporal_parseable.workflow import workflow_event
72+
73+
@workflow.defn
74+
class AgentWorkflow:
75+
@workflow.run
76+
async def run(self, input: AgentInput) -> AgentResult:
77+
workflow_event("agent.started", {"user_id": input.user_id})
78+
79+
plan = await workflow.execute_activity(plan_activity, input)
80+
workflow_event("agent.plan.chosen", {"steps": len(plan.steps)})
81+
82+
for step in plan.steps:
83+
workflow_event("agent.step.start", {"tool": step.tool})
84+
await workflow.execute_activity(run_step, step)
85+
86+
return result
87+
```
88+
89+
Each call emits a record with `type: "user_event"`, `event_name`, and `event_data`. Records are replay-safe — never duplicated during Temporal history replay.
90+
91+
## Log schema
92+
93+
| Field | Type | Notes |
94+
|---|---|---|
95+
| `type` | `activity` \| `workflow` \| `user_event` \| `signal` \| `query` \| `update` \| `child_workflow` \| `continue_as_new` | discriminator |
96+
| `status` | `started` \| `completed` \| `failed` | not on `user_event` |
97+
| `service_name` | string | from plugin config |
98+
| `timestamp` | ISO 8601 | event time |
99+
| `workflow_id` | string | |
100+
| `run_id` | string | |
101+
| `workflow_name` | string | |
102+
| `activity_name` | string | activity records only |
103+
| `activity_id` | string | activity records only |
104+
| `attempt` | int | activity records only (1-based) |
105+
| `duration_ms` | float | on completion/fail |
106+
| `error` | string | on fail |
107+
| `direction` | `inbound` \| `outbound` | message records |
108+
| `message_name` | string | signal/query/update name |
109+
| `target_workflow_id` | string | outbound signals/child workflows |
110+
| `event_name` | string | user events only |
111+
| `event_data` | object | user events only |
112+
113+
## Replay safety
114+
115+
All workflow-side emission is replay-safe. The plugin guards every emit with `workflow.unsafe.is_replaying()`, so records are never duplicated when Temporal replays workflow history (worker crash, cache eviction, manual replay).
116+
117+
## Example queries in Parseable
118+
119+
```sql
120+
-- Recent workflow failures
121+
SELECT workflow_id, workflow_name, error, p_timestamp
122+
FROM "temporal-logs"
123+
WHERE type = 'workflow' AND status = 'failed'
124+
ORDER BY p_timestamp DESC LIMIT 20;
125+
126+
-- Activity retry hotspots
127+
SELECT activity_name, COUNT(*) as failures
128+
FROM "temporal-logs"
129+
WHERE type = 'activity' AND status = 'failed'
130+
GROUP BY activity_name ORDER BY failures DESC;
131+
132+
-- P95 activity duration
133+
SELECT activity_name, PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms) as p95_ms
134+
FROM "temporal-logs"
135+
WHERE type = 'activity' AND status = 'completed'
136+
GROUP BY activity_name;
137+
```
138+
139+
## Development
140+
141+
```bash
142+
pip install -e ".[dev]"
143+
pytest
144+
```
145+
146+
## License
147+
148+
Apache-2.0

examples/client.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import sys
5+
import os
6+
import uuid
7+
8+
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
9+
10+
from temporalio.client import Client
11+
from workflows import (
12+
ExampleWorkflow, FailingWorkflow, UserEventWorkflow, ParentWorkflow
13+
)
14+
15+
TASK_QUEUE = "temporal-parseable-demo"
16+
17+
def uid(name: str) -> str:
18+
return f"{name}-{uuid.uuid4().hex[:8]}"
19+
20+
21+
async def main() -> None:
22+
client = await Client.connect("localhost:7233")
23+
24+
print("→ Running happy-path workflow...")
25+
result = await client.execute_workflow(
26+
ExampleWorkflow.run, "World",
27+
id=uid("example"), task_queue=TASK_QUEUE,
28+
)
29+
print(f" Result: {result}")
30+
31+
print("→ Running user-event workflow...")
32+
result = await client.execute_workflow(
33+
UserEventWorkflow.run, "Alice",
34+
id=uid("user-event"), task_queue=TASK_QUEUE,
35+
)
36+
print(f" Result: {result}")
37+
38+
print("→ Running parent/child workflow...")
39+
result = await client.execute_workflow(
40+
ParentWorkflow.run, "Bob",
41+
id=uid("parent"), task_queue=TASK_QUEUE,
42+
)
43+
print(f" Result: {result}")
44+
45+
print("→ Running failing workflow (will fail after retries)...")
46+
try:
47+
await client.execute_workflow(
48+
FailingWorkflow.run,
49+
id=uid("failing"), task_queue=TASK_QUEUE,
50+
)
51+
except Exception as e:
52+
print(f" Expected failure: {e}")
53+
54+
print("\nDone. Check Parseable for records in temporal-logs.")
55+
56+
57+
if __name__ == "__main__":
58+
asyncio.run(main())

0 commit comments

Comments
 (0)