Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions internal/contracts/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,66 @@ type ServerTokenMetrics struct {
PerServerToolListSizes map[string]int `json:"per_server_tool_list_sizes"` // Token size per server
}

// UsageAggregateResponse is the GET /api/v1/activity/usage payload (Spec 069 A3).
// It is served from the actor-owned in-memory usage aggregate snapshot (never a
// per-request full-log scan, SC-005).
//
// Windowing semantics (informed by the A2 aggregate shape, data-model.md §2):
// - The per-tool rollup (Tools) carries lifetime-cumulative metrics. `window`
// scopes the tool LIST to tools last used within the span (by LastUsed); the
// counts/bytes/latency themselves are lifetime totals over the aggregate's
// retention horizon. Exact per-tool windowed counts would require per-tool
// time buckets in the aggregate and are a deferred follow-on.
// - The Timeline buckets are global (not per-tool); `window` trims them to the
// requested span. Timeline is therefore not filtered by tool/server/status.
// - tool/server/status act as membership filters on the per-tool rollup.
type UsageAggregateResponse struct {
Window string `json:"window"`
GeneratedAt time.Time `json:"generated_at"`
FreshnessMs int64 `json:"freshness_ms"` // age of the underlying snapshot in ms
TokenSource string `json:"token_source"` // "bytes" (size-based proxy, FR-006)
TokensSaved int `json:"tokens_saved"` // echoed from ServerTokenMetrics (FR-007)
TokensSavedPercentage float64 `json:"tokens_saved_percentage"`
Tools []UsageToolStat `json:"tools"`
Other *UsageOtherBucket `json:"other,omitempty"` // present only when the list was truncated to top-N
Timeline []UsageTimeBucket `json:"timeline"`
}

// UsageToolStat is the per-(server,tool) rollup row in UsageAggregateResponse.
type UsageToolStat struct {
Server string `json:"server"`
Tool string `json:"tool"`
Calls int64 `json:"calls"`
Errors int64 `json:"errors"`
ErrorRate float64 `json:"error_rate"`
Blocked int64 `json:"blocked"`
TotalRespBytes int64 `json:"total_resp_bytes"`
AvgRespBytes *int64 `json:"avg_resp_bytes"` // null when sized_calls == 0 (only legacy 0-byte calls)
TotalReqBytes int64 `json:"total_req_bytes"`
AvgReqBytes *int64 `json:"avg_req_bytes"` // null when no sized request calls
SizedCalls int64 `json:"sized_calls"` // calls with known response size (basis for avg_resp_bytes)
P50Ms int64 `json:"p50_ms"`
P95Ms int64 `json:"p95_ms"`
LastUsed time.Time `json:"last_used"`
}

// UsageOtherBucket folds the tail of the per-tool list beyond top-N (FR: charts
// stay readable on high-cardinality logs).
type UsageOtherBucket struct {
ToolsFolded int `json:"tools_folded"`
Calls int64 `json:"calls"`
TotalRespBytes int64 `json:"total_resp_bytes"`
}

// UsageTimeBucket is one timeline bar (executed calls only; blocked attempts and
// non-tool records are excluded by the aggregate).
type UsageTimeBucket struct {
Start time.Time `json:"start"`
Calls int64 `json:"calls"`
Errors int64 `json:"errors"`
TotalRespBytes int64 `json:"total_resp_bytes"`
}

// LogEntry represents a single log entry
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Expand Down
297 changes: 297 additions & 0 deletions internal/httpapi/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/go-chi/chi/v5"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/config"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/contracts"
internalRuntime "github.com/smart-mcp-proxy/mcpproxy-go/internal/runtime"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/storage"
)

Expand Down Expand Up @@ -673,3 +676,297 @@ func buildTopTools(counts map[string]int, limit int) []contracts.ActivityTopTool
}
return result
}

// =============================================================================
// Spec 069 A3 (MCP-750): GET /api/v1/activity/usage
// =============================================================================

const (
usageDefaultTop = 20
usageDefaultSort = "resp_bytes"
usageDefaultWindow = "24h"
usageTokenSource = "bytes" // size-based proxy (FR-006); FR-010 → "estimated_tokens"
)

// usageParams holds the validated query parameters for the usage endpoint.
type usageParams struct {
window string // "24h" | "7d" | "all"
server string
tool string
status string // "" | "success" | "error" | "blocked"
top int
sort string // "calls" | "resp_bytes" | "error_rate" | "p95"
}

// cacheKey is a stable identity for the params, used by the short-TTL cache.
func (p usageParams) cacheKey() string {
return strings.Join([]string{p.window, p.server, p.tool, p.status, p.sort, strconv.Itoa(p.top)}, "|")
}

// windowStart returns the lower time bound for the window relative to now, plus
// whether a bound applies (false for "all").
func (p usageParams) windowStart(now time.Time) (time.Time, bool) {
switch p.window {
case "24h":
return now.Add(-24 * time.Hour), true
case "7d":
return now.Add(-7 * 24 * time.Hour), true
default: // "all"
return time.Time{}, false
}
}

// parseUsageParams validates the usage query string, returning a 400-style error
// message for any bad enum / non-int top.
func parseUsageParams(r *http.Request) (usageParams, error) {
q := r.URL.Query()
p := usageParams{
window: usageDefaultWindow,
server: q.Get("server"),
tool: q.Get("tool"),
status: q.Get("status"),
top: usageDefaultTop,
sort: usageDefaultSort,
}

if v := q.Get("window"); v != "" {
switch v {
case "24h", "7d", "all":
p.window = v
default:
return p, fmt.Errorf("invalid window %q (expected 24h, 7d, or all)", v)
}
}

if v := q.Get("sort"); v != "" {
switch v {
case "calls", "resp_bytes", "error_rate", "p95":
p.sort = v
default:
return p, fmt.Errorf("invalid sort %q (expected calls, resp_bytes, error_rate, or p95)", v)
}
}

if p.status != "" {
switch p.status {
case "success", "error", "blocked":
default:
return p, fmt.Errorf("invalid status %q (expected success, error, or blocked)", p.status)
}
}

if v := q.Get("top"); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n < 1 {
return p, fmt.Errorf("invalid top %q (expected a positive integer)", v)
}
p.top = n
}

return p, nil
}

// handleActivityUsage handles GET /api/v1/activity/usage
// @Summary Get usage statistics aggregate
// @Description Returns the actor-owned usage aggregate (per-tool rollup + timeline + tokens-saved headline) for the Web UI usage graphs (Spec 069). Served from an in-memory snapshot — never a per-request full-log scan. Per-tool metrics are lifetime-cumulative; `window` scopes the timeline and filters the tool list to tools active within the span.
// @Tags Activity
// @Accept json
// @Produce json
// @Param window query string false "Time window for timeline + tool-list membership" Enums(24h, 7d, all)
// @Param server query string false "Filter to one server"
// @Param tool query string false "Filter to one tool"
// @Param status query string false "Filter to tools with activity of this status" Enums(success, error, blocked)
// @Param top query int false "Top-N tools by sort key; remainder folded into 'other' (default 20)"
// @Param sort query string false "Ranking key for the per-tool list" Enums(calls, resp_bytes, error_rate, p95)
// @Success 200 {object} contracts.APIResponse{data=contracts.UsageAggregateResponse}
// @Failure 400 {object} contracts.APIResponse
// @Failure 401 {object} contracts.APIResponse
// @Security ApiKeyHeader
// @Security ApiKeyQuery
// @Router /api/v1/activity/usage [get]
func (s *Server) handleActivityUsage(w http.ResponseWriter, r *http.Request) {
params, err := parseUsageParams(r)
if err != nil {
s.writeError(w, r, http.StatusBadRequest, err.Error())
return
}

ttl := s.usageCacheTTL()
key := params.cacheKey()
if cached := s.getUsageCache(key, ttl); cached != nil {
s.writeSuccess(w, cached)
return
}

snap := s.controller.UsageSnapshot()
tokens, _ := s.controller.GetTokenSavings()

resp := buildUsageResponse(snap, tokens, params, time.Now().UTC())
s.putUsageCache(key, resp, ttl)
s.writeSuccess(w, resp)
}

// usageCacheTTL reads the configured read-cache freshness bound (FR-005),
// falling back to the default when config is unavailable. Read per request so
// the value hot-reloads with config.
func (s *Server) usageCacheTTL() time.Duration {
def := time.Duration(config.DefaultObservabilityConfig().UsageCacheTTL)
cfgIface := s.controller.GetCurrentConfig()
cfg, ok := cfgIface.(*config.Config)
if !ok || cfg == nil || cfg.Observability == nil {
return def
}
if d := time.Duration(cfg.Observability.UsageCacheTTL); d > 0 {
return d
}
return def
}

// buildUsageResponse projects the usage snapshot into the API contract, applying
// window/filter/sort/top-N. It performs no I/O and never scans the activity log
// (SC-005): the actor-owned snapshot is the incremental-precompute path (FR-005).
func buildUsageResponse(snap *internalRuntime.UsageAggregate, tokens *contracts.ServerTokenMetrics, p usageParams, now time.Time) *contracts.UsageAggregateResponse {
resp := &contracts.UsageAggregateResponse{
Window: p.window,
GeneratedAt: now,
TokenSource: usageTokenSource,
Tools: make([]contracts.UsageToolStat, 0),
Timeline: make([]contracts.UsageTimeBucket, 0),
}
if tokens != nil {
resp.TokensSaved = tokens.SavedTokens
resp.TokensSavedPercentage = tokens.SavedTokensPercentage
}
if snap == nil {
return resp
}
if !snap.UpdatedAt.IsZero() {
if age := now.Sub(snap.UpdatedAt); age > 0 {
resp.FreshnessMs = age.Milliseconds()
}
}

start, bounded := p.windowStart(now)

// Per-tool rollup: filter by membership, project to contract rows.
rows := make([]contracts.UsageToolStat, 0, len(snap.Tools))
for _, tu := range snap.Tools {
if p.server != "" && tu.Server != p.server {
continue
}
if p.tool != "" && tu.Tool != p.tool {
continue
}
if !usageMatchesStatus(tu, p.status) {
continue
}
if bounded && tu.LastUsed.Before(start) {
continue // tool idle for the whole window
}
rows = append(rows, usageToolStat(tu))
}

sortUsageRows(rows, p.sort)

// Top-N + 'other' fold.
if len(rows) > p.top {
other := &contracts.UsageOtherBucket{}
for _, row := range rows[p.top:] {
other.ToolsFolded++
other.Calls += row.Calls
other.TotalRespBytes += row.TotalRespBytes
}
resp.Other = other
rows = rows[:p.top]
}
resp.Tools = rows

// Timeline: global buckets trimmed to the window span.
for _, b := range snap.Timeline() {
if bounded && b.Start.Before(start) {
continue
}
resp.Timeline = append(resp.Timeline, contracts.UsageTimeBucket{
Start: b.Start,
Calls: b.Calls,
Errors: b.Errors,
TotalRespBytes: b.RespBytesSum,
})
}

return resp
}

// usageMatchesStatus reports whether a tool has any activity of the requested
// status. Status filters operate as membership filters on the cumulative
// per-tool rollup (the aggregate does not retain per-status byte breakdowns).
func usageMatchesStatus(tu *internalRuntime.ToolUsage, status string) bool {
switch status {
case "":
return true
case "error":
return tu.Errors > 0
case "blocked":
return tu.Blocked > 0
case "success":
return tu.Calls-tu.Errors > 0
default:
return true
}
}

// usageToolStat projects a runtime ToolUsage into the API contract row.
func usageToolStat(tu *internalRuntime.ToolUsage) contracts.UsageToolStat {
row := contracts.UsageToolStat{
Server: tu.Server,
Tool: tu.Tool,
Calls: tu.Calls,
Errors: tu.Errors,
ErrorRate: tu.ErrorRate(),
Blocked: tu.Blocked,
TotalRespBytes: tu.RespBytesSum,
TotalReqBytes: tu.ReqBytesSum,
SizedCalls: tu.SizedRespCalls,
P50Ms: tu.Percentile(0.50),
P95Ms: tu.Percentile(0.95),
LastUsed: tu.LastUsed,
}
if avg, ok := tu.AvgRespBytes(); ok {
row.AvgRespBytes = &avg
}
if avg, ok := tu.AvgReqBytes(); ok {
row.AvgReqBytes = &avg
}
return row
}

// sortUsageRows orders rows descending by the requested key, breaking ties by
// server:tool for a deterministic response.
func sortUsageRows(rows []contracts.UsageToolStat, key string) {
less := func(i, j int) bool {
a, b := rows[i], rows[j]
switch key {
case "calls":
if a.Calls != b.Calls {
return a.Calls > b.Calls
}
case "error_rate":
if a.ErrorRate != b.ErrorRate {
return a.ErrorRate > b.ErrorRate
}
case "p95":
if a.P95Ms != b.P95Ms {
return a.P95Ms > b.P95Ms
}
default: // resp_bytes
if a.TotalRespBytes != b.TotalRespBytes {
return a.TotalRespBytes > b.TotalRespBytes
}
}
if a.Server != b.Server {
return a.Server < b.Server
}
return a.Tool < b.Tool
}
sort.Slice(rows, less)
}
Loading
Loading