---
description: Empathy Framework - User API Documentation
---

# Empathy Framework: User API Reference

**Version:** 5.1.0 · **Last Updated:** January 27, 2026 · **Status:** Anthropic-Native (Claude-only)
## Contents

- Quick Start
- Core APIs
- Workflows
- Telemetry & Monitoring
- Cost Optimization
- CLI Reference
- Configuration
- Examples
## Quick Start

```bash
pip install empathy-framework
```

```python
from empathy_os.workflows import CodeReviewWorkflow

# Create workflow instance
workflow = CodeReviewWorkflow()

# Execute asynchronously
result = await workflow.execute({
    "path": "./src",
    "files": ["main.py", "utils.py"]
})

print(f"Review complete: {result['status']}")
print(f"Cost: ${result['cost']:.4f}")
```

## Core APIs

### BaseWorkflow

Base class for all workflows with multi-tier LLM routing.

```python
from empathy_os.workflows.base import BaseWorkflow, ModelTier


class CustomWorkflow(BaseWorkflow):
    """Custom workflow with adaptive routing."""

    name = "custom-workflow"
    stages = ["preparation", "analysis", "synthesis"]

    def __init__(self, enable_adaptive_routing: bool = True):
        super().__init__(
            enable_heartbeat_tracking=True,
            enable_adaptive_routing=enable_adaptive_routing
        )

    async def run_stage(
        self,
        stage_name: str,
        tier: ModelTier,
        input_data: dict
    ) -> tuple[dict, int, int]:
        """Execute a workflow stage.

        Args:
            stage_name: Name of the stage
            tier: Model tier (CHEAP, CAPABLE, PREMIUM)
            input_data: Input data for the stage

        Returns:
            Tuple of (result_dict, tokens_in, tokens_out)
        """
        # Your stage logic here
        system_prompt = f"You are executing: {stage_name}"
        user_message = f"Process: {input_data}"

        response = await self._call_llm(
            tier=tier,
            system=system_prompt,
            user_message=user_message
        )

        # Placeholder counts; a real stage would take these from the
        # LLM response metadata
        tokens_in, tokens_out = 0, 0
        return response, tokens_in, tokens_out
```

**Key Methods:**

| Method | Description | Returns |
|---|---|---|
| `execute(input_data: dict)` | Execute workflow | `dict` with status, cost, tokens |
| `get_tier_for_stage(stage: str)` | Get tier for stage | `ModelTier` enum |
| `_call_llm(tier, system, user_message)` | Call LLM with tier | LLM response |

**Model Tiers:**

```python
from empathy_os.workflows.base import ModelTier

ModelTier.CHEAP    # claude-3-5-haiku-20241022 (~$0.001/call)
ModelTier.CAPABLE  # claude-sonnet-4-5 (~$0.008/call)
ModelTier.PREMIUM  # claude-opus-4-5 (~$0.070/call)
```
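
If you prefer deterministic routing over adaptive selection, one option is to pin each stage to a tier by overriding `get_tier_for_stage` from the method table above. A minimal sketch, assuming subclasses may override that method (the base-class default is not shown in this document):

```python
from empathy_os.workflows.base import BaseWorkflow, ModelTier


class TieredWorkflow(BaseWorkflow):
    """Hypothetical workflow with a fixed stage-to-tier mapping."""

    name = "tiered-workflow"
    stages = ["preparation", "analysis", "synthesis"]

    # Illustrative mapping: cheap triage, capable analysis, premium synthesis
    STAGE_TIERS = {
        "preparation": ModelTier.CHEAP,
        "analysis": ModelTier.CAPABLE,
        "synthesis": ModelTier.PREMIUM,
    }

    def get_tier_for_stage(self, stage: str) -> ModelTier:
        # Fall back to the cheapest tier for unlisted stages
        # (run_stage omitted; see the BaseWorkflow example above)
        return self.STAGE_TIERS.get(stage, ModelTier.CHEAP)
```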

### UsageTracker

Track LLM usage, costs, and performance metrics.

```python
from empathy_os.telemetry import UsageTracker
# Get singleton instance
tracker = UsageTracker.get_instance()
# Track LLM call
tracker.track_llm_call(
workflow="code-review",
stage="analysis",
tier="CHEAP",
model="claude-3-5-haiku-20241022",
provider="anthropic",
cost=0.0016,
tokens={"input": 1000, "output": 500},
cache_hit=True,
cache_type="prompt",
duration_ms=1200,
success=True
)
# Get statistics
stats = tracker.get_stats(days=7)
print(f"Total calls: {stats['total_calls']:,}")
print(f"Total cost: ${stats['total_cost']:.2f}")
print(f"Cache hit rate: {stats['cache_hit_rate']:.1f}%")
print(f"Avg cost per call: ${stats['avg_cost_per_call']:.4f}")
# Get recent entries
entries = tracker.get_recent_entries(limit=100, days=7)
for entry in entries:
    print(f"[{entry['timestamp']}] {entry['workflow']}: ${entry['cost']:.4f}")
```

**Key Methods:**

| Method | Description | Returns |
|---|---|---|
| `track_llm_call(**kwargs)` | Record LLM API call | `None` |
| `get_stats(days: int)` | Get aggregate statistics | `dict` |
| `get_recent_entries(limit, days)` | Get recent entries | `list[dict]` |
| `export_to_csv(filepath)` | Export to CSV | `None` |
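
These methods are enough for simple custom reports. For example, a short sketch that totals spend per workflow over the last week, using only `get_recent_entries()` and the entry fields (`workflow`, `cost`) shown in the example above:

```python
from collections import defaultdict

from empathy_os.telemetry import UsageTracker

tracker = UsageTracker.get_instance()

# Total cost per workflow over the last 7 days
spend: dict[str, float] = defaultdict(float)
for entry in tracker.get_recent_entries(limit=1000, days=7):
    spend[entry["workflow"]] += entry["cost"]

# Print workflows from most to least expensive
for workflow, cost in sorted(spend.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{workflow}: ${cost:.4f}")
```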

### AdaptiveModelRouter

Automatically select the optimal model based on historical performance.

```python
from empathy_os.models import AdaptiveModelRouter
from empathy_os.telemetry import UsageTracker
# Initialize router
tracker = UsageTracker.get_instance()
router = AdaptiveModelRouter(telemetry=tracker)
# Get best model for workflow/stage
model = router.get_best_model(
workflow="code-review",
stage="analysis",
max_cost=0.01, # Max $0.01 per call
max_latency_ms=5000, # Max 5s latency
min_success_rate=0.9 # Min 90% success
)
print(f"Using: {model}") # claude-3-5-haiku-20241022
# Check for tier upgrade recommendation
should_upgrade, reason = router.recommend_tier_upgrade(
workflow="code-review",
stage="analysis"
)
if should_upgrade:
    print(f"⚠️ Upgrade recommended: {reason}")
    # Upgrade from CHEAP → CAPABLE → PREMIUM
```

**Key Methods:**

| Method | Description | Returns |
|---|---|---|
| `get_best_model(workflow, stage, ...)` | Get optimal model | `str` (model ID) |
| `recommend_tier_upgrade(workflow, stage)` | Check if upgrade needed | `(bool, str)` |
| `get_routing_stats(workflow, days)` | Get routing statistics | `dict` |

**Decision Logic** (sketched below):

- **Quality Score:** `(success_rate × 100) - (cost × 10)`
- **Upgrade Trigger:** failure rate > 20% in the last 20 calls
- **Downgrade Consideration:** success rate > 90% with high cost
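
A minimal sketch of these heuristics (the framework's actual implementation lives in `AdaptiveModelRouter` and may differ in detail; thresholds come from the list above, except `cost_ceiling`, which is illustrative):

```python
def quality_score(success_rate: float, cost: float) -> float:
    # e.g. success_rate=0.95 at $0.008/call: 95.0 - 0.08 = 94.92
    return (success_rate * 100) - (cost * 10)


def should_upgrade(failures: int, window: int = 20) -> bool:
    # Upgrade trigger: failure rate above 20% in the last 20 calls
    return window > 0 and failures / window > 0.20


def consider_downgrade(success_rate: float, cost: float,
                       cost_ceiling: float = 0.01) -> bool:
    # Downgrade consideration: consistently succeeding at high cost
    return success_rate > 0.90 and cost > cost_ceiling
```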

## Workflows

### CodeReviewWorkflow

```python
from empathy_os.workflows import CodeReviewWorkflow

workflow = CodeReviewWorkflow()

result = await workflow.execute({
    "path": "./src",
    "files": ["main.py", "utils.py"],
    "focus_areas": ["security", "performance"]
})

# Output
{
    "status": "success",
    "issues": [
        {"file": "main.py", "line": 42, "severity": "high", "issue": "SQL injection risk"},
        {"file": "utils.py", "line": 15, "severity": "medium", "issue": "Inefficient loop"}
    ],
    "suggestions": [...],
    "cost": 0.0234,
    "tokens": {"input": 15000, "output": 3000}
}
```

### TestGenerationWorkflow

```python
from empathy_os.workflows import TestGenerationWorkflow

workflow = TestGenerationWorkflow()

result = await workflow.execute({
    "source_file": "./src/utils.py",
    "functions": ["parse_config", "validate_input"],
    "framework": "pytest"
})

# Output
{
    "status": "success",
    "test_file": "./tests/test_utils.py",
    "tests_generated": 12,
    "coverage_estimate": 0.85,
    "cost": 0.0189
}
```

### BatchProcessingWorkflow

```python
from empathy_os.workflows.batch_processing import (
    BatchProcessingWorkflow,
    BatchRequest
)

workflow = BatchProcessingWorkflow()

# Create batch requests
requests = [
    BatchRequest(
        task_id="task_1",
        task_type="analyze_logs",
        input_data={"logs": "ERROR: Connection failed..."},
        model_tier="capable"
    ),
    BatchRequest(
        task_id="task_2",
        task_type="generate_report",
        input_data={"data": {...}},
        model_tier="cheap"
    )
]

# Execute batch (processes within 24 hours at 50% cost)
results = await workflow.execute_batch(
    requests=requests,
    poll_interval=300,  # Check every 5 minutes
    timeout=86400       # 24 hour max wait
)

# Check results
for result in results:
    if result.success:
        print(f"✓ {result.task_id}: {result.output['content']}")
    else:
        print(f"✗ {result.task_id}: {result.error}")
```

## Telemetry & Monitoring

### HeartbeatCoordinator

Track agent lifecycle and heartbeats.

```python
from empathy_os.telemetry import HeartbeatCoordinator
# Initialize coordinator
coordinator = HeartbeatCoordinator(
enable_streaming=True # Enable real-time event streaming
)
# Start heartbeat for agent
coordinator.start_heartbeat(
agent_id="my-agent-001",
metadata={
"workflow": "code-review",
"run_id": "abc123"
}
)
# Update status
coordinator.beat(
status="running",
progress=0.5,
current_stage="analysis"
)
# Complete
coordinator.complete(
status="success",
final_state={"issues_found": 3}
)
```

### CoordinationSignals

Send signals between agents.

```python
from datetime import datetime, timedelta

from empathy_os.telemetry import CoordinationSignals
# Initialize signals
signals = CoordinationSignals(
agent_id="orchestrator",
enable_streaming=True
)
# Send signal to specific agent
signals.signal(
signal_type="task_assigned",
target_agent="worker-1",
payload={"task_id": "t123", "priority": "high"}
)
# Broadcast to all agents
signals.broadcast(
signal_type="shutdown_requested",
payload={"reason": "maintenance"}
)
# Listen for signals
received = signals.get_signals(
target_agent="worker-1",
signal_types=["task_assigned"],
since=datetime.now() - timedelta(minutes=5)
)
```

### EventStreamer

Real-time event streaming with Redis Streams.

```python
from empathy_os.telemetry import EventStreamer
streamer = EventStreamer()
# Publish event
event_id = streamer.publish_event(
event_type="workflow_progress",
data={
"workflow": "code-review",
"stage": "analysis",
"progress": 0.75
}
)
# Consume events (blocking iterator)
for event in streamer.consume_events(
    event_types=["agent_heartbeat", "workflow_progress"],
    block_ms=5000,  # 5 second timeout
    count=10        # Max 10 events per batch
):
    print(f"[{event.timestamp}] {event.event_type}: {event.data}")
# Get recent events (non-blocking)
recent = streamer.get_recent_events(
event_type="workflow_progress",
count=100
)
```

### ApprovalGate

Pause workflow execution for human approval.

```python
import logging

from empathy_os.telemetry import ApprovalGate

logger = logging.getLogger(__name__)
# In workflow: Request approval
gate = ApprovalGate(agent_id="deployment-workflow")
approval = gate.request_approval(
approval_type="deploy_to_production",
context={
"version": "2.0.0",
"changes": ["feature-x", "bugfix-y"],
"risk_level": "medium"
},
timeout=300.0 # 5 minutes
)

if approval.approved:
    deploy_to_production()  # your deployment hook, not part of the framework
else:
    logger.info(f"Deployment rejected: {approval.reason}")
# In UI: Respond to approval
ui_gate = ApprovalGate()
pending = ui_gate.get_pending_approvals()
for request in pending:
    # Display to user, get decision
    ui_gate.respond_to_approval(
        request_id=request.request_id,
        approved=True,
        responder="user@example.com",
        reason="Looks good to deploy"
    )
```

### FeedbackLoop

Record quality ratings to improve model selection.

```python
from empathy_os.telemetry import FeedbackLoop
feedback = FeedbackLoop()
# Record quality feedback (0.0 = bad, 1.0 = excellent)
feedback.record_feedback(
workflow_name="code-review",
stage_name="analysis",
tier="cheap",
quality_score=0.85,
metadata={
"tokens": 1500,
"latency_ms": 1200
}
)
# Get tier recommendation based on history
recommendation = feedback.recommend_tier(
workflow_name="code-review",
stage_name="analysis",
current_tier="cheap"
)

if recommendation.recommended_tier != recommendation.current_tier:
    print(f"Upgrade: {recommendation.current_tier} → {recommendation.recommended_tier}")
    print(f"Reason: {recommendation.reason}")
    print(f"Confidence: {recommendation.confidence:.1%}")
# Get quality statistics
stats = feedback.get_quality_stats(
workflow_name="code-review",
stage_name="analysis",
tier="cheap"
)
print(f"Average quality: {stats.avg_quality:.2f}")
print(f"Sample count: {stats.sample_count}")
print(f"Trend: {'📈' if stats.recent_trend > 0 else '📉'}")Automatically enabled for all LLM calls. Track cache performance:

```bash
# View cache statistics
empathy cache stats
```

```python
from empathy_os.telemetry import UsageTracker
tracker = UsageTracker.get_instance()
stats = tracker.get_stats(days=7)
cache_hit_rate = stats['cache_hit_rate']
savings = stats['cache_savings']
print(f"Cache hit rate: {cache_hit_rate:.1f}%")
print(f"Cost savings: ${savings:.2f}")Submit non-urgent tasks for asynchronous processing:

### Batch Processing

Submit non-urgent tasks for asynchronous processing:

```bash
# CLI: Submit batch
empathy batch submit requests.json
# CLI: Check status
empathy batch status msgbatch_abc123
# CLI: Get results
empathy batch results msgbatch_abc123 output.json
# CLI: Wait for completion
empathy batch wait msgbatch_abc123 output.json --poll-interval 300
```

### Adaptive Routing

Automatically uses the cheapest model that meets quality requirements:

```python
# Enable in workflow
workflow = CodeReviewWorkflow(enable_adaptive_routing=True)
# Or manually check recommendations
from empathy_os.models import AdaptiveModelRouter
from empathy_os.telemetry import UsageTracker
router = AdaptiveModelRouter(UsageTracker.get_instance())
model = router.get_best_model(
workflow="code-review",
stage="analysis",
max_cost=0.01
)
```

### Token Estimation

Accurate cost estimation before API calls:

```python
from empathy_llm_toolkit.providers import AnthropicProvider
provider = AnthropicProvider()
# Estimate tokens
messages = [{"role": "user", "content": "Hello, world!"}]
tokens = provider.estimate_tokens(messages)
print(f"Estimated tokens: {tokens}")
# Calculate cost
cost = provider.calculate_actual_cost(
input_tokens=tokens,
output_tokens=500,
cache_creation_tokens=0,
cache_read_tokens=1000
)
print(f"Estimated cost: ${cost:.4f}")# List available workflows
empathy workflow list
# Run workflow
empathy workflow run code-review --input '{"path": "./src"}'
# Run with JSON output
empathy workflow run bug-predict --json
```

### Telemetry Commands

```bash
# Show usage statistics
empathy telemetry show --limit 100
# Export to CSV
empathy telemetry export output.csv --format csv
# Reset telemetry data
empathy telemetry reset
```

### Batch Commands

```bash
# Submit batch job
empathy batch submit requests.json
# Check batch status
empathy batch status msgbatch_abc123 [--json]
# Get batch results
empathy batch results msgbatch_abc123 output.json
# Wait for completion
empathy batch wait msgbatch_abc123 output.json --poll-interval 300 --timeout 86400
```

### Cache Commands

```bash
# View cache statistics
empathy cache stats [--verbose] [--json]
```

### Routing Commands

```bash
# View routing statistics
empathy routing stats code-review [--stage analysis] [--days 7]
# Check tier upgrade recommendations
empathy routing check code-review [--stage analysis]
empathy routing check --all
# Compare model performance
empathy routing models --provider anthropic [--days 30]
```

## Configuration

### Environment Variables

```bash
# Required
export ANTHROPIC_API_KEY="your-api-key-here"
# Optional
export EMPATHY_LOG_LEVEL="INFO" # DEBUG, INFO, WARNING, ERROR
export EMPATHY_CACHE_DIR="~/.empathy" # Cache directory
export EMPATHY_REDIS_URL="redis://localhost:6379" # Redis connection
```
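
For reference, the variables and defaults above can be read with nothing but the standard library; a small sketch (the framework presumably does the equivalent internally):

```python
import os

api_key = os.environ["ANTHROPIC_API_KEY"]  # required; raises KeyError if unset
log_level = os.environ.get("EMPATHY_LOG_LEVEL", "INFO")
cache_dir = os.path.expanduser(os.environ.get("EMPATHY_CACHE_DIR", "~/.empathy"))
redis_url = os.environ.get("EMPATHY_REDIS_URL", "redis://localhost:6379")
```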

### Configuration File

Create `empathy.config.yml`:

```yaml
# Provider settings
provider: anthropic
api_key: ${ANTHROPIC_API_KEY}

# Model tier defaults
tier_defaults:
  cheap: claude-3-5-haiku-20241022
  capable: claude-sonnet-4-5
  premium: claude-opus-4-5

# Telemetry settings
telemetry:
  enabled: true
  retention_days: 30
  export_format: csv

# Adaptive routing
adaptive_routing:
  enabled: true
  min_samples: 10
  quality_threshold: 0.7
  failure_rate_threshold: 0.2

# Batch processing
batch:
  poll_interval: 300  # 5 minutes
  timeout: 86400      # 24 hours

# Redis (optional)
redis:
  url: redis://localhost:6379
  enabled: false
```

Load the configuration:

```python
from empathy_os.config import load_config

config = load_config("empathy.config.yml")
```

## Examples

### Code Review with Cost Optimization

```python
import asyncio
from empathy_os.workflows import CodeReviewWorkflow
from empathy_os.telemetry import UsageTracker


async def review_codebase():
    """Review codebase with cost optimization."""
    # Enable adaptive routing for cost optimization
    workflow = CodeReviewWorkflow(
        enable_adaptive_routing=True,
        enable_heartbeat_tracking=True
    )

    # Execute review
    result = await workflow.execute({
        "path": "./src",
        "focus_areas": ["security", "performance"]
    })

    # Show results and cost
    print(f"\n✓ Review complete")
    print(f"  Issues found: {len(result['issues'])}")
    print(f"  Cost: ${result['cost']:.4f}")
    print(f"  Tokens: {result['tokens']['total']:,}")

    # Show cost savings
    tracker = UsageTracker.get_instance()
    stats = tracker.get_stats(days=1)
    print(f"\n💰 Cost Optimization")
    print(f"  Cache hit rate: {stats['cache_hit_rate']:.1f}%")
    print(f"  Cache savings: ${stats['cache_savings']:.4f}")
    print(f"  Total cost: ${stats['total_cost']:.4f}")


if __name__ == "__main__":
    asyncio.run(review_codebase())
```

### Batch Log Analysis

```python
import asyncio
from empathy_os.workflows.batch_processing import (
    BatchProcessingWorkflow,
    BatchRequest
)


async def analyze_logs_batch():
    """Analyze logs in batch for 50% cost savings."""
    workflow = BatchProcessingWorkflow()

    # Create batch requests from log files
    requests = []
    for i, log_file in enumerate(["app.log", "error.log", "access.log"]):
        with open(log_file) as f:
            logs = f.read()
        requests.append(BatchRequest(
            task_id=f"log_{i}",
            task_type="analyze_logs",
            input_data={"logs": logs},
            model_tier="cheap"  # Use cheapest tier for bulk analysis
        ))

    print(f"📤 Submitting {len(requests)} log analysis tasks...")

    # Execute batch (50% cheaper, processes within 24 hours)
    results = await workflow.execute_batch(
        requests=requests,
        poll_interval=300,  # Check every 5 minutes
        timeout=3600        # 1 hour timeout
    )

    # Process results
    success_count = sum(1 for r in results if r.success)
    print(f"\n✓ Batch complete: {success_count}/{len(results)} successful")

    for result in results:
        if result.success:
            print(f"\n{result.task_id}:")
            print(f"  Issues: {result.output['content']}")
        else:
            print(f"\n{result.task_id}: ✗ {result.error}")


if __name__ == "__main__":
    asyncio.run(analyze_logs_batch())
```

### Real-Time Agent Dashboard

```python
from empathy_os.telemetry import EventStreamer
import asyncio


async def dashboard_monitor():
    """Monitor agent activity in real-time."""
    streamer = EventStreamer()

    print("📊 Real-Time Agent Dashboard")
    print("=" * 50)

    # Consume events in real-time
    for event in streamer.consume_events(
        event_types=["agent_heartbeat", "coordination_signal"],
        block_ms=5000  # 5 second timeout
    ):
        # Display event
        if event.event_type == "agent_heartbeat":
            data = event.data
            agent_id = data.get("agent_id", "unknown")
            status = data.get("status", "unknown")
            progress = data.get("progress", 0) * 100
            print(f"[{event.timestamp}] Agent {agent_id}: {status} ({progress:.0f}%)")
        elif event.event_type == "coordination_signal":
            data = event.data
            signal = data.get("signal_type", "unknown")
            target = data.get("target_agent", "all")
            print(f"[{event.timestamp}] Signal: {signal} → {target}")


if __name__ == "__main__":
    asyncio.run(dashboard_monitor())
```

For the complete API reference, see:
- Documentation: https://empathy-framework.vercel.app
- GitHub: https://github.com/Smart-AI-Memory/empathy-framework
- Issues: https://github.com/Smart-AI-Memory/empathy-framework/issues
- Discussions: https://github.com/Smart-AI-Memory/empathy-framework/discussions

---

**Last Updated:** January 27, 2026 · **Framework Version:** 5.1.0 · **License:** Fair Source License 0.9