The Node Runtime is a .NET Worker Service that executes business process agents on behalf of the Control Plane. It implements the worker node architecture described in the System Architecture Document.
The Node Runtime provides:
- Node Registration: Registers with the Control Plane on startup
- Heartbeat Management: Sends periodic heartbeats to report node health and capacity
- Lease Pull Loop: Streams work assignments (leases) from the Control Plane via gRPC
- Agent Execution: Executes agents using Microsoft Agent Framework (MAF) SDK with budget enforcement
- OpenTelemetry Integration: Full observability with metrics, traces, and logs
```
┌─────────────────────────────────────────────────────────────┐
│                        Node Runtime                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐ │
│  │    Worker    │────▶│ Registration │     │  Heartbeat   │ │
│  │   Service    │     │   Service    │     │    Timer     │ │
│  └──────────────┘     └──────────────┘     └──────────────┘ │
│         │                    │                    │         │
│         │                    └────────────────────┘         │
│         │                                                   │
│         ▼                                                   │
│  ┌──────────────┐                                           │
│  │  Lease Pull  │◀──────────────────────────────────────────│
│  │   Service    │         gRPC Stream (Pull)                │
│  └──────────────┘                                           │
│         │                                                   │
│         ▼                                                   │
│  ┌──────────────┐                                           │
│  │    Agent     │  Microsoft Agent Framework (MAF)          │
│  │   Executor   │  - Budget enforcement (tokens/time)       │
│  └──────────────┘  - Token estimation & cost tracking       │
└─────────────────────────────────────────────────────────────┘
          │                                         ▲
          │ HTTP/gRPC                               │ gRPC
          ▼                                         │
┌─────────────────────────────────────────────────────────────┐
│                      Control Plane API                      │
│  - Node Registration (/v1/nodes:register)                   │
│  - Node Heartbeat (/v1/nodes/{id}:heartbeat)                │
│  - Lease Service (gRPC Pull, Ack, Complete, Fail)           │
└─────────────────────────────────────────────────────────────┘
```
Configuration is managed through appsettings.json:
```json
{
  "NodeRuntime": {
    "NodeId": "node-1",
    "ControlPlaneUrl": "http://localhost:5109",
    "MaxConcurrentLeases": 5,
    "HeartbeatIntervalSeconds": 30,
    "Capacity": {
      "Slots": 8,
      "Cpu": "4",
      "Memory": "8Gi"
    },
    "Metadata": {
      "Region": "us-east-1",
      "Environment": "development"
    }
  },
  "OpenTelemetry": {
    "ServiceName": "Node.Runtime",
    "ServiceVersion": "1.0.0",
    "OtlpExporter": {
      "Endpoint": "http://localhost:4317",
      "Protocol": "grpc"
    },
    "Traces": {
      "Enabled": true,
      "SamplingRatio": 1.0
    },
    "Metrics": {
      "Enabled": true
    }
  }
}
```
NodeRuntime options:
- NodeId: Unique identifier for this node instance (required)
- ControlPlaneUrl: Base URL of the Control Plane API (required)
- MaxConcurrentLeases: Maximum number of concurrent agent executions (default: 5)
- HeartbeatIntervalSeconds: Interval in seconds between heartbeat updates (default: 30)
- Capacity: Node resource capacity
  - Slots: Total execution slots available (default: 8)
  - Cpu: CPU allocation (e.g., "4" cores)
  - Memory: Memory allocation (e.g., "8Gi")
- Metadata: Custom metadata for placement constraints
  - Region: Geographic region (e.g., "us-east-1")
  - Environment: Environment name (e.g., "production", "staging", "development")
AgentRuntime options (set in the separate `AgentRuntime` section):
- DefaultModel: Default model to use for agent execution (default: "gpt-4")
- DefaultTemperature: Default temperature for model inference (default: 0.7)
- MaxTokens: Maximum tokens allowed per agent execution (default: 4000)
- MaxDurationSeconds: Maximum duration in seconds for agent execution (default: 60)
OpenTelemetry options:
- ServiceName: Service name for telemetry (default: "Node.Runtime")
- ServiceVersion: Service version (default: "1.0.0")
- OtlpExporter: OTLP exporter configuration
  - Endpoint: OTLP endpoint URL (default: "http://localhost:4317")
  - Protocol: "grpc" or "http/protobuf" (default: "grpc")
- ConsoleExporter: Console exporter configuration
  - Enabled: Enable console output for telemetry (default: false)
- Traces: Distributed tracing configuration
  - Enabled: Enable tracing (default: true)
  - SamplingRatio: Sampling ratio from 0.0 to 1.0 (default: 1.0)
- Metrics: Metrics configuration
  - Enabled: Enable metrics collection (default: true)
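For reference, the `NodeRuntime` options above could map onto a strongly typed class like the one below. This is a minimal sketch: the names `NodeRuntimeSettings`, `NodeCapacitySettings`, and the `Validate` method are hypothetical and are not the repository's `NodeRuntimeOptions.cs`; the defaults follow the values documented above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical options shape mirroring the "NodeRuntime" section of appsettings.json.
// Property names match the JSON keys so configuration binding could populate them.
public sealed class NodeCapacitySettings
{
    public int Slots { get; set; } = 8;
    public string? Cpu { get; set; }      // e.g. "4"
    public string? Memory { get; set; }   // e.g. "8Gi"
}

public sealed class NodeRuntimeSettings
{
    public string NodeId { get; set; } = "";          // required
    public string ControlPlaneUrl { get; set; } = ""; // required
    public int MaxConcurrentLeases { get; set; } = 5;
    public int HeartbeatIntervalSeconds { get; set; } = 30;
    public NodeCapacitySettings Capacity { get; set; } = new();
    public Dictionary<string, string> Metadata { get; set; } = new();

    // Returns a list of problems; an empty list means the settings look usable.
    public IReadOnlyList<string> Validate()
    {
        var errors = new List<string>();
        if (string.IsNullOrWhiteSpace(NodeId))
            errors.Add("NodeId is required.");
        if (!Uri.TryCreate(ControlPlaneUrl, UriKind.Absolute, out _))
            errors.Add("ControlPlaneUrl must be an absolute URL.");
        if (MaxConcurrentLeases <= 0)
            errors.Add("MaxConcurrentLeases must be positive.");
        if (HeartbeatIntervalSeconds <= 0)
            errors.Add("HeartbeatIntervalSeconds must be positive.");
        if (Capacity.Slots <= 0)
            errors.Add("Capacity.Slots must be positive.");
        return errors;
    }
}
```

Failing fast on missing required values at startup is usually cheaper to diagnose than a registration call that fails against an empty URL.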
Build the solution:
```bash
dotnet build
```
Run the Node Runtime:
```bash
cd src/Node.Runtime
dotnet run
```
Before running the Node Runtime, ensure the following services are available:
- Control Plane API: Must be running and accessible at the configured `ControlPlaneUrl`
- OpenTelemetry Collector (optional): For telemetry export, running at the configured OTLP endpoint
Run unit tests:
```bash
dotnet test tests/Node.Runtime.Tests/
```
Run all solution tests:
```bash
dotnet test
```
On startup:
- Registration: The node registers with the Control Plane via `POST /v1/nodes:register`
- Heartbeat Timer: Starts periodic heartbeat updates
- Lease Pull Loop: Opens the gRPC stream that delivers work assignments
For each lease:
- Lease Reception: Receives the lease on the gRPC `Pull` stream
- Lease Acknowledgment: Sends `Ack` for the received lease
- Agent Execution: Executes the agent using the MAF SDK within its budget constraints
- Result Reporting: Sends `Complete` or `Fail` via gRPC with timing and cost information
On shutdown:
- Stop Heartbeat: Cancels the heartbeat timer
- Stop Lease Pull: Gracefully closes the gRPC stream
- Cleanup: Releases resources and stops all background services
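The per-lease steps above (acknowledge, execute, then report exactly one outcome) can be sketched independently of gRPC. The types `LeaseInfo`, `RunOutcome`, `ILeaseReporter`, and `RecordingLeaseReporter` are hypothetical stand-ins for the generated LeaseService client, not the Node Runtime's actual API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the generated gRPC lease messages.
public sealed record LeaseInfo(string LeaseId, string RunId, string AgentId);
public sealed record RunOutcome(bool Succeeded, string? Error, TimeSpan Duration, decimal CostUsd);

// Hypothetical abstraction over the LeaseService Ack, Complete and Fail RPCs.
public interface ILeaseReporter
{
    Task AckAsync(LeaseInfo lease, CancellationToken ct);
    Task CompleteAsync(LeaseInfo lease, RunOutcome outcome, CancellationToken ct);
    Task FailAsync(LeaseInfo lease, string error, CancellationToken ct);
}

public static class LeaseLifecycle
{
    // Acknowledge first, then execute, then report exactly one of Complete or Fail.
    public static async Task ProcessAsync(
        LeaseInfo lease,
        ILeaseReporter reporter,
        Func<LeaseInfo, CancellationToken, Task<RunOutcome>> execute,
        CancellationToken ct)
    {
        await reporter.AckAsync(lease, ct);

        RunOutcome outcome;
        try
        {
            outcome = await execute(lease, ct);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            throw; // Node shutdown: propagate instead of reporting a failure.
        }
        catch (Exception ex)
        {
            await reporter.FailAsync(lease, ex.Message, ct);
            return;
        }

        if (outcome.Succeeded)
            await reporter.CompleteAsync(lease, outcome, ct);
        else
            await reporter.FailAsync(lease, outcome.Error ?? "unknown error", ct);
    }
}

// In-memory reporter that records calls; useful for unit tests of the flow above.
public sealed class RecordingLeaseReporter : ILeaseReporter
{
    public List<string> Calls { get; } = new();

    public Task AckAsync(LeaseInfo lease, CancellationToken ct)
    {
        Calls.Add($"ack:{lease.LeaseId}");
        return Task.CompletedTask;
    }

    public Task CompleteAsync(LeaseInfo lease, RunOutcome outcome, CancellationToken ct)
    {
        Calls.Add($"complete:{lease.LeaseId}");
        return Task.CompletedTask;
    }

    public Task FailAsync(LeaseInfo lease, string error, CancellationToken ct)
    {
        Calls.Add($"fail:{lease.LeaseId}:{error}");
        return Task.CompletedTask;
    }
}
```

Keeping the reporter behind an interface lets the Ack-then-report ordering be unit tested with the in-memory recorder, without a running Control Plane.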
NodeRegistrationService handles HTTP communication with the Control Plane for the node lifecycle:
- RegisterNodeAsync: Registers the node with capacity and metadata
- SendHeartbeatAsync: Sends periodic heartbeat with active runs and available slots
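A heartbeat carries the node's active runs and available slots. The sketch below assumes available slots are `Capacity.Slots` minus active runs and uses .NET's `PeriodicTimer` (.NET 6+) to send one heartbeat per interval; `HeartbeatPayload` and `HeartbeatLoop` are hypothetical names, and the real request body may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical heartbeat body; the real payload sent to /v1/nodes/{id}:heartbeat may differ.
public sealed record HeartbeatPayload(string NodeId, int ActiveRuns, int AvailableSlots, DateTimeOffset SentAt);

public static class HeartbeatLoop
{
    // Available slots are clamped at zero so an over-committed node never reports a negative value.
    public static HeartbeatPayload Build(string nodeId, int totalSlots, int activeRuns, DateTimeOffset now) =>
        new(nodeId, activeRuns, Math.Max(0, totalSlots - activeRuns), now);

    // Sends one heartbeat per interval until the token is cancelled.
    // A failed send is reported through onError and the loop keeps going.
    public static async Task RunAsync(
        TimeSpan interval,
        Func<HeartbeatPayload> snapshot,
        Func<HeartbeatPayload, CancellationToken, Task> send,
        Action<Exception> onError,
        CancellationToken ct)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            while (await timer.WaitForNextTickAsync(ct))
            {
                try
                {
                    await send(snapshot(), ct);
                }
                catch (Exception ex) when (!ct.IsCancellationRequested)
                {
                    onError(ex);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown: the wait for the next tick was cancelled.
        }
    }
}
```

Treating a single failed heartbeat as non-fatal keeps a transient Control Plane outage from stopping the timer.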
AgentExecutorService executes agents in-process using the Microsoft Agent Framework SDK:
- ExecuteAsync: Executes an agent with the given input and budget constraints
- Applies token and duration limits from budget constraints
- Estimates token usage and cost
- Handles timeouts and errors gracefully
- Returns detailed execution results with timing and cost information
Note: This service is now superseded by SandboxExecutorService for production use. It is kept for reference and fallback scenarios.
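The in-process time limit can be enforced by linking a timeout to the caller's cancellation token. A minimal sketch, assuming the agent call accepts and observes a `CancellationToken`; `TimeBudget` and `BudgetedResult<T>` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result shape; the real execution result also carries tokens and cost.
public sealed record BudgetedResult<T>(bool Completed, bool TimedOut, T? Value, TimeSpan Elapsed);

public static class TimeBudget
{
    // Runs work under a wall-clock budget. Expiry of the budget is reported as TimedOut;
    // cancellation requested by the caller (e.g. node shutdown) is rethrown.
    // The work must observe the token it is given for the budget to take effect.
    public static async Task<BudgetedResult<T>> RunAsync<T>(
        Func<CancellationToken, Task<T>> work,
        TimeSpan budget,
        CancellationToken callerToken)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(callerToken);
        cts.CancelAfter(budget);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            T value = await work(cts.Token);
            return new BudgetedResult<T>(true, false, value, stopwatch.Elapsed);
        }
        catch (OperationCanceledException)
            when (cts.IsCancellationRequested && !callerToken.IsCancellationRequested)
        {
            return new BudgetedResult<T>(false, true, default, stopwatch.Elapsed);
        }
    }
}
```

Because the two tokens are linked, a shutdown still cancels the agent promptly, but it surfaces as an exception rather than being misreported as a budget violation.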
SandboxExecutorService is the primary agent execution service. It executes agents in isolated sandbox processes with comprehensive budget enforcement.
Features:
- Process Isolation: Spawns each agent execution in a separate `Agent.Host` process for security and resource isolation
- Budget Enforcement: Enforces token limits and execution time budgets with a process-level timeout
- Graceful Cleanup: Automatically terminates processes that exceed budgets and cleans up resources
- IPC Communication: Uses stdin/stdout with JSON serialization for inter-process communication
- Error Handling: Robust error handling with process exit code validation and stderr capture
- Metadata Tracking: Includes process ID and sandbox status in execution results
Architecture:
```
Node.Runtime (Host Process)
    ↓
SandboxExecutorService
    ↓ spawn
Agent.Host (Isolated Process)
    ↓ stdin: AgentExecutionRequest (JSON)
    ↓ stdout: AgentExecutionResponse (JSON)
    ↓ runs MAF SDK
    ↓ enforces budgets
    ↓ returns result
```
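The stdin/stdout exchange above needs a framing convention. The sketch below assumes one compact JSON document per line and uses `System.Text.Json`; the fields on `AgentExecutionRequest` and `AgentExecutionResponse` here are illustrative guesses, not the actual message contracts, and `SandboxIpc` is a hypothetical helper.

```csharp
using System.Text.Json;

// Illustrative message shapes; the real AgentExecutionRequest/AgentExecutionResponse fields may differ.
public sealed record AgentExecutionRequest(string AgentId, string Input, int MaxTokens, int MaxDurationSeconds);
public sealed record AgentExecutionResponse(bool Success, string? Output, int TokensUsed, string? Error);

public static class SandboxIpc
{
    // camelCase property names, case-insensitive on read.
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    // Compact (non-indented) JSON escapes newlines inside strings, so each message is exactly one line.
    public static string ToLine<T>(T message) => JsonSerializer.Serialize(message, Options);

    public static T FromLine<T>(string line) where T : class =>
        JsonSerializer.Deserialize<T>(line, Options)
        ?? throw new JsonException($"Empty {typeof(T).Name} payload.");
}
```

With line framing, the host can write one request line to the child's stdin and read exactly one response line back from its stdout, while stderr stays free for diagnostics.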
Budget Enforcement:
- Process Timeout: The maximum duration is enforced at the process level, with a 5-second buffer for IPC overhead
- Token Limits: Passed to agent executor within the sandbox process
- Process Termination: Entire process tree killed if timeout exceeded
- Resource Cleanup: Automatic cleanup of process resources on completion or timeout
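The process-level timeout and process-tree kill could look like the following sketch. It relies on `Process.WaitForExitAsync` (.NET 5+) and `Process.Kill(entireProcessTree: true)` (.NET Core 3.0+); `SandboxTimeout` is a hypothetical helper name, and it does not show how `SandboxExecutorService` actually structures this.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class SandboxTimeout
{
    // Extra allowance for process start-up and stdin/stdout IPC (the documented 5-second buffer).
    public static readonly TimeSpan IpcBuffer = TimeSpan.FromSeconds(5);

    public static TimeSpan ProcessTimeout(int maxDurationSeconds) =>
        TimeSpan.FromSeconds(maxDurationSeconds) + IpcBuffer;

    // Waits for the sandbox process to exit. On timeout (or caller cancellation) the whole
    // process tree is killed. Returns true only if the process exited on its own in time.
    public static async Task<bool> WaitOrKillAsync(Process process, TimeSpan timeout, CancellationToken ct)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(timeout);
        try
        {
            await process.WaitForExitAsync(cts.Token);
            return true;
        }
        catch (OperationCanceledException)
        {
            try
            {
                process.Kill(entireProcessTree: true);
            }
            catch (InvalidOperationException)
            {
                // The process exited between the timeout firing and the kill request.
            }
            return false;
        }
    }
}
```

Killing the entire tree matters because an agent may start child processes of its own; killing only `Agent.Host` could leave them running.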
Note: The agent executor requires Azure AI Foundry or OpenAI credentials to be configured (task E3-T4). Until then, execution will return a NotImplementedException indicating the chat client needs configuration.
LeasePullService manages gRPC communication for work assignment, with enhanced error handling and telemetry:
- StartAsync: Opens the gRPC `Pull` stream to receive leases
- StopAsync: Gracefully stops lease streaming
- AcknowledgeLeaseAsync: Sends acknowledgment for received leases
- ProcessLeaseAsync: Executes agent via AgentExecutorService and reports results
Key Features:
- Exponential Backoff Retry: Automatic reconnection with exponential backoff (2^attempt seconds, max 60s) and jitter (0-2s)
- Comprehensive Telemetry: Full instrumentation with activities, counters, and histograms
- Concurrency Control: Tracks active leases and available slots using semaphores
- Error Categorization: Distinguishes between cancellation, transient errors, and permanent failures
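The reconnection delay described above, 2^attempt seconds capped at 60 seconds plus 0 to 2 seconds of jitter, can be computed as follows. Whether `attempt` starts at 0 or 1, and whether jitter is added before or after the cap, are assumptions of this sketch; `ReconnectBackoff` is a hypothetical name.

```csharp
using System;

public static class ReconnectBackoff
{
    public static readonly TimeSpan MaxBaseDelay = TimeSpan.FromSeconds(60);
    public static readonly TimeSpan MaxJitter = TimeSpan.FromSeconds(2);

    // delay = min(2^attempt, 60) seconds + uniform jitter in [0, 2) seconds.
    public static TimeSpan Delay(int attempt, Random random)
    {
        if (attempt < 0)
            throw new ArgumentOutOfRangeException(nameof(attempt));

        // Cap in floating point first so large attempt numbers cannot overflow.
        double baseSeconds = Math.Min(Math.Pow(2, attempt), MaxBaseDelay.TotalSeconds);
        double jitterSeconds = random.NextDouble() * MaxJitter.TotalSeconds;
        return TimeSpan.FromSeconds(baseSeconds + jitterSeconds);
    }
}
```

A reconnect loop would typically `await Task.Delay(ReconnectBackoff.Delay(attempt, Random.Shared), ct)` before reopening the `Pull` stream. The jitter spreads out reconnects when many nodes lose the Control Plane at the same moment.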
Worker is the background service that orchestrates the node runtime lifecycle:
- Registers node on startup
- Manages heartbeat timer
- Coordinates lease pull service
- Handles graceful shutdown
The Node Runtime uses the LeaseService gRPC contract defined in `lease_service.proto`:
```protobuf
service LeaseService {
  // Streams work assignments from the Control Plane to the node.
  rpc Pull(PullRequest) returns (stream Lease);

  // Acknowledges receipt of a lease.
  rpc Ack(AckRequest) returns (AckResponse);

  // Reports successful completion of a run with timing and cost information.
  rpc Complete(CompleteRequest) returns (CompleteResponse);

  // Reports failure of a run with error details and retry information.
  rpc Fail(FailRequest) returns (FailResponse);
}
```
The Node Runtime includes comprehensive OpenTelemetry instrumentation for end-to-end observability (E2-T9):
The `Node.Runtime.Observability.TelemetryConfig` class provides centralized configuration for all telemetry:
- Activity Source: `Node.Runtime` (v1.0.0) for distributed tracing
- Meter: `Node.Runtime` (v1.0.0) for custom metrics
The following custom metrics are automatically collected:
Counters:
- `leases_received_total`: Total number of leases received from the Control Plane
- `leases_acknowledged_total`: Total number of leases acknowledged
- `leases_completed_total`: Total number of leases completed successfully
- `leases_failed_total`: Total number of leases that failed
- `agent_executions_total`: Total number of agent executions
- `agent_execution_errors_total`: Total number of agent execution errors
- `lease_stream_errors_total`: Total number of lease stream errors
- `lease_stream_reconnects_total`: Total number of lease stream reconnection attempts
Histograms:
- `lease_processing_duration_ms`: Duration of lease processing in milliseconds
- `agent_execution_duration_ms`: Duration of agent execution in milliseconds
- `agent_tokens_total`: Total tokens used per agent execution
- `agent_cost_usd`: Cost of agent execution in USD
Observable Gauges:
- `active_leases`: Current number of active leases being processed
- `available_slots`: Current number of available slots for lease processing
Automatic Instrumentation:
- HTTP client calls to Control Plane
- gRPC client calls to LeaseService
- .NET runtime metrics (GC, thread pool, etc.)
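The counters, histograms, and observable gauges listed above map directly onto `System.Diagnostics.Metrics`. A minimal sketch of a few of them, assuming a `Meter` named `Node.Runtime` as stated above; `LeaseMetrics` is a hypothetical class, not `TelemetryConfig`.

```csharp
using System;
using System.Diagnostics.Metrics;
using System.Threading;

public sealed class LeaseMetrics : IDisposable
{
    private readonly int _totalSlots;
    private int _activeLeases;

    public Meter Meter { get; } = new("Node.Runtime", "1.0.0");
    public Counter<long> LeasesReceived { get; }
    public Histogram<double> LeaseProcessingDurationMs { get; }

    public LeaseMetrics(int totalSlots)
    {
        _totalSlots = totalSlots;
        LeasesReceived = Meter.CreateCounter<long>(
            "leases_received_total", description: "Total number of leases received from control plane");
        LeaseProcessingDurationMs = Meter.CreateHistogram<double>(
            "lease_processing_duration_ms", unit: "ms", description: "Duration of lease processing in milliseconds");
        // Observable gauges are sampled when a collector reads them, not pushed on every change.
        Meter.CreateObservableGauge("active_leases", () => Volatile.Read(ref _activeLeases));
        Meter.CreateObservableGauge("available_slots", () => _totalSlots - Volatile.Read(ref _activeLeases));
    }

    public int ActiveLeases => Volatile.Read(ref _activeLeases);

    // Called when a lease is received and starts processing.
    public void LeaseStarted()
    {
        LeasesReceived.Add(1);
        Interlocked.Increment(ref _activeLeases);
    }

    // Called when processing ends, successfully or not.
    public void LeaseFinished(double elapsedMs)
    {
        Interlocked.Decrement(ref _activeLeases);
        LeaseProcessingDurationMs.Record(elapsedMs);
    }

    public void Dispose() => Meter.Dispose();
}
```

The OpenTelemetry SDK picks these instruments up once the meter name is registered with its metrics provider.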
Distributed traces are automatically created for:
- Lease pull operations: `LeasePullService.PullLeases`, with reconnection tracking
- Lease acknowledgment: `LeasePullService.AcknowledgeLease`, with lease and run IDs
- Lease processing: `LeasePullService.ProcessLease`, with agent information and execution status
- HTTP/gRPC requests: Automatic correlation via trace context propagation
Each trace includes relevant tags (e.g., node.id, lease.id, run.id, agent.id) and correlates with logs via trace_id.
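A lease-processing span carrying the tags above might be started like this. The sketch uses `System.Diagnostics.ActivitySource`, which OpenTelemetry .NET listens to; `LeaseTracing` and its methods are hypothetical helpers.

```csharp
using System;
using System.Diagnostics;

public static class LeaseTracing
{
    // Same name and version as the documented Activity Source.
    public static readonly ActivitySource Source = new("Node.Runtime", "1.0.0");

    // Returns null when nothing is listening (e.g. no tracer provider subscribed to "Node.Runtime")
    // or the sampler drops the span, so callers should use the null-conditional operator.
    public static Activity? StartProcessLease(string nodeId, string leaseId, string runId, string agentId)
    {
        Activity? activity = Source.StartActivity("LeasePullService.ProcessLease", ActivityKind.Internal);
        activity?.SetTag("node.id", nodeId);
        activity?.SetTag("lease.id", leaseId);
        activity?.SetTag("run.id", runId);
        activity?.SetTag("agent.id", agentId);
        return activity;
    }

    // Marks the span as failed so trace backends can filter on error status.
    public static void RecordFailure(Activity? activity, Exception ex) =>
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
```

Callers would wrap processing in `using var activity = LeaseTracing.StartProcessLease(...)` so the span ends when the lease is done; logs written inside that scope then pick up its `trace_id`.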
All errors are categorized and tracked with telemetry:
- Stream Errors: Connection failures, network issues, gRPC errors
- Reconnection Attempts: Tracked with exponential backoff metrics
- Agent Execution Errors: Failures during agent processing
- Lease Failures: Failed lease completions with error details
Structured logs are enhanced with OpenTelemetry context:
- Trace correlation: Logs include `trace_id` and `span_id` for correlation with traces
- Node context: All logs include `node.id` for filtering
- Lease context: Processing logs include `lease.id` and `run.id`
- JSON format: Logs are structured for easy parsing and filtering
- OTLP export: Logs are exported to the configured OTLP endpoint for aggregation
Configuration:
Logging instrumentation is configured in `Program.cs` to send logs to the OTLP collector:
```csharp
// resourceBuilder (the shared OpenTelemetry resource) and otelConfig (the bound
// "OpenTelemetry" options) are assumed to be created earlier in Program.cs.
builder.Logging.AddOpenTelemetry(logging =>
{
    logging.SetResourceBuilder(resourceBuilder);
    logging.IncludeFormattedMessage = true;
    logging.IncludeScopes = true;
    logging.AddOtlpExporter(options =>
    {
        options.Endpoint = new Uri(otelConfig.OtlpExporter.Endpoint);
    });
});
```
The Node Runtime integrates with the following observability stack:
- Prometheus: Metrics collection and storage via OTLP exporter
- Tempo/Jaeger: Distributed tracing backend via OTLP exporter
- Loki: Log aggregation and querying
- Grafana: Unified dashboards for metrics, traces, and logs
The Node Runtime integrates with Microsoft Agent Framework (MAF) SDK to execute business process agents (E2-T2).
AgentExecutorService (IAgentExecutor)
- Creates and executes agents using MAF SDK
- Enforces budget constraints (token limits, execution timeouts)
- Tracks token usage and estimates costs
- Returns detailed execution results
AgentSpec
- Defines agent configuration including ID, version, name, and instructions
- Specifies model profile and budget constraints
- Passed from Control Plane via gRPC lease specification
Budget Enforcement
- Token Limits: Configurable maximum tokens per execution (default: 4000)
- Time Limits: Configurable maximum duration (default: 60 seconds)
- Cost Tracking: Estimates USD cost based on token usage
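Token and cost tracking can be sketched with a character-count heuristic and per-1K-token prices. Both the 4-characters-per-token estimate and the `ModelPricing` rates are assumptions for illustration; the executor's actual estimation method and prices are not documented here, and `CostEstimator` is a hypothetical name.

```csharp
// Hypothetical pricing record; real per-model prices are not specified in this document.
public sealed record ModelPricing(decimal InputPer1K, decimal OutputPer1K);

public static class CostEstimator
{
    // Rough heuristic of about 4 characters per token (rounded up). Not a real tokenizer.
    public static int EstimateTokens(string text) =>
        string.IsNullOrEmpty(text) ? 0 : (text.Length + 3) / 4;

    // True when the combined prompt and completion tokens fit the MaxTokens budget.
    public static bool WithinBudget(int promptTokens, int completionTokens, int maxTokens) =>
        promptTokens + completionTokens <= maxTokens;

    // Cost in USD, computed in decimal to avoid binary floating-point rounding.
    public static decimal EstimateCostUsd(int promptTokens, int completionTokens, ModelPricing pricing) =>
        promptTokens / 1000m * pricing.InputPer1K + completionTokens / 1000m * pricing.OutputPer1K;
}
```

For example, 1,000 prompt tokens and 500 completion tokens at hypothetical rates of $0.03 and $0.06 per 1K come to $0.03 + $0.03 = $0.06.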
Agent runtime behavior is configured via AgentRuntime section in appsettings.json:
```json
{
  "AgentRuntime": {
    "DefaultModel": "gpt-4",
    "DefaultTemperature": 0.7,
    "MaxTokens": 4000,
    "MaxDurationSeconds": 60
  }
}
```
The following MAF SDK packages are included:
- Microsoft.Agents.AI (v1.0.0-preview.251028.1)
- Microsoft.Agents.AI.AzureAI (v1.0.0-preview.251028.1)
- Microsoft.Agents.AI.OpenAI (v1.0.0-preview.251028.1)
Note: Actual agent execution requires Azure AI Foundry or OpenAI API credentials. This will be configured in E3-T4 (Azure AI Foundry integration).
Until credentials are configured, the agent executor will return a NotImplementedException with a message indicating the chat client needs to be configured. The integration is ready to execute agents once the model provider is set up.
The Node Runtime supports input and output connectors for external message sources and destinations.
The Azure Service Bus input connector (ServiceBusInputConnector) provides reliable message reception from Azure Service Bus queues with the following features:
Key Features:
- Prefetch Support: Configurable prefetch count (default: 16) for improved throughput
- PeekLock Mode: Reliable message processing with manual completion/abandonment
- Dead-Letter Queue (DLQ): Automatic poison message detection and DLQ support
- Telemetry: Full OpenTelemetry instrumentation with activities and metrics
- Error Handling: Robust error handling with detailed logging
Architecture:
```
Azure Service Bus Queue
        ↓ ReceiveAsync (prefetch 16)
ServiceBusInputConnector
        ↓ Message Processing
        ├─ Complete (success)
        ├─ Abandon (retry)
        └─ DeadLetter (poison/failure)
```
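The three outcomes in the diagram can be expressed as a pure decision function, which keeps the policy testable without a Service Bus namespace. A sketch under stated assumptions: `Settlement` and `SettlementPolicy` are hypothetical names, `DeliveryCount` is 1 on the first delivery (as in Azure Service Bus), and a transient failure on the last allowed attempt is dead-lettered explicitly rather than abandoned.

```csharp
using System;

public enum Settlement { Complete, Abandon, DeadLetter }

public static class SettlementPolicy
{
    // deliveryCount follows Service Bus semantics: 1 on the first delivery.
    public static Settlement Decide(
        Exception? error, int deliveryCount, int maxDeliveryCount, Func<Exception, bool> isPermanent)
    {
        if (error is null)
            return Settlement.Complete;     // success

        if (isPermanent(error))
            return Settlement.DeadLetter;   // retrying cannot help

        // Transient failure: retry until the delivery budget is used up.
        return deliveryCount >= maxDeliveryCount ? Settlement.DeadLetter : Settlement.Abandon;
    }
}
```

Dead-lettering explicitly on the last attempt lets the connector attach its own reason and description, instead of relying on the broker's generic max-delivery reason.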
Configuration:
Service Bus connector is configured via ServiceBusConnector section in appsettings.json:
```json
{
  "ServiceBusConnector": {
    "ConnectionString": "Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...",
    "QueueName": "invoices",
    "PrefetchCount": 16,
    "MaxConcurrentCalls": 5,
    "MaxWaitTime": "00:00:05",
    "MaxDeliveryCount": 3,
    "AutoComplete": false,
    "ReceiveMode": "PeekLock"
  }
}
```
Configuration Options:
- ConnectionString: Azure Service Bus connection string or fully qualified namespace
- QueueName: Name of the queue to receive messages from (required)
- PrefetchCount: Number of messages to prefetch (default: 16, per SAD)
- MaxConcurrentCalls: Maximum concurrent message processing operations (default: 5)
- MaxWaitTime: Maximum wait time for receiving messages (default: 5 seconds)
- MaxDeliveryCount: Maximum delivery attempts before DLQ (default: 3, per SAD)
- AutoComplete: Auto-complete messages after processing (default: false for manual control)
- ReceiveMode: "PeekLock" for reliable processing or "ReceiveAndDelete" for at-most-once delivery
Poison Message Detection:
The connector automatically detects poison messages (messages that exceed MaxDeliveryCount) and logs warnings. Applications can use the DeadLetterMessageAsync method to explicitly move messages to the DLQ:
```csharp
// maxDeliveryCount is the configured MaxDeliveryCount (default: 3)
if (message.DeliveryCount > maxDeliveryCount)
{
    await connector.DeadLetterMessageAsync(
        message,
        "MaxDeliveryCountExceeded",
        $"Message exceeded {maxDeliveryCount} delivery attempts");
}
```
Usage Example:
```csharp
// Initialize connector
var connector = serviceProvider.GetRequiredService<IInputConnector>();
await connector.InitializeAsync();

// Receive messages
var messages = await connector.ReceiveMessagesAsync(
    maxMessages: 10,
    maxWaitTime: TimeSpan.FromSeconds(5));

foreach (var message in messages)
{
    try
    {
        // Process message (ProcessMessageAsync is application code)
        await ProcessMessageAsync(message);

        // Complete on success
        await connector.CompleteMessageAsync(message);
    }
    catch (Exception ex)
    {
        // Dead-letter on permanent failure (IsPermanentError is application code);
        // otherwise abandon so the message is redelivered for retry
        if (IsPermanentError(ex))
        {
            await connector.DeadLetterMessageAsync(message, "ProcessingError", ex.Message);
        }
        else
        {
            await connector.AbandonMessageAsync(message);
        }
    }
}

// Close connector
await connector.CloseAsync();
```
Telemetry:
The Service Bus connector includes comprehensive observability:
- Activities: `ServiceBusInputConnector.Initialize`, `ReceiveMessages`, `CompleteMessage`, `AbandonMessage`, `DeadLetterMessage`
- Tags: `queue.name`, `prefetch.count`, `message.id`, `messages.received`, `reason`
- Error Tracking: Automatic error status recording in activities
Dependencies:
- Azure.Messaging.ServiceBus (v7.18.3)
Next Connector:
- ⏳ E2-T7: HTTP output connector with retry/backoff and idempotency
Node Runtime core functionality is implemented and enhanced with comprehensive telemetry and error handling:
- ✅ E2-T1: Node runtime skeleton (Complete)
- ✅ E2-T2: Integrate MAF runtime (Complete)
- ✅ E2-T3: Node registration (Complete)
- ✅ E2-T4: Lease pull loop (Complete)
- ✅ E2-T5: Sandbox process model with budget enforcement (Complete)
- ✅ E2-T6: Service Bus connector (Complete)
- ⏳ E2-T7: HTTP output connector
- ⏳ E2-T8: DLQ handling
- ✅ E2-T9: Node telemetry - Complete with metrics, traces, logs, and observable gauges
- ⏳ E2-T10: Secure mTLS communication
- ⏳ E3-T4: Configure Azure AI Foundry credentials for actual LLM execution
```
src/Node.Runtime/
├── Configuration/            # Configuration options classes
│   ├── NodeRuntimeOptions.cs
│   ├── AgentRuntimeOptions.cs
│   ├── ServiceBusConnectorOptions.cs
│   └── OpenTelemetryOptions.cs
├── Connectors/               # Message connectors (input/output)
│   ├── IMessageConnector.cs
│   ├── IInputConnector.cs
│   └── ServiceBusInputConnector.cs
├── Services/                 # Core services
│   ├── NodeRegistrationService.cs
│   ├── AgentExecutorService.cs
│   └── LeasePullService.cs
├── Worker.cs                 # Background worker service
├── Program.cs                # Entry point and DI setup
└── appsettings.json          # Configuration

tests/Node.Runtime.Tests/
├── Connectors/               # Connector tests
│   └── ServiceBusInputConnectorTests.cs
└── Services/                 # Unit tests
    ├── NodeRegistrationServiceTests.cs
    └── AgentExecutorServiceTests.cs
```
To add a new service:
- Create the service interface and implementation in `Services/`
- Register the service in `Program.cs`
- Add unit tests in `tests/Node.Runtime.Tests/Services/`
Set Logging:LogLevel:Grpc to Debug in appsettings.json to see detailed gRPC communication logs.
See the main repository README for license information.