Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,32 @@ See [Search Servers Documentation](search_servers.md) for complete details.

---

## Observability

Controls the usage-statistics aggregate that powers the Web UI usage graphs
(spec 069). The aggregate is built incrementally from the activity log, kept in
memory as an immutable snapshot, and periodically persisted so it survives
restarts without a full re-scan.

```json
{
"observability": {
"usage_cache_ttl": "5s",
"usage_persist_interval": "30s"
}
}
```

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `usage_cache_ttl` | duration string | `5s` | Freshness bound for the usage endpoint's read cache on wide time windows. |
| `usage_persist_interval` | duration string | `30s` | How often the in-memory usage aggregate snapshot is flushed to storage (also flushed on graceful shutdown). |

Both fields are optional, accept Go duration strings (e.g. `"10s"`, `"1m"`),
and are hot-reloadable. Non-positive values fall back to the defaults.

---

## Complete Example

Here's a complete configuration example with all major sections:
Expand Down
37 changes: 37 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ type Config struct {
// Telemetry settings (Spec 036)
Telemetry *TelemetryConfig `json:"telemetry,omitempty" mapstructure:"telemetry"`

// Observability settings (Spec 069): usage aggregate cache/persistence cadence.
Observability *ObservabilityConfig `json:"observability,omitempty" mapstructure:"observability"`

// Routing mode (Spec 031): how MCP tools are exposed to clients
// Valid values: "retrieve_tools" (default), "direct", "code_execution"
RoutingMode string `json:"routing_mode,omitempty" mapstructure:"routing-mode"`
Expand Down Expand Up @@ -732,6 +735,24 @@ func (c *IntentDeclarationConfig) IsStrictServerValidation() bool {
return c.StrictServerValidation
}

// ObservabilityConfig groups the timing settings for the Spec 069 usage
// aggregate. Both fields are serialized as strings (see the swaggertype tag)
// and are omitted from JSON output when zero.
type ObservabilityConfig struct {
// UsageCacheTTL bounds the freshness of the usage endpoint's read cache for
// wide windows (FR-005). DefaultObservabilityConfig sets it to 5s.
UsageCacheTTL Duration `json:"usage_cache_ttl,omitempty" mapstructure:"usage-cache-ttl" swaggertype:"string"`
// UsagePersistInterval is how often the actor-owned usage aggregate snapshot
// is flushed to storage. DefaultObservabilityConfig sets it to 30s.
UsagePersistInterval Duration `json:"usage_persist_interval,omitempty" mapstructure:"usage-persist-interval" swaggertype:"string"`
}

// DefaultObservabilityConfig builds a fresh ObservabilityConfig carrying the
// stock cadences: a 5s usage cache TTL and a 30s usage persist interval.
// Each call returns a new instance, so callers may mutate the result freely.
func DefaultObservabilityConfig() *ObservabilityConfig {
	defaults := new(ObservabilityConfig)
	defaults.UsageCacheTTL = Duration(5 * time.Second)
	defaults.UsagePersistInterval = Duration(30 * time.Second)
	return defaults
}

// ToolRegistration represents a tool registration
type ToolRegistration struct {
Name string `json:"name"`
Expand Down Expand Up @@ -965,6 +986,9 @@ func DefaultConfig() *Config {

// Intent declaration defaults (Spec 018) - strict validation by default for security
IntentDeclaration: DefaultIntentDeclarationConfig(),

// Observability defaults (Spec 069)
Observability: DefaultObservabilityConfig(),
}
}

Expand Down Expand Up @@ -1351,6 +1375,19 @@ func (c *Config) Validate() error {
c.IntentDeclaration = DefaultIntentDeclarationConfig()
}

// Ensure Observability config is not nil and has sane cadence defaults
// (Spec 069). The hot-reload path re-runs Validate, so zeroed fields are
// repaired rather than disabling persistence/caching entirely.
if c.Observability == nil {
c.Observability = DefaultObservabilityConfig()
}
if c.Observability.UsageCacheTTL.Duration() <= 0 {
c.Observability.UsageCacheTTL = Duration(5 * time.Second)
}
if c.Observability.UsagePersistInterval.Duration() <= 0 {
c.Observability.UsagePersistInterval = Duration(30 * time.Second)
}

return nil
}

Expand Down
51 changes: 51 additions & 0 deletions internal/config/observability_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package config

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestDefaultObservabilityConfig pins the stock cadences returned by
// DefaultObservabilityConfig: 5s cache TTL and 30s persist interval.
func TestDefaultObservabilityConfig(t *testing.T) {
	defaults := DefaultObservabilityConfig()
	require.NotNil(t, defaults)

	wantTTL, wantPersist := 5*time.Second, 30*time.Second
	assert.Equal(t, wantTTL, defaults.UsageCacheTTL.Duration())
	assert.Equal(t, wantPersist, defaults.UsagePersistInterval.Duration())
}

// TestDefaultConfig_HasObservabilityDefaults checks that DefaultConfig wires in
// a non-nil observability section carrying the stock 5s / 30s cadences.
func TestDefaultConfig_HasObservabilityDefaults(t *testing.T) {
	obs := DefaultConfig().Observability
	require.NotNil(t, obs)

	wantTTL, wantPersist := 5*time.Second, 30*time.Second
	assert.Equal(t, wantTTL, obs.UsageCacheTTL.Duration())
	assert.Equal(t, wantPersist, obs.UsagePersistInterval.Duration())
}

func TestValidate_FillsObservabilityDefaults(t *testing.T) {
// A config loaded without an observability block gets defaults applied
// on Validate (hot-reload path re-runs Validate).
cfg := DefaultConfig()
cfg.Observability = nil
require.NoError(t, cfg.Validate())
require.NotNil(t, cfg.Observability)
assert.Equal(t, 5*time.Second, cfg.Observability.UsageCacheTTL.Duration())
assert.Equal(t, 30*time.Second, cfg.Observability.UsagePersistInterval.Duration())

// Zero/negative interval fields are repaired to defaults.
cfg.Observability = &ObservabilityConfig{}
require.NoError(t, cfg.Validate())
assert.Equal(t, 5*time.Second, cfg.Observability.UsageCacheTTL.Duration())
assert.Equal(t, 30*time.Second, cfg.Observability.UsagePersistInterval.Duration())
}

// TestObservabilityConfig_PreservesUserValues ensures Validate leaves
// explicitly configured, positive cadences untouched instead of overwriting
// them with the defaults.
func TestObservabilityConfig_PreservesUserValues(t *testing.T) {
	const (
		userTTL     = 2 * time.Second
		userPersist = 60 * time.Second
	)

	cfg := DefaultConfig()
	cfg.Observability = &ObservabilityConfig{
		UsageCacheTTL:        Duration(userTTL),
		UsagePersistInterval: Duration(userPersist),
	}
	require.NoError(t, cfg.Validate())

	assert.Equal(t, userTTL, cfg.Observability.UsageCacheTTL.Duration())
	assert.Equal(t, userPersist, cfg.Observability.UsagePersistInterval.Duration())
}
37 changes: 36 additions & 1 deletion internal/runtime/activity_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runtime
import (
"context"
"encoding/json"
"sync/atomic"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -49,11 +50,18 @@ type ActivityService struct {

// Event emitter for sensitive data detection events (Spec 026)
eventEmitter SensitiveDataEventEmitter

// Usage aggregate (Spec 069 A2): actor-owned rollup of tool-call activity.
// Mutated only on this goroutine via Apply; published to readers as an
// immutable snapshot. usagePersistIntervalNs is the hot-reloadable flush
// cadence in nanoseconds.
usage *UsageStore
usagePersistIntervalNs atomic.Int64
}

// NewActivityService creates a new activity service.
func NewActivityService(storage *storage.Manager, logger *zap.Logger) *ActivityService {
return &ActivityService{
s := &ActivityService{
storage: storage,
logger: logger,
eventCh: make(chan Event, 100), // Buffer for non-blocking event delivery
Expand All @@ -62,7 +70,10 @@ func NewActivityService(storage *storage.Manager, logger *zap.Logger) *ActivityS
maxRecords: DefaultRetentionMaxRecords,
checkInterval: DefaultRetentionCheckInterval,
detector: nil, // Detector is optional, set via SetDetector
usage: newUsageStore(),
}
s.usagePersistIntervalNs.Store(int64(DefaultUsagePersistInterval))
return s
}

// SetDetector sets the sensitive data detector for async scanning (Spec 026).
Expand Down Expand Up @@ -103,17 +114,25 @@ func (s *ActivityService) Start(ctx context.Context, rt *Runtime) {
// Start retention loop in a separate goroutine
go s.runRetentionLoop(ctx)

// Spec 069 A2: load/rebuild the usage aggregate before processing events,
// then start the periodic snapshot flush loop.
s.initUsageFromStorage()
go s.runUsageFlushLoop(ctx)

s.logger.Info("Activity service started")

for {
select {
case <-ctx.Done():
s.logger.Info("Activity service shutting down")
// Flush-on-shutdown: persist the final usage snapshot (Spec 069 A2).
s.persistUsage()
close(s.done)
return
case evt, ok := <-eventCh:
if !ok {
s.logger.Info("Activity service event channel closed")
s.persistUsage()
close(s.done)
return
}
Expand Down Expand Up @@ -303,6 +322,13 @@ func (s *ActivityService) handleToolCallCompleted(evt Event) {
zap.String("tool_name", toolName),
zap.String("status", status))

// Fold the persisted call into the usage aggregate (Spec 069 A2). Done
// only on save success so the in-memory rollup stays consistent with a
// cold-start rebuild that re-scans persisted records.
if s.usage != nil {
s.usage.Apply(record)
}

// Run async sensitive data detection (Spec 026)
if s.detector != nil {
go s.runAsyncDetection(record.ID, arguments, response)
Expand Down Expand Up @@ -336,6 +362,15 @@ func (s *ActivityService) handlePolicyDecision(evt Event) {
zap.Error(err),
zap.String("server_name", serverName),
zap.String("decision", decision))
return
}

// Fold blocked attempts into the usage aggregate (Spec 069 A2). Apply
// ignores non-blocked decisions, so passing every policy decision is safe.
// Done only on save success so the in-memory rollup stays consistent with a
// cold-start rebuild that re-scans persisted records.
if s.usage != nil {
s.usage.Apply(record)
}
}

Expand Down
36 changes: 36 additions & 0 deletions internal/runtime/apply_config_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"runtime"
"testing"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -107,6 +108,41 @@ func TestApplyConfig_HotReloadableChange(t *testing.T) {
assert.Equal(t, 20, savedCfg.ToolsLimit, "Config file should be updated with new ToolsLimit value")
}

// TestApplyConfig_ObservabilityHotReload (MCP-835 / Codex finding #3) checks
// that a changed observability usage persist interval reaches the running
// ActivityService through ApplyConfig without a restart. Before this was wired
// up, ApplyConfig only handled logging/truncator, leaving
// SetUsagePersistInterval's "hot-reloadable" promise unfulfilled.
func TestApplyConfig_ObservabilityHotReload(t *testing.T) {
	dataDir := t.TempDir()
	configFile := filepath.Join(dataDir, "config.json")

	// Both the initial and the reloaded config share the same listen address
	// and data directory so that only the observability section differs.
	buildCfg := func() *config.Config {
		c := config.DefaultConfig()
		c.Listen = "127.0.0.1:8080"
		c.DataDir = dataDir
		return c
	}

	startCfg := buildCfg()
	require.NoError(t, config.SaveConfig(startCfg, configFile))

	rt, err := New(startCfg, configFile, zap.NewNop())
	require.NoError(t, err)
	defer func() { _ = rt.Close() }()

	// Default cadence is 30s before the reload.
	require.Equal(t, DefaultUsagePersistInterval, rt.ActivityService().usagePersistInterval())

	reloadedCfg := buildCfg()
	reloadedCfg.Observability.UsagePersistInterval = config.Duration(10 * time.Second)

	outcome, err := rt.ApplyConfig(reloadedCfg, configFile)
	require.NoError(t, err)
	require.NotNil(t, outcome)

	assert.False(t, outcome.RequiresRestart, "observability cadence change is hot-reloadable")
	assert.Contains(t, outcome.ChangedFields, "observability")
	assert.Equal(t, 10*time.Second, rt.ActivityService().usagePersistInterval(),
		"new persist interval must be applied to the running ActivityService")
}

// TestApplyConfig_SaveFailure tests handling of save errors
func TestApplyConfig_SaveFailure(t *testing.T) {
// Skip on Windows: chmod on directories doesn't reliably prevent file creation
Expand Down
6 changes: 6 additions & 0 deletions internal/runtime/config_hotreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func DetectConfigChanges(oldCfg, newCfg *config.Config) *ConfigApplyResult {
result.ChangedFields = append(result.ChangedFields, "environment")
}

// Observability cadence (Spec 069 A2 — can be hot-reloaded; the usage flush
// loop re-reads the interval each cycle, so applying it is just a setter).
if !reflect.DeepEqual(oldCfg.Observability, newCfg.Observability) {
result.ChangedFields = append(result.ChangedFields, "observability")
}

// If no changes detected
if len(result.ChangedFields) == 0 {
result.AppliedImmediately = false
Expand Down
38 changes: 32 additions & 6 deletions internal/runtime/config_hotreload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ import (
"github.com/stretchr/testify/require"
)

// TestDetectConfigChanges_Observability (MCP-835 / Codex finding #3) checks
// that a different observability usage cadence is reported by
// DetectConfigChanges as an "observability" change that needs no restart.
// ApplyConfig relies on that report to push the new persist interval to the
// running ActivityService, backing SetUsagePersistInterval's hot-reload claim.
func TestDetectConfigChanges_Observability(t *testing.T) {
	// withPersistInterval builds a minimal config that differs between calls
	// only in the usage persist interval.
	withPersistInterval := func(d time.Duration) *config.Config {
		return &config.Config{
			Listen:  "127.0.0.1:8080",
			DataDir: "/d",
			TLS:     &config.TLSConfig{},
			Observability: &config.ObservabilityConfig{
				UsageCacheTTL:        config.Duration(5 * time.Second),
				UsagePersistInterval: config.Duration(d),
			},
		}
	}

	before := withPersistInterval(30 * time.Second)
	after := withPersistInterval(10 * time.Second)

	outcome := DetectConfigChanges(before, after)
	require.True(t, outcome.Success)
	assert.Contains(t, outcome.ChangedFields, "observability")
	assert.False(t, outcome.RequiresRestart, "cadence change is hot-reloadable")
}

func TestDetectConfigChanges(t *testing.T) {
baseConfig := &config.Config{
Listen: "127.0.0.1:8080",
Expand Down Expand Up @@ -49,7 +75,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: ":9090", // Changed
DataDir: "/test/data",
APIKey: "test-key",
ToolsLimit: 15,
ToolsLimit: 15,
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{},
Expand All @@ -67,7 +93,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: "127.0.0.1:8080",
DataDir: "/different/data", // Changed
APIKey: "test-key",
ToolsLimit: 15,
ToolsLimit: 15,
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{},
Expand All @@ -85,7 +111,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: "127.0.0.1:8080",
DataDir: "/test/data",
APIKey: "new-key", // Changed
ToolsLimit: 15,
ToolsLimit: 15,
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{},
Expand All @@ -103,7 +129,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: "127.0.0.1:8080",
DataDir: "/test/data",
APIKey: "test-key",
ToolsLimit: 15,
ToolsLimit: 15,
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{},
Expand All @@ -124,7 +150,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: "127.0.0.1:8080",
DataDir: "/test/data",
APIKey: "test-key",
ToolsLimit: 20, // Changed
ToolsLimit: 20, // Changed
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{},
Expand All @@ -144,7 +170,7 @@ func TestDetectConfigChanges(t *testing.T) {
Listen: "127.0.0.1:8080",
DataDir: "/test/data",
APIKey: "test-key",
ToolsLimit: 15,
ToolsLimit: 15,
ToolResponseLimit: 1000,
CallToolTimeout: config.Duration(60 * time.Second),
Servers: []*config.ServerConfig{ // Changed
Expand Down
Loading
Loading