Skip to content

Latest commit

 

History

History
892 lines (669 loc) · 20.3 KB

File metadata and controls

892 lines (669 loc) · 20.3 KB
---
description: "Empathy Framework - User API Documentation. API reference. Version: 5.1.0. Last Updated: January 27, 2026. Status: Anthropic-Native (Claude-only)."
---

Empathy Framework - User API Documentation

Version: 5.1.0 | Last Updated: January 27, 2026 | Status: Anthropic-Native (Claude-only)


Table of Contents

  1. Quick Start
  2. Core APIs
  3. Workflows
  4. Telemetry & Monitoring
  5. Cost Optimization
  6. CLI Reference
  7. Configuration
  8. Examples

Quick Start

Installation

pip install empathy-framework

Basic Usage

import asyncio

from empathy_os.workflows import CodeReviewWorkflow


async def main() -> None:
    """Run a code review over ./src and print its status and cost."""
    # Create workflow instance
    workflow = CodeReviewWorkflow()

    # Execute asynchronously. `await` is only valid inside a coroutine, so the
    # call lives in `main()` and is driven by `asyncio.run()` below.
    result = await workflow.execute({
        "path": "./src",
        "files": ["main.py", "utils.py"],
    })

    print(f"Review complete: {result['status']}")
    print(f"Cost: ${result['cost']:.4f}")


if __name__ == "__main__":
    asyncio.run(main())

Core APIs

1. Workflows

Base class for all workflows with multi-tier LLM routing.

BaseWorkflow

from empathy_os.workflows.base import BaseWorkflow, ModelTier

class CustomWorkflow(BaseWorkflow):
    """Custom workflow with adaptive routing.

    Declares three stages and forwards each one to the LLM through
    ``BaseWorkflow._call_llm`` using the tier passed to ``run_stage``.
    """

    name = "custom-workflow"
    stages = ["preparation", "analysis", "synthesis"]

    def __init__(self, enable_adaptive_routing: bool = True):
        """Initialize the workflow.

        Args:
            enable_adaptive_routing: Forwarded to ``BaseWorkflow``; heartbeat
                tracking is always switched on.
        """
        super().__init__(
            enable_heartbeat_tracking=True,
            enable_adaptive_routing=enable_adaptive_routing,
        )

    async def run_stage(
        self,
        stage_name: str,
        tier: ModelTier,
        input_data: dict,
    ) -> tuple[dict, int, int]:
        """Execute a workflow stage.

        Args:
            stage_name: Name of the stage
            tier: Model tier (CHEAP, CAPABLE, PREMIUM)
            input_data: Input data for the stage

        Returns:
            Tuple of (result_dict, tokens_in, tokens_out)
        """
        # Your stage logic here
        system_prompt = f"You are executing: {stage_name}"
        user_message = f"Process: {input_data}"

        # NOTE(review): assumes `_call_llm` returns
        # (response, tokens_in, tokens_out) — confirm against BaseWorkflow.
        # Previously the token counts were returned without being defined.
        response, tokens_in, tokens_out = await self._call_llm(
            tier=tier,
            system=system_prompt,
            user_message=user_message,
        )

        return response, tokens_in, tokens_out

Key Methods:

| Method | Description | Returns |
|--------|-------------|---------|
| `execute(input_data: dict)` | Execute workflow | dict with status, cost, tokens |
| `get_tier_for_stage(stage: str)` | Get tier for stage | ModelTier enum |
| `_call_llm(tier, system, user_message)` | Call LLM with tier | LLM response |

Model Tiers:

from empathy_os.workflows.base import ModelTier

ModelTier.CHEAP      # claude-3-5-haiku-20241022 (~$0.001/call)
ModelTier.CAPABLE    # claude-sonnet-4-5 (~$0.008/call)
ModelTier.PREMIUM    # claude-opus-4-5 (~$0.070/call)

2. Telemetry & Usage Tracking

Track LLM usage, costs, and performance metrics.

UsageTracker

from empathy_os.telemetry import UsageTracker

# Get singleton instance
tracker = UsageTracker.get_instance()

# Track LLM call
tracker.track_llm_call(
    workflow="code-review",
    stage="analysis",
    tier="CHEAP",
    model="claude-3-5-haiku-20241022",
    provider="anthropic",
    cost=0.0016,
    tokens={"input": 1000, "output": 500},
    cache_hit=True,
    cache_type="prompt",
    duration_ms=1200,
    success=True
)

# Get statistics
stats = tracker.get_stats(days=7)

print(f"Total calls: {stats['total_calls']:,}")
print(f"Total cost: ${stats['total_cost']:.2f}")
print(f"Cache hit rate: {stats['cache_hit_rate']:.1f}%")
print(f"Avg cost per call: ${stats['avg_cost_per_call']:.4f}")

# Get recent entries
entries = tracker.get_recent_entries(limit=100, days=7)
for entry in entries:
    print(f"[{entry['timestamp']}] {entry['workflow']}: ${entry['cost']:.4f}")

Key Methods:

| Method | Description | Returns |
|--------|-------------|---------|
| `track_llm_call(**kwargs)` | Record LLM API call | None |
| `get_stats(days: int)` | Get aggregate statistics | dict |
| `get_recent_entries(limit, days)` | Get recent entries | list[dict] |
| `export_to_csv(filepath)` | Export to CSV | None |

3. Adaptive Model Routing

Automatically select optimal model based on historical performance.

AdaptiveModelRouter

from empathy_os.models import AdaptiveModelRouter
from empathy_os.telemetry import UsageTracker

# Initialize router
tracker = UsageTracker.get_instance()
router = AdaptiveModelRouter(telemetry=tracker)

# Get best model for workflow/stage
model = router.get_best_model(
    workflow="code-review",
    stage="analysis",
    max_cost=0.01,           # Max $0.01 per call
    max_latency_ms=5000,     # Max 5s latency
    min_success_rate=0.9     # Min 90% success
)

print(f"Using: {model}")  # claude-3-5-haiku-20241022

# Check for tier upgrade recommendation
should_upgrade, reason = router.recommend_tier_upgrade(
    workflow="code-review",
    stage="analysis"
)

if should_upgrade:
    print(f"⚠️ Upgrade recommended: {reason}")
    # Upgrade from CHEAP → CAPABLE → PREMIUM

Key Methods:

| Method | Description | Returns |
|--------|-------------|---------|
| `get_best_model(workflow, stage, ...)` | Get optimal model | str (model ID) |
| `recommend_tier_upgrade(workflow, stage)` | Check if upgrade needed | (bool, str) |
| `get_routing_stats(workflow, days)` | Get routing statistics | dict |

Decision Logic:

  • Quality Score: (success_rate × 100) - (cost × 10)
  • Upgrade Trigger: Failure rate > 20% in last 20 calls
  • Downgrade Consideration: Success rate > 90% with high cost

Workflows

Built-in Workflows

1. Code Review Workflow

from empathy_os.workflows import CodeReviewWorkflow

workflow = CodeReviewWorkflow()

result = await workflow.execute({
    "path": "./src",
    "files": ["main.py", "utils.py"],
    "focus_areas": ["security", "performance"]
})

# Output
{
    "status": "success",
    "issues": [
        {"file": "main.py", "line": 42, "severity": "high", "issue": "SQL injection risk"},
        {"file": "utils.py", "line": 15, "severity": "medium", "issue": "Inefficient loop"}
    ],
    "suggestions": [...],
    "cost": 0.0234,
    "tokens": {"input": 15000, "output": 3000}
}

2. Test Generation Workflow

from empathy_os.workflows import TestGenerationWorkflow

workflow = TestGenerationWorkflow()

result = await workflow.execute({
    "source_file": "./src/utils.py",
    "functions": ["parse_config", "validate_input"],
    "framework": "pytest"
})

# Output
{
    "status": "success",
    "test_file": "./tests/test_utils.py",
    "tests_generated": 12,
    "coverage_estimate": 0.85,
    "cost": 0.0189
}

3. Batch Processing Workflow (50% Cost Savings)

from empathy_os.workflows.batch_processing import (
    BatchProcessingWorkflow,
    BatchRequest
)

workflow = BatchProcessingWorkflow()

# Create batch requests
requests = [
    BatchRequest(
        task_id="task_1",
        task_type="analyze_logs",
        input_data={"logs": "ERROR: Connection failed..."},
        model_tier="capable"
    ),
    BatchRequest(
        task_id="task_2",
        task_type="generate_report",
        input_data={"data": {...}},
        model_tier="cheap"
    )
]

# Execute batch (processes within 24 hours at 50% cost)
results = await workflow.execute_batch(
    requests=requests,
    poll_interval=300,    # Check every 5 minutes
    timeout=86400         # 24 hour max wait
)

# Check results
for result in results:
    if result.success:
        print(f"✓ {result.task_id}: {result.output['content']}")
    else:
        print(f"✗ {result.task_id}: {result.error}")

Telemetry & Monitoring

Agent Tracking

Track agent lifecycle and heartbeats.

from empathy_os.telemetry import HeartbeatCoordinator

# Initialize coordinator
coordinator = HeartbeatCoordinator(
    enable_streaming=True  # Enable real-time event streaming
)

# Start heartbeat for agent
coordinator.start_heartbeat(
    agent_id="my-agent-001",
    metadata={
        "workflow": "code-review",
        "run_id": "abc123"
    }
)

# Update status
coordinator.beat(
    status="running",
    progress=0.5,
    current_stage="analysis"
)

# Complete
coordinator.complete(
    status="success",
    final_state={"issues_found": 3}
)

Inter-Agent Coordination

Send signals between agents.

from datetime import datetime, timedelta

from empathy_os.telemetry import CoordinationSignals

# Initialize signals
signals = CoordinationSignals(
    agent_id="orchestrator",
    enable_streaming=True
)

# Send signal to specific agent
signals.signal(
    signal_type="task_assigned",
    target_agent="worker-1",
    payload={"task_id": "t123", "priority": "high"}
)

# Broadcast to all agents
signals.broadcast(
    signal_type="shutdown_requested",
    payload={"reason": "maintenance"}
)

# Listen for signals sent to worker-1 during the last five minutes
received = signals.get_signals(
    target_agent="worker-1",
    signal_types=["task_assigned"],
    since=datetime.now() - timedelta(minutes=5)
)

Event Streaming

Real-time event streaming with Redis Streams.

from empathy_os.telemetry import EventStreamer

streamer = EventStreamer()

# Publish event
event_id = streamer.publish_event(
    event_type="workflow_progress",
    data={
        "workflow": "code-review",
        "stage": "analysis",
        "progress": 0.75
    }
)

# Consume events (blocking iterator)
for event in streamer.consume_events(
    event_types=["agent_heartbeat", "workflow_progress"],
    block_ms=5000,    # 5 second timeout
    count=10          # Max 10 events per batch
):
    print(f"[{event.timestamp}] {event.event_type}: {event.data}")

# Get recent events (non-blocking)
recent = streamer.get_recent_events(
    event_type="workflow_progress",
    count=100
)

Human Approval Gates

Pause workflow execution for human approval.

from empathy_os.telemetry import ApprovalGate

# In workflow: Request approval
gate = ApprovalGate(agent_id="deployment-workflow")

approval = gate.request_approval(
    approval_type="deploy_to_production",
    context={
        "version": "2.0.0",
        "changes": ["feature-x", "bugfix-y"],
        "risk_level": "medium"
    },
    timeout=300.0  # 5 minutes
)

if approval.approved:
    deploy_to_production()
else:
    logger.info(f"Deployment rejected: {approval.reason}")

# In UI: Respond to approval
ui_gate = ApprovalGate()
pending = ui_gate.get_pending_approvals()

for request in pending:
    # Display to user, get decision
    ui_gate.respond_to_approval(
        request_id=request.request_id,
        approved=True,
        responder="user@example.com",
        reason="Looks good to deploy"
    )

Feedback Loop

Record quality ratings to improve model selection.

from empathy_os.telemetry import FeedbackLoop

feedback = FeedbackLoop()

# Record quality feedback (0.0 = bad, 1.0 = excellent)
feedback.record_feedback(
    workflow_name="code-review",
    stage_name="analysis",
    tier="cheap",
    quality_score=0.85,
    metadata={
        "tokens": 1500,
        "latency_ms": 1200
    }
)

# Get tier recommendation based on history
recommendation = feedback.recommend_tier(
    workflow_name="code-review",
    stage_name="analysis",
    current_tier="cheap"
)

if recommendation.recommended_tier != recommendation.current_tier:
    # Separate the two tier names so the line reads e.g. "cheap → capable"
    # instead of running them together.
    print(f"Upgrade: {recommendation.current_tier} → {recommendation.recommended_tier}")
    print(f"Reason: {recommendation.reason}")
    print(f"Confidence: {recommendation.confidence:.1%}")

# Get quality statistics
stats = feedback.get_quality_stats(
    workflow_name="code-review",
    stage_name="analysis",
    tier="cheap"
)

print(f"Average quality: {stats.avg_quality:.2f}")
print(f"Sample count: {stats.sample_count}")
print(f"Trend: {'📈' if stats.recent_trend > 0 else '📉'}")

Cost Optimization

1. Prompt Caching (20-30% Savings)

Automatically enabled for all LLM calls. Track cache performance:

# CLI: View cache statistics
empathy cache stats

# Python API
from empathy_os.telemetry import UsageTracker

tracker = UsageTracker.get_instance()
stats = tracker.get_stats(days=7)

cache_hit_rate = stats['cache_hit_rate']
savings = stats['cache_savings']

print(f"Cache hit rate: {cache_hit_rate:.1f}%")
print(f"Cost savings: ${savings:.2f}")

2. Batch API (50% Savings)

Submit non-urgent tasks for asynchronous processing:

# CLI: Submit batch
empathy batch submit requests.json

# CLI: Check status
empathy batch status msgbatch_abc123

# CLI: Get results
empathy batch results msgbatch_abc123 output.json

# CLI: Wait for completion
empathy batch wait msgbatch_abc123 output.json --poll-interval 300

3. Adaptive Routing ($2,000/year Potential Savings)

Automatically uses the cheapest model that meets quality requirements:

# Enable in workflow
workflow = CodeReviewWorkflow(enable_adaptive_routing=True)

# Or manually check recommendations
from empathy_os.models import AdaptiveModelRouter
from empathy_os.telemetry import UsageTracker

router = AdaptiveModelRouter(UsageTracker.get_instance())
model = router.get_best_model(
    workflow="code-review",
    stage="analysis",
    max_cost=0.01
)

4. Precise Token Counting (>98% Accuracy)

Accurate cost estimation before API calls:

from empathy_llm_toolkit.providers import AnthropicProvider

provider = AnthropicProvider()

# Estimate tokens
messages = [{"role": "user", "content": "Hello, world!"}]
tokens = provider.estimate_tokens(messages)

print(f"Estimated tokens: {tokens}")

# Calculate cost
cost = provider.calculate_actual_cost(
    input_tokens=tokens,
    output_tokens=500,
    cache_creation_tokens=0,
    cache_read_tokens=1000
)

print(f"Estimated cost: ${cost:.4f}")

CLI Reference

Workflow Commands

# List available workflows
empathy workflow list

# Run workflow
empathy workflow run code-review --input '{"path": "./src"}'

# Run with JSON output
empathy workflow run bug-predict --json

Telemetry Commands

# Show usage statistics
empathy telemetry show --limit 100

# Export to CSV
empathy telemetry export output.csv --format csv

# Reset telemetry data
empathy telemetry reset

Batch Processing Commands

# Submit batch job
empathy batch submit requests.json

# Check batch status
empathy batch status msgbatch_abc123 [--json]

# Get batch results
empathy batch results msgbatch_abc123 output.json

# Wait for completion
empathy batch wait msgbatch_abc123 output.json --poll-interval 300 --timeout 86400

Cache Commands

# View cache statistics
empathy cache stats [--verbose] [--json]

Routing Commands

# View routing statistics
empathy routing stats code-review [--stage analysis] [--days 7]

# Check tier upgrade recommendations
empathy routing check code-review [--stage analysis]
empathy routing check --all

# Compare model performance
empathy routing models --provider anthropic [--days 30]

Configuration

Environment Variables

# Required
export ANTHROPIC_API_KEY="your-api-key-here"

# Optional
export EMPATHY_LOG_LEVEL="INFO"           # DEBUG, INFO, WARNING, ERROR
export EMPATHY_CACHE_DIR="~/.empathy"     # Cache directory
export EMPATHY_REDIS_URL="redis://localhost:6379"  # Redis connection

Configuration File

Create empathy.config.yml:

# Provider settings
provider: anthropic
api_key: ${ANTHROPIC_API_KEY}

# Model tier defaults
tier_defaults:
  cheap: claude-3-5-haiku-20241022
  capable: claude-sonnet-4-5
  premium: claude-opus-4-5

# Telemetry settings
telemetry:
  enabled: true
  retention_days: 30
  export_format: csv

# Adaptive routing
adaptive_routing:
  enabled: true
  min_samples: 10
  quality_threshold: 0.7
  failure_rate_threshold: 0.2

# Batch processing
batch:
  poll_interval: 300        # 5 minutes
  timeout: 86400            # 24 hours

# Redis (optional)
redis:
  url: redis://localhost:6379
  enabled: false

Load configuration:

from empathy_os.config import load_config

config = load_config("empathy.config.yml")

Examples

Example 1: Cost-Optimized Code Review

import asyncio
from empathy_os.workflows import CodeReviewWorkflow
from empathy_os.telemetry import UsageTracker

async def review_codebase():
    """Review codebase with cost optimization.

    Runs the code review workflow over ``./src`` with adaptive routing and
    heartbeat tracking enabled, then prints the review summary followed by
    the last day's cache and cost statistics from the usage tracker.
    """

    # Enable adaptive routing for cost optimization
    workflow = CodeReviewWorkflow(
        enable_adaptive_routing=True,
        enable_heartbeat_tracking=True,
    )

    # Execute review
    result = await workflow.execute({
        "path": "./src",
        "focus_areas": ["security", "performance"],
    })

    # The documented result exposes "input" and "output" token counts rather
    # than a precomputed "total", so add them up here.
    tokens = result["tokens"]
    total_tokens = tokens["input"] + tokens["output"]

    # Show results and cost
    print("\n✓ Review complete")
    print(f"  Issues found: {len(result['issues'])}")
    print(f"  Cost: ${result['cost']:.4f}")
    print(f"  Tokens: {total_tokens:,}")

    # Show cost savings
    tracker = UsageTracker.get_instance()
    stats = tracker.get_stats(days=1)

    print("\n💰 Cost Optimization")
    print(f"  Cache hit rate: {stats['cache_hit_rate']:.1f}%")
    print(f"  Cache savings: ${stats['cache_savings']:.4f}")
    print(f"  Total cost: ${stats['total_cost']:.4f}")

if __name__ == "__main__":
    asyncio.run(review_codebase())

Example 2: Batch Log Analysis (50% Cheaper)

import asyncio
from empathy_os.workflows.batch_processing import (
    BatchProcessingWorkflow,
    BatchRequest
)

async def analyze_logs_batch():
    """Analyze logs in batch for 50% cost savings.

    Reads three log files from the current directory, submits one
    ``analyze_logs`` request per file on the cheap tier, waits for the batch
    to finish, and prints each task's output or error.
    """

    workflow = BatchProcessingWorkflow()

    # Create batch requests from log files
    requests = []
    for i, log_file in enumerate(["app.log", "error.log", "access.log"]):
        # Decode explicitly as UTF-8 and replace undecodable bytes: log files
        # often contain stray binary data, and the platform default encoding
        # differs between machines.
        with open(log_file, encoding="utf-8", errors="replace") as f:
            logs = f.read()

        requests.append(BatchRequest(
            task_id=f"log_{i}",
            task_type="analyze_logs",
            input_data={"logs": logs},
            model_tier="cheap",  # Use cheapest tier for bulk analysis
        ))

    print(f"📤 Submitting {len(requests)} log analysis tasks...")

    # Execute batch (50% cheaper, processes within 24 hours)
    # NOTE(review): the timeout below is 1 hour although batches may take up
    # to 24 hours — confirm this shorter wait is intended.
    results = await workflow.execute_batch(
        requests=requests,
        poll_interval=300,    # Check every 5 minutes
        timeout=3600,         # 1 hour timeout
    )

    # Process results
    success_count = sum(1 for r in results if r.success)
    print(f"\n✓ Batch complete: {success_count}/{len(results)} successful")

    for result in results:
        if result.success:
            print(f"\n{result.task_id}:")
            print(f"  Issues: {result.output['content']}")
        else:
            print(f"\n{result.task_id}: ✗ {result.error}")

if __name__ == "__main__":
    asyncio.run(analyze_logs_batch())

Example 3: Real-Time Dashboard

from empathy_os.telemetry import EventStreamer, HeartbeatCoordinator
import asyncio

async def dashboard_monitor():
    """Monitor agent activity in real-time.

    Prints a header, then one line per heartbeat or coordination-signal
    event received from the event streamer.
    """

    streamer = EventStreamer()

    print("📊 Real-Time Agent Dashboard")
    print("=" * 50)

    # Consume events in real-time.
    # NOTE(review): consume_events is described as a blocking iterator, so
    # this loop presumably stalls the event loop while waiting — confirm
    # whether an async variant should be used inside a coroutine.
    for event in streamer.consume_events(
        event_types=["agent_heartbeat", "coordination_signal"],
        block_ms=5000,  # 5 second timeout
    ):
        # Display event
        if event.event_type == "agent_heartbeat":
            data = event.data
            agent_id = data.get("agent_id", "unknown")
            status = data.get("status", "unknown")
            progress = data.get("progress", 0) * 100

            print(f"[{event.timestamp}] Agent {agent_id}: {status} ({progress:.0f}%)")

        elif event.event_type == "coordination_signal":
            data = event.data
            signal = data.get("signal_type", "unknown")
            target = data.get("target_agent", "all")

            # Separate signal type and target so they do not run together.
            print(f"[{event.timestamp}] Signal: {signal} → {target}")

if __name__ == "__main__":
    asyncio.run(dashboard_monitor())

API Reference

For complete API reference, see:


Support


Last Updated: January 27, 2026 | Framework Version: 5.1.0 | License: Fair Source License 0.9