diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dbe4eb80..bbc3a1f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add remote web control server for browser-based chat observation and control via `web.eca.dev`. #333 + ## 0.115.5 - Fix chat getting stuck when LLM streaming connection hangs or is cancelled. diff --git a/docs/config.json b/docs/config.json index 74e340ce5..a790f1675 100644 --- a/docs/config.json +++ b/docs/config.json @@ -332,6 +332,41 @@ ] } }, + "remote": { + "type": "object", + "description": "Remote web control server configuration. When enabled, starts an embedded HTTP server allowing a web frontend (web.eca.dev) to observe and control chat sessions in real-time.", + "markdownDescription": "Remote web control server configuration. When enabled, starts an embedded HTTP server allowing a web frontend (`web.eca.dev`) to observe and control chat sessions in real-time.", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean", + "description": "Enables the remote HTTP server.", + "markdownDescription": "Enables the remote HTTP server." + }, + "host": { + "type": "string", + "description": "Host used in the logged URL for web.eca.dev to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL. When unset, auto-detected via InetAddress/getLocalHost.", + "markdownDescription": "Host used in the logged URL for `web.eca.dev` to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL. When unset, auto-detected via `InetAddress/getLocalHost`.", + "examples": [ + "192.168.1.42", + "myserver.example.com" + ] + }, + "port": { + "type": "integer", + "description": "Port the HTTP server listens on. When unset, tries port 7777, then 7778, 7779, etc. up to 10 attempts.", + "markdownDescription": "Port the HTTP server listens on. When unset, tries port `7777`, then `7778`, `7779`, etc. up to 10 attempts.", + "examples": [ + 7777 + ] + }, + "password": { + "type": "string", + "description": "Auth token for Bearer authentication. When unset, a 32-byte hex token is auto-generated and logged to stderr.", + "markdownDescription": "Auth token for Bearer authentication. When unset, a 32-byte hex token is auto-generated and logged to stderr." + } + } + }, "network": { "type": "object", "description": "Network configuration for custom CA certificates and mTLS client certificates. Values support dynamic string interpolation (e.g. '${env:SSL_CERT_FILE}').", diff --git a/docs/remote-server-plan.md b/docs/remote-server-plan.md new file mode 100644 index 000000000..04ecdc96f --- /dev/null +++ b/docs/remote-server-plan.md @@ -0,0 +1,596 @@ +# Remote Web Control Server — Design & Implementation + +> GitHub Issue: [#333](https://github.com/editor-code-assistant/eca/issues/333) +> +> **Status: v1 implemented.** This document reflects the actual implementation. Sections +> marked with šŸ”§ contain implementation notes that diverge from or extend the original +> design. + +## Overview + +Allow a web frontend (hosted at `web.eca.dev`) to connect to a running ECA session +and observe/control chat sessions in real-time. Each ECA process optionally starts an +embedded HTTP server alongside the existing stdio JSON-RPC server. The web UI connects +via REST for commands and SSE for live updates. + +**Architecture:** Option A — Embedded HTTP server per ECA process. +User opens different browser tabs for different sessions (host:port) and different tabs +for each chat within a session. + +## Config + +```json +{ + "remote": { + "enabled": true, + "host": "myserver.example.com", + "port": 7888, + "password": "my-secret" + } +} +``` + +| Field | Required | Default | Description | +|------------|----------|-----------------------|------------------------------------------------| +| `enabled` | no | `false` | Enables the remote HTTP server | +| `host` | no | auto-detected LAN IP | Host used in the logged URL for `web.eca.dev` to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL (e.g. ngrok, tailscale). Not set → auto-detect via `InetAddress/getLocalHost`. | +| `port` | no | random free port | Port the HTTP server listens on | +| `password` | no | auto-generated token | Auth token; auto-generated and logged if unset | + +Server always binds `0.0.0.0` on the configured port. The `host` field only affects the +logged URL — it tells the web frontend where to reach the server, it does not change what +address the server binds to. + +## Authentication Flow + +Token-based auth using `Authorization: Bearer` header on all requests. No cookies — +avoids `Secure`/`SameSite` issues that break non-localhost connections (LAN IPs, remote). + +1. ECA starts, logs to stderr: + ``` + 🌐 Remote server started on port 7888 + šŸ”— https://web.eca.dev?host=192.168.1.42:7888&token=a3f8b2... + ``` + The `host` param tells `web.eca.dev` where to connect back to. Resolved as: + config `host` if set → otherwise auto-detected LAN IP via `InetAddress/getLocalHost`. + If `InetAddress/getLocalHost` fails or returns loopback, falls back to `127.0.0.1` + and logs a warning suggesting the user configure `remote.host`. +2. User clicks the URL → arrives at `web.eca.dev` with `host` and `token` in query params. +3. Frontend stores the token in JS memory (or `localStorage` to survive refresh). +4. Frontend strips token from URL via `history.replaceState`. +5. All REST requests include `Authorization: Bearer ` header. +6. SSE stream is consumed via `fetch()` + `ReadableStream` (not `EventSource`, which + doesn't support custom headers). The `Authorization` header is sent on the SSE + request. Wire format is identical SSE (`event:`/`data:`/`\n\n`). + +The auth token is either user-configured (`password` in config) or auto-generated as a +32-byte hex string via `java.security.SecureRandom` (64 characters). Stored in runtime +state only (not persisted to disk). + +šŸ”§ Token validation uses constant-time comparison (`MessageDigest/isEqual`) to prevent +timing side-channel attacks. + +**No CSRF protection needed** — CSRF is a cookie-only attack vector. Bearer tokens in +headers are not sent automatically by browsers. + +**Fallback:** Users can also go to `web.eca.dev` directly and manually enter host:port + +token in a connect form. + +## REST API + +All API routes are prefixed with `/api/v1/`. +All JSON keys in requests and responses use **camelCase** (matching the existing JSON-RPC +protocol convention). Internal Clojure kebab-case keywords are converted on +serialization. +All responses use `Content-Type: application/json; charset=utf-8` unless noted otherwise. + +### HTTP Status Codes + +| Status | Meaning | +|--------|-----------------------------------------------------------| +| `200` | Successful GET, or POST that returns data | +| `204` | Successful POST/DELETE with no response body | +| `302` | Redirect (`GET /`) | +| `400` | Malformed request body or missing required fields | +| `401` | Missing or invalid `Authorization: Bearer` token | +| `404` | Chat or tool-call ID not found | +| `409` | Chat in wrong state for operation (e.g. stop on idle) | +| `500` | Internal server error | + +### Health & Redirect + +| Method | Path | Auth | Description | +|--------|------------------|------|------------------------------------------------| +| `GET` | `/` | No | `302` redirect to `web.eca.dev` with host+token params | +| `GET` | `/api/v1/health` | No | `200` — `{"status": "ok", "version": "0.x.y"}` | + +### Session + +| Method | Path | Description | +|--------|--------------------|-----------------------------------------------------| +| `GET` | `/api/v1/session` | Session info (workspaces, models, agents, config) | + +Response: +```json +{ + "version": "0.x.y", + "protocolVersion": "1.0", + "workspaceFolders": ["/home/user/project"], + "models": [{"id": "...", "name": "...", "provider": "..."}], + "agents": [{"id": "...", "name": "...", "description": "..."}], + "mcpServers": [{"name": "...", "status": "running"}] +} +``` + +### Chats + +**List chats:** + +`GET /api/v1/chats` → `200` +```json +[{"id": "uuid", "title": "Fix login bug", "status": "idle", "createdAt": 1234567890}] +``` + +**Get chat:** + +`GET /api/v1/chats/:id` → `200` / `404` +```json +{ + "id": "uuid", + "title": "Fix login bug", + "status": "idle", + "createdAt": 1234567890, + "messages": [{"role": "user", "content": "...", "contentId": "uuid"}, ...], + "toolCalls": {"tc-id": {"name": "read_file", "status": "called", "arguments": "..."}}, + "task": {"nextId": 1, "tasks": [...]} +} +``` + +**Send prompt** (creates chat implicitly if `:id` is new — web UI generates UUID): + +`POST /api/v1/chats/:id/prompt` → `200` / `400` +```json +// Request +{"message": "fix the login bug", "model": "optional-model-id", "agent": "optional-agent-id"} + +// Response +{"chatId": "uuid", "model": "anthropic/claude-sonnet-4-20250514", "status": "running"} +``` + +**Stop generation:** + +`POST /api/v1/chats/:id/stop` → `204` / `404` / `409` + +**Approve tool call:** + +`POST /api/v1/chats/:id/approve/:tcid` → `204` / `404` / `409` + +**Reject tool call:** + +`POST /api/v1/chats/:id/reject/:tcid` → `204` / `404` / `409` + +**Rollback:** + +`POST /api/v1/chats/:id/rollback` → `204` / `404` +```json +// Request +{"contentId": "uuid-of-message-to-rollback-to"} +``` + +**Clear chat:** + +`POST /api/v1/chats/:id/clear` → `204` / `404` + +**Delete chat:** + +`DELETE /api/v1/chats/:id` → `204` / `404` + +**Change model:** + +`POST /api/v1/chats/:id/model` → `204` / `404` / `400` +```json +// Request +{"model": "anthropic/claude-sonnet-4-20250514"} +``` +Maps to `chat/selectedModelChanged` notification handler. + +**Change agent:** + +`POST /api/v1/chats/:id/agent` → `204` / `404` / `400` +```json +// Request +{"agent": "code"} +``` +Maps to `chat/selectedAgentChanged` notification handler. + +**Change variant:** + +`POST /api/v1/chats/:id/variant` → `204` / `404` / `400` +```json +// Request +{"variant": "high"} +``` + +### SSE + +`GET /api/v1/events` — `Content-Type: text/event-stream` + +šŸ”§ The `?chat=` filter is **not yet implemented** in v1 — all events are sent to all +clients. Planned for a future iteration. + +## SSE Events + +### Event Types + +``` +event: session:connected ← Full state dump on SSE connect +event: session:disconnecting ← Server shutting down +event: session:message ← showMessage notifications (errors, warnings, info) + +event: chat:content-received ← Text chunks, tool progress, usage (from IMessenger) +event: chat:status-changed ← idle/running/stopping transitions +event: chat:cleared ← Chat history cleared +event: chat:deleted ← Chat removed + +event: config:updated ← Model/agent/config changes +event: tool:server-updated ← MCP server status changes +``` + +### Event Payloads + +**`session:connected`** — Full state dump so the web client can bootstrap: +```json +{ + "version": "0.x.y", + "protocolVersion": "1.0", + "chats": [{"id": "...", "title": "...", "status": "idle", "createdAt": 123}], + "models": [{"id": "...", "name": "...", "provider": "..."}], + "agents": [{"id": "...", "name": "...", "description": "..."}], + "mcpServers": [{"name": "...", "status": "running"}], + "workspaceFolders": ["/home/user/project"] +} +``` + +**`session:disconnecting`** — Server shutting down: +```json +{"reason": "shutdown"} +``` + +**`session:message`** — from `showMessage`: +```json +{"type": "warning", "message": "Rate limit approaching"} +``` + +**`chat:content-received`** — the `data` field is the JSON serialization of the same +params passed to `IMessenger.chat-content-received`, with keys camelCased. Examples: +```json +{"chatId": "abc", "role": "assistant", "content": {"type": "text", "text": "Hello...", "contentId": "uuid"}} +{"chatId": "abc", "role": "system", "content": {"type": "progress", "state": "running", "text": "Thinking..."}} +{"chatId": "abc", "role": "system", "content": {"type": "toolCallPrepare", "toolCallId": "tc1", "name": "eca__read_file"}} +{"chatId": "abc", "role": "system", "content": {"type": "toolCalled", "toolCallId": "tc1", "result": "..."}} +``` + +**`chat:status-changed`**: +```json +{"chatId": "abc", "status": "running"} +``` + +**`chat:cleared`**: +```json +{"chatId": "abc"} +``` + +**`chat:deleted`**: +```json +{"chatId": "abc"} +``` + +**`config:updated`** — partial config fields that changed: +```json +{"models": [...], "agents": [...]} +``` + +**`tool:server-updated`**: +```json +{"name": "github-mcp", "status": "running"} +``` + +### Reliability + +- **Heartbeat:** Server sends `:` comment line every 15 seconds to detect dead clients. +- **Backpressure:** Each SSE client gets a `core.async` channel with a dropping buffer + (buffer size: 256 events). `broadcast!` uses `async/offer!` (non-blocking, drops if + full). Slow clients drop events rather than blocking the main messenger path. Clients + can recover by re-fetching chat state via REST. +- **Stale cleanup:** Failed writes remove the client from the connection set. + +šŸ”§ **Threading model:** Writer loops and heartbeat use `async/thread` (real threads with +blocking ``. + +``` +Access-Control-Allow-Origin: https://web.eca.dev +Access-Control-Allow-Methods: GET, POST, DELETE, OPTIONS +Access-Control-Allow-Headers: Content-Type, Authorization +``` + +No `Access-Control-Allow-Credentials` needed — auth is via `Authorization` header, not +cookies. + +## Error Responses + +All errors use a consistent envelope: + +```json +{ + "error": { + "code": "chat_not_found", + "message": "Chat abc-123 does not exist" + } +} +``` + +No raw exceptions or stack traces are returned to the web client. + +### Error Codes + +| Code | HTTP Status | Description | +|-----------------------|-------------|------------------------------------------| +| `unauthorized` | `401` | Missing or invalid Bearer token | +| `invalid_request` | `400` | Malformed JSON or missing required fields| +| `chat_not_found` | `404` | Chat ID doesn't exist in `db*` | +| `tool_call_not_found` | `404` | Tool call ID doesn't exist in chat | +| `chat_wrong_status` | `409` | Chat not in valid state for operation | +| `internal_error` | `500` | Unexpected server error | + +## Key Architecture Decisions + +### BroadcastMessenger + +Wraps the existing `ServerMessenger`, implements `IMessenger`. Every method: +1. Delegates to the inner `ServerMessenger` (sends to editor via stdio). +2. Converts data to camelCase via `shared/map->camel-cased-map`. +3. Broadcasts the event to all connected SSE clients (via `core.async` channels). + +šŸ”§ The camelCase conversion is essential because `IMessenger` methods receive internal +kebab-case Clojure maps. The JSON-RPC layer (stdio) handles its own key conversion, but +SSE broadcasts serialize directly via Cheshire — without explicit conversion, SSE payloads +would have kebab-case keys like `chat-id` instead of `chatId`. + +**Exceptions (inner-only, no broadcast):** +- `editor-diagnostics` — web client cannot provide editor diagnostics. +- `rewrite-content-received` — editor-only feature (targets editor selections). + +**IMessenger method → SSE event type mapping:** + +| IMessenger method | SSE event type | Broadcast? | +|----------------------------|---------------------------|------------| +| `chat-content-received` | `chat:content-received` | āœ… Yes | +| `chat-cleared` | `chat:cleared` | āœ… Yes | +| `chat-status-changed` ā˜… | `chat:status-changed` | āœ… Yes | +| `chat-deleted` ā˜… | `chat:deleted` | āœ… Yes | +| `config-updated` | `config:updated` | āœ… Yes | +| `tool-server-updated` | `tool:server-updated` | āœ… Yes | +| `showMessage` | `session:message` | āœ… Yes | +| `editor-diagnostics` | — | āŒ Inner | +| `rewrite-content-received` | — | āŒ Inner | + +ā˜… = **New `IMessenger` methods.** These state transitions currently happen via direct +`swap!` on `db*` without going through the messenger. Adding them to `IMessenger` gives +`BroadcastMessenger` a clean broadcast path. The `ServerMessenger` (stdio) implementation +of these new methods is a no-op — the editor JSON-RPC protocol has no equivalent +notification for these events. + +**Call sites for new methods:** +- `chat-status-changed` — called from `prompt` (→ `:running`), `finish-chat-prompt!` + (→ `:idle`/`:stopping`), and `promptStop` (→ `:stopping`). +- `chat-deleted` — called from `delete-chat` after removing the chat from `db*`. + +### Handler Reuse + +REST handlers bridge to the same feature functions used by JSON-RPC handlers +(`eca.handlers`, `eca.features.chat`). They receive the same `components` map +`{:db* :messenger :config :metrics}` — no business logic duplication. + +### Port Conflict Handling + +If the configured port is in use, the server logs a warning and continues without the +remote server. The ECA process does not crash. + +### Editor Remains Primary + +The editor (stdio) client is always the primary client. The web UI is a secondary client +that can observe and control chat but cannot fulfill editor-specific requests +(`editor/getDiagnostics`). Both clients can send commands concurrently — for tool +approvals, first response wins (promise `deliver` is idempotent). + +## Implementation Notes + +> The steps below reflect what was actually implemented. + +### Config schema +Added `remote` section to `docs/config.json` with `enabled`, `host`, `port`, and +`password` fields. Added `:remote {:enabled false}` to `initial-config*` in `config.clj`. + +šŸ”§ Remote config is read via `config/read-file-configs` at startup (before `initialize`), +so only global/env/custom-file configs are available — workspace-level `.eca/config.json` +is not included. This is intentional: workspace folders aren't known until `initialize`. + +### SSE connection management (`eca.remote.sse`) +- Atom holding set of SSE client maps `{:ch :os :done-ch}`. +- `add-client!` accepts optional `done-ch` — closed when writer loop terminates, allowing + callers to block on it for lifecycle management. +- `broadcast!` uses `async/offer!` (non-blocking put, drops if full). +- Writer loop per client: `async/thread` with blocking `` filter** — specified in the API but not yet implemented; all events + go to all clients +- Request logging middleware +- Subagent chat filtering in list endpoint +- Source tagging on SSE events (editor vs web origin) +- SSE event batching for high-frequency streaming +- Rate limiting on auth failures +- Configurable CORS allowed origins +- Route-level integration tests (current tests call handlers directly) +- `GET /api/v1/chats/:id` message content filtering (messages can be very large) + +## Web Frontend Notes + +> Notes for implementing the `web.eca.dev` frontend. + +### Consuming SSE + +The SSE stream **must** be consumed via `fetch()` + `ReadableStream`, not `EventSource`, +because `EventSource` does not support custom `Authorization` headers: + +```javascript +const response = await fetch(`http://${host}/api/v1/events`, { + headers: { 'Authorization': `Bearer ${token}` } +}); +const reader = response.body.getReader(); +const decoder = new TextDecoder(); +// Parse SSE wire format: "event: \ndata: \n\n" +``` + +### Connection Lifecycle + +1. On connect, the first SSE event is `session:connected` with a full state dump + (chats, models, agents, MCP servers, workspace folders). +2. The frontend should use this to bootstrap its state — no separate REST call needed. +3. On disconnect, re-fetch state via `GET /api/v1/session` + `GET /api/v1/chats` to + recover any missed events, then reconnect to SSE. +4. A heartbeat (`:` comment) arrives every 15 seconds — if no data for >30s, assume + disconnected and reconnect. + +### Chat Content Events + +`chat:content-received` events carry the same payloads as the editor JSON-RPC protocol. +Key content types the frontend should handle: + +| `content.type` | Description | Key fields | +|--------------------|-------------------------------------|---------------------------------------| +| `text` | Streamed text chunk | `text`, `contentId` | +| `progress` | Status indicator | `state` (running/finished), `text` | +| `metadata` | Chat metadata update | `title` | +| `usage` | Token usage stats | `inputTokens`, `outputTokens` | +| `toolCallPrepare` | Tool call starting | `name`, `id`, `argumentsText` | +| `toolCallRun` | Tool call approved, about to run | `name`, `id`, `arguments` | +| `toolCallRunning` | Tool call executing | `name`, `id` | +| `toolCalled` | Tool call finished | `name`, `id`, `error`, `outputs` | +| `reasonStarted` | Thinking/reasoning started | `id` | +| `reasonText` | Thinking text chunk | `id`, `text` | +| `reasonFinished` | Thinking finished | `id`, `totalTimeMs` | +| `hookActionStarted` | Hook action started | `id`, `name` | +| `hookActionFinished` | Hook action finished | `id`, `name`, `status`, `output` | + +### Model/Agent/Variant Changes + +`POST /api/v1/chats/:id/model`, `/agent`, `/variant` are **session-wide** operations +despite the chat-scoped URL. They change the selected model/agent for new prompts across +the entire session, matching the editor behavior. The chat-id in the URL is validated +(404 if not found) but not used for scoping. + +### Tool Call Approval + +When a tool call requires approval, its status in the chat's `toolCalls` map will be +`"waiting-approval"`. The frontend can show an approve/reject UI and call: +- `POST /api/v1/chats/:id/approve/:tcid` — allow the tool call +- `POST /api/v1/chats/:id/reject/:tcid` — deny the tool call + +First response wins — if the editor user approves before the web user (or vice versa), +the second approval is a no-op (`promise deliver` is idempotent). diff --git a/src/eca/config.clj b/src/eca/config.clj index f4fec883c..07e904673 100644 --- a/src/eca/config.clj +++ b/src/eca/config.clj @@ -196,6 +196,7 @@ :netrcFile nil :autoCompactPercentage 75 :plugins {"eca" {:source "https://github.com/editor-code-assistant/eca-plugins.git"}} + :remote {:enabled false} :env "prod"}) (defn ^:private parse-dynamic-string-values diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index 17769c255..081dec0e0 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -383,7 +383,7 @@ Run preRequest hooks before any heavy lifting. Only :prompt-message supports rewrite, other only allow additionalContext append." [user-messages source-type - {:keys [db* config chat-id provider model full-model agent instructions metrics message] :as chat-ctx}] + {:keys [db* config chat-id provider model full-model agent instructions metrics message messenger] :as chat-ctx}] (when-not full-model (throw (ex-info llm-api/no-available-model-error-msg {}))) (let [original-text (or message (-> user-messages first :content first :text)) @@ -432,6 +432,8 @@ (logger/info logger-tag "Superseding active prompt" {:chat-id chat-id :status (get-in @db* [:chats chat-id :status])})) (swap! db* assoc-in [:chats chat-id :status] :running) + (swap! db* assoc-in [:chats chat-id :updated-at] (System/currentTimeMillis)) + (messenger/chat-status-changed messenger {:chat-id chat-id :status :running}) (swap! db* assoc-in [:chats chat-id :prompt-id] prompt-id) (swap! db* assoc-in [:chats chat-id :model] full-model) (let [chat-ctx (assoc chat-ctx :prompt-id prompt-id) @@ -667,6 +669,7 @@ (finally (when (contains? #{:stopping :running} (get-in @db* [:chats chat-id :status])) (swap! db* assoc-in [:chats chat-id :status] :idle) + (messenger/chat-status-changed (:messenger chat-ctx) {:chat-id chat-id :status :idle}) (db/update-workspaces-cache! @db* metrics)))))))))) (defn ^:private send-mcp-prompt! @@ -929,7 +932,7 @@ (lifecycle/finish-chat-prompt! :stopping (dissoc chat-ctx :on-finished-side-effect))))) (defn delete-chat - [{:keys [chat-id]} db* config metrics] + [{:keys [chat-id]} db* messenger config metrics] (when-let [chat (get-in @db* [:chats chat-id])] ;; Trigger chatEnd hook BEFORE deleting (chat still exists in cache) (f.hooks/trigger-if-matches! :chatEnd @@ -942,18 +945,20 @@ config)) ;; Delete chat from memory (swap! db* update :chats dissoc chat-id) + (messenger/chat-deleted messenger {:chat-id chat-id}) ;; Save updated cache (without this chat) (db/update-workspaces-cache! @db* metrics)) (defn clear-chat "Clear specific aspects of a chat. Currently supports clearing :messages." - [{:keys [chat-id messages]} db* metrics] + [{:keys [chat-id messages]} db* messenger metrics] (when (get-in @db* [:chats chat-id]) (swap! db* update-in [:chats chat-id] (fn [chat] (cond-> chat messages (-> (assoc :messages []) (dissoc :tool-calls :last-api :usage :task))))) + (messenger/chat-cleared messenger {:chat-id chat-id :messages messages}) (db/update-workspaces-cache! @db* metrics))) (defn rollback-chat diff --git a/src/eca/features/chat/lifecycle.clj b/src/eca/features/chat/lifecycle.clj index 3febeccf8..70f58f8f3 100644 --- a/src/eca/features/chat/lifecycle.clj +++ b/src/eca/features/chat/lifecycle.clj @@ -67,10 +67,11 @@ (name from) content)) -(defn finish-chat-prompt! [status {:keys [message chat-id db* metrics config on-finished-side-effect prompt-id] :as chat-ctx}] +(defn finish-chat-prompt! [status {:keys [message chat-id db* messenger metrics config on-finished-side-effect prompt-id] :as chat-ctx}] (when-not (and prompt-id (not= prompt-id (get-in @db* [:chats chat-id :prompt-id]))) (when-not (get-in @db* [:chats chat-id :auto-compacting?]) (swap! db* assoc-in [:chats chat-id :status] status) + (messenger/chat-status-changed messenger {:chat-id chat-id :status status}) (let [db @db* subagent? (some? (get-in db [:chats chat-id :subagent])) hook-type (if subagent? :subagentPostRequest :postRequest) diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index 38a744a86..4407aed34 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -42,6 +42,15 @@ (when (and agent-variant variants (some #{agent-variant} variants)) agent-variant))) +(defn welcome-message + "Builds the welcome message from config, appending the remote URL when available." + [db config] + (let [base (or (:welcomeMessage (:chat config)) ;;legacy + (:welcomeMessage config))] + (if-let [url (:remote-connect-url db)] + (str base "\n🌐 Remote: " url "\n") + base))) + (defn initialize [{:keys [db* metrics]} params] (metrics/task metrics :eca/initialize (reset! config/initialization-config* (shared/map->camel-cased-map (:initialization-options params))) @@ -54,8 +63,7 @@ (when-not (:pureConfig config) (db/load-db-from-cache! db* config metrics)) - {:chat-welcome-message (or (:welcomeMessage (:chat config)) ;;legacy - (:welcomeMessage config))}))) + {:chat-welcome-message (welcome-message @db* config)}))) (defn initialized [{:keys [db* messenger config metrics]}] (metrics/task metrics :eca/initialized @@ -81,8 +89,7 @@ :select-agent default-agent-name :variants (or variants []) :select-variant (select-variant default-agent-config variants) - :welcome-message (or (:welcomeMessage (:chat config)) ;;legacy - (:welcomeMessage config)) + :welcome-message (welcome-message @db* config) ;; Deprecated, remove after changing emacs, vscode and intellij. :default-model default-model :default-agent default-agent-name @@ -182,14 +189,14 @@ (metrics/task metrics :eca/chat-prompt-stop (f.chat/prompt-stop params db* messenger metrics))) -(defn chat-delete [{:keys [db* config metrics]} params] +(defn chat-delete [{:keys [db* messenger config metrics]} params] (metrics/task metrics :eca/chat-delete - (f.chat/delete-chat params db* config metrics) + (f.chat/delete-chat params db* messenger config metrics) {})) -(defn chat-clear [{:keys [db* metrics]} params] +(defn chat-clear [{:keys [db* messenger metrics]} params] (metrics/task metrics :eca/chat-clear - (f.chat/clear-chat params db* metrics) + (f.chat/clear-chat params db* messenger metrics) {})) (defn chat-rollback [{:keys [db* metrics messenger]} params] diff --git a/src/eca/messenger.clj b/src/eca/messenger.clj index 46d60e83c..78656386e 100644 --- a/src/eca/messenger.clj +++ b/src/eca/messenger.clj @@ -7,6 +7,8 @@ (defprotocol IMessenger (chat-content-received [this data]) (chat-cleared [this params]) + (chat-status-changed [this params]) + (chat-deleted [this params]) (rewrite-content-received [this data]) (tool-server-updated [this params]) (config-updated [this params]) diff --git a/src/eca/remote/auth.clj b/src/eca/remote/auth.clj new file mode 100644 index 000000000..09a1eaf25 --- /dev/null +++ b/src/eca/remote/auth.clj @@ -0,0 +1,48 @@ +(ns eca.remote.auth + "Bearer token authentication middleware for the remote server." + (:require + [cheshire.core :as json]) + (:import + [java.security MessageDigest SecureRandom])) + +(set! *warn-on-reflection* true) + +(def ^:private alphanumeric "abcdefghijklmnopqrstuvwxyz0123456789") + +(defn generate-token + "Generates a random 5-character alphanumeric password." + [] + (let [random (SecureRandom.) + len (count alphanumeric)] + (apply str (repeatedly 5 #(nth alphanumeric (.nextInt random len)))))) + +(def ^:private unauthorized-response + {:status 401 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (json/generate-string + {:error {:code "unauthorized" + :message "Missing or invalid Authorization Bearer token"}})}) + +(defn- extract-bearer-token [request] + (when-let [auth-header (get-in request [:headers "authorization"])] + (when (.startsWith ^String auth-header "Bearer ") + (subs auth-header 7)))) + +(defn- constant-time-equals? + "Constant-time comparison to prevent timing side-channel attacks." + [^String a ^String b] + (and a b + (MessageDigest/isEqual (.getBytes a "UTF-8") (.getBytes b "UTF-8")))) + +(defn wrap-bearer-auth + "Ring middleware that validates Bearer token auth. + Exempt paths (like /api/v1/health and GET /) skip auth." + [handler token exempt-paths] + (let [exempt-set (set exempt-paths)] + (fn [request] + (if (contains? exempt-set (:uri request)) + (handler request) + (let [request-token (extract-bearer-token request)] + (if (constant-time-equals? token request-token) + (handler request) + unauthorized-response)))))) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj new file mode 100644 index 000000000..a5478bb30 --- /dev/null +++ b/src/eca/remote/handlers.clj @@ -0,0 +1,291 @@ +(ns eca.remote.handlers + "REST API handlers for the remote web control server. + Each handler receives components map and Ring request, delegates to + the same feature functions used by JSON-RPC handlers." + (:require + [cheshire.core :as json] + [clojure.core.async :as async] + [eca.config :as config] + [eca.features.chat :as f.chat] + [eca.handlers :as handlers] + [eca.remote.sse :as sse] + [eca.shared :as shared] + [ring.core.protocols :as ring.protocols]) + (:import + [java.io InputStream] + [java.time Instant])) + +(set! *warn-on-reflection* true) + +(defn ^:private parse-body [request] + (when-let [body (:body request)] + (try + (json/parse-string + (if (instance? InputStream body) + (slurp ^InputStream body) + (str body)) + true) + (catch Exception _e nil)))) + +(defn ^:private json-response + ([status body] + {:status status + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (json/generate-string body)}) + ([body] (json-response 200 body))) + +(defn ^:private error-response [status code message] + (json-response status {:error {:code code :message message}})) + +(defn ^:private no-content [] + {:status 204 :headers {} :body nil}) + +(defn ^:private chat-or-404 [db* chat-id] + (get-in @db* [:chats chat-id])) + +(defn ^:private camel-keys [m] + (shared/map->camel-cased-map m)) + +(defn ^:private session-state + "Builds the session state map used for both GET /session and SSE session:connected." + [db config] + (let [last-config (:last-config-notified db) + default-model (or (get-in last-config [:chat :select-model]) + (f.chat/default-model db config)) + default-agent-name (or (get-in last-config [:chat :select-agent]) + (config/validate-agent-name + (or (:defaultAgent (:chat config)) + (:defaultAgent config)) + config)) + variants (or (get-in last-config [:chat :variants]) []) + selected-variant (get-in last-config [:chat :select-variant])] + {:version (config/eca-version) + :protocolVersion "1.0" + :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) + :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) + (:models db)) + :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) + (config/primary-agent-names config)) + :mcpServers (mapv (fn [[name client-info]] + {:name name :status (or (:status client-info) "unknown")}) + (:mcp-clients db)) + :chats (->> (vals (:chats db)) + (remove :subagent) + (mapv (fn [chat] + (camel-keys + {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :updated-at (:updated-at chat)})))) + :startedAt (when-let [ms (:started-at db)] + (.toString (Instant/ofEpochMilli ^long ms))) + :welcomeMessage (handlers/welcome-message db config) + :selectModel default-model + :selectAgent default-agent-name + :variants variants + :trust (boolean (:trust db)) + :selectedVariant selected-variant})) + +(defn handle-root [_components _request {:keys [host password]}] + {:status 302 + :headers {"Location" (str "https://web.eca.dev?host=" host "&pass=" password)} + :body nil}) + +(defn handle-health [_components _request] + (json-response {:status "ok" :version (config/eca-version)})) + +(defn handle-session [{:keys [db*]} _request] + (let [db @db* + config (config/all db)] + (json-response (session-state db config)))) + +(defn handle-list-chats [{:keys [db*]} _request] + (let [chats (->> (vals (:chats @db*)) + (remove :subagent) + (mapv (fn [{:keys [id title status created-at updated-at]}] + {:id id + :title title + :status (or status :idle) + :createdAt created-at + :updatedAt updated-at})))] + (json-response chats))) + +(defn handle-get-chat [{:keys [db*]} _request chat-id] + (if-let [chat (chat-or-404 db* chat-id)] + (json-response + (camel-keys + {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :updated-at (:updated-at chat) + :messages (or (:messages chat) []) + :tool-calls (or (:tool-calls chat) {}) + :task (:task chat)})) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")))) + +(defn handle-prompt [{:keys [db*] :as components} request chat-id] + (let [body (parse-body request)] + (if-not (:message body) + (error-response 400 "invalid_request" "Missing required field: message") + (let [config (config/all @db*) + params (cond-> {:chat-id chat-id + :message (:message body)} + (:model body) (assoc :model (:model body)) + (:agent body) (assoc :agent (:agent body)) + (:variant body) (assoc :variant (:variant body)) + (:trust body) (assoc :trust (:trust body))) + result (handlers/chat-prompt (assoc components :config config) params)] + (json-response (camel-keys result)))))) + +(defn handle-stop [{:keys [db*] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (identical? :running (get-in @db* [:chats chat-id :status])) + (error-response 409 "chat_wrong_status" "Chat is not running") + (let [config (config/all @db*)] + (handlers/chat-prompt-stop (assoc components :config config) {:chat-id chat-id}) + (no-content))))) + +(defn handle-approve [{:keys [db*] :as components} _request chat-id tool-call-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (get-in @db* [:chats chat-id :tool-calls tool-call-id]) + (error-response 404 "tool_call_not_found" (str "Tool call " tool-call-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-tool-call-approve + (assoc components :config config) + {:chat-id chat-id :tool-call-id tool-call-id}) + (no-content))))) + +(defn handle-reject [{:keys [db*] :as components} _request chat-id tool-call-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (get-in @db* [:chats chat-id :tool-calls tool-call-id]) + (error-response 404 "tool_call_not_found" (str "Tool call " tool-call-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-tool-call-reject + (assoc components :config config) + {:chat-id chat-id :tool-call-id tool-call-id}) + (no-content))))) + +(defn handle-rollback [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request) + config (config/all @db*)] + (handlers/chat-rollback + (assoc components :config config) + {:chat-id chat-id :content-id (:contentId body)}) + (no-content)))) + +(defn handle-clear [{:keys [db*] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-clear + (assoc components :config config) + {:chat-id chat-id :messages true}) + (no-content)))) + +(defn handle-delete-chat [{:keys [db*] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-delete + (assoc components :config config) + {:chat-id chat-id}) + (no-content)))) + +(defn handle-change-model [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:model body) + (error-response 400 "invalid_request" "Missing required field: model") + (let [config (config/all @db*)] + (handlers/chat-selected-model-changed + (assoc components :config config) + {:model (:model body)}) + (no-content)))))) + +(defn handle-change-agent [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:agent body) + (error-response 400 "invalid_request" "Missing required field: agent") + (let [config (config/all @db*)] + (handlers/chat-selected-agent-changed + (assoc components :config config) + {:agent (:agent body)}) + (no-content)))))) + +(defn handle-change-variant [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:variant body) + (error-response 400 "invalid_request" "Missing required field: variant") + (let [config (config/all @db*) + model (or (get-in @db* [:chats chat-id :model]) + (f.chat/default-model @db* config))] + (handlers/chat-selected-model-changed + (assoc components :config config) + {:model model + :variant (:variant body)}) + (no-content)))))) + +(defn handle-set-trust [{:keys [db*]} request {:keys [sse-connections*]}] + (let [body (parse-body request) + trust (boolean (:trust body))] + (swap! db* assoc :trust trust) + (sse/broadcast! sse-connections* "trust:updated" {:trust trust}) + (json-response {:trust trust}))) + +(defn handle-mcp-start [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-start-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-stop [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-stop-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-connect [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-connect-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-logout [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-logout-server (assoc components :config config) {:name server-name}) + (no-content))) + +(deftype SSEBody [db* sse-connections*] + ring.protocols/StreamableResponseBody + (write-body-to-stream [_ _response os] + ;; Jetty calls this on its thread with the raw servlet output stream. + ;; We register it as an SSE client and block until the connection closes. + (let [done-ch (async/chan) + client (sse/add-client! sse-connections* os done-ch)] + (try + (let [db @db* + config (config/all db) + state-dump (session-state db config)] + (sse/broadcast! (atom #{client}) "session:connected" state-dump)) + ;; Block this Jetty thread until the writer loop terminates + ;; (client disconnect, write error, or server shutdown) + (async/SSEBody db* sse-connections*)}) diff --git a/src/eca/remote/messenger.clj b/src/eca/remote/messenger.clj new file mode 100644 index 000000000..d3af2067c --- /dev/null +++ b/src/eca/remote/messenger.clj @@ -0,0 +1,49 @@ +(ns eca.remote.messenger + "BroadcastMessenger wraps an inner IMessenger (typically ServerMessenger) + and broadcasts events to all connected SSE clients." + (:require + [eca.messenger :as messenger] + [eca.remote.sse :as sse] + [eca.shared :as shared])) + +(set! *warn-on-reflection* true) + +(defn- ->camel [data] + (shared/map->camel-cased-map data)) + +(defrecord BroadcastMessenger [inner sse-connections*] + messenger/IMessenger + + (chat-content-received [_this data] + (messenger/chat-content-received inner data) + (sse/broadcast! sse-connections* "chat:content-received" (->camel data))) + + (chat-cleared [_this params] + (messenger/chat-cleared inner params) + (sse/broadcast! sse-connections* "chat:cleared" (->camel params))) + + (chat-status-changed [_this params] + (messenger/chat-status-changed inner params) + (sse/broadcast! sse-connections* "chat:status-changed" (->camel params))) + + (chat-deleted [_this params] + (messenger/chat-deleted inner params) + (sse/broadcast! sse-connections* "chat:deleted" (->camel params))) + + (rewrite-content-received [_this data] + (messenger/rewrite-content-received inner data)) + + (tool-server-updated [_this params] + (messenger/tool-server-updated inner params) + (sse/broadcast! sse-connections* "tool:server-updated" (->camel params))) + + (config-updated [_this params] + (messenger/config-updated inner params) + (sse/broadcast! sse-connections* "config:updated" (->camel params))) + + (showMessage [_this msg] + (messenger/showMessage inner msg) + (sse/broadcast! sse-connections* "session:message" (->camel msg))) + + (editor-diagnostics [_this uri] + (messenger/editor-diagnostics inner uri))) diff --git a/src/eca/remote/middleware.clj b/src/eca/remote/middleware.clj new file mode 100644 index 000000000..eabb75731 --- /dev/null +++ b/src/eca/remote/middleware.clj @@ -0,0 +1,38 @@ +(ns eca.remote.middleware + "CORS and JSON middleware for the remote server.") + +(set! *warn-on-reflection* true) + +(def ^:private allowed-origin "https://web.eca.dev") + +(defn- allowed-origin? + "Returns the origin if allowed, nil otherwise. + Allows web.eca.dev and localhost for development." + [origin] + (when origin + (cond + (= origin allowed-origin) origin + (re-matches #"http://localhost(:\d+)?" origin) origin + (re-matches #"http://127\.0\.0\.1(:\d+)?" origin) origin + :else nil))) + +(defn- cors-headers-for + [request] + (let [origin (get-in request [:headers "origin"]) + resolved (or (allowed-origin? origin) allowed-origin)] + {"Access-Control-Allow-Origin" resolved + "Access-Control-Allow-Methods" "GET, POST, DELETE, OPTIONS" + "Access-Control-Allow-Headers" "Content-Type, Authorization"})) + +(defn wrap-cors + "Ring middleware adding CORS headers for web.eca.dev and localhost. + Handles OPTIONS preflight with 204." + [handler] + (fn [request] + (let [headers (cors-headers-for request)] + (if (= :options (:request-method request)) + {:status 204 + :headers headers + :body nil} + (let [response (handler request)] + (update response :headers merge headers)))))) diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj new file mode 100644 index 000000000..edc9f6066 --- /dev/null +++ b/src/eca/remote/routes.clj @@ -0,0 +1,116 @@ +(ns eca.remote.routes + "Ring route table and middleware composition for the remote server." + (:require + [cheshire.core :as json] + [clojure.string :as string] + [eca.remote.auth :as auth] + [eca.remote.handlers :as handlers] + [eca.remote.middleware :as middleware])) + +(set! *warn-on-reflection* true) + +(defn ^:private path-segments + "Splits a URI path into segments, e.g. /api/v1/chats/abc → [\"api\" \"v1\" \"chats\" \"abc\"]" + [^String uri] + (filterv (complement string/blank?) (string/split uri #"/"))) + +(defn ^:private match-route + "Simple path-based router. Returns [handler-fn & args] or nil." + [components request {:keys [token host* sse-connections*]}] + (let [method (:request-method request) + segments (path-segments (:uri request))] + (case segments + [] (when (= :get method) + [handlers/handle-root components request {:host @host* :password token}]) + + ["api" "v1" "health"] + (when (= :get method) + [handlers/handle-health components request]) + + ["api" "v1" "session"] + (when (= :get method) + [handlers/handle-session components request]) + + ["api" "v1" "chats"] + (when (= :get method) + [handlers/handle-list-chats components request]) + + ["api" "v1" "events"] + (when (= :get method) + [handlers/handle-events components request {:sse-connections* sse-connections*}]) + + ["api" "v1" "trust"] + (when (= :post method) + [handlers/handle-set-trust components request {:sse-connections* sse-connections*}]) + + ;; Dynamic routes with path params + (when (and (>= (count segments) 4) + (= "api" (nth segments 0)) + (= "v1" (nth segments 1))) + (let [resource (nth segments 2)] + (case resource + "chats" + (let [chat-id (nth segments 3)] + (case (count segments) + 4 (case method + :get [handlers/handle-get-chat components request chat-id] + :delete [handlers/handle-delete-chat components request chat-id] + nil) + 5 (let [action (nth segments 4)] + (when (= :post method) + (case action + "prompt" [handlers/handle-prompt components request chat-id] + "stop" [handlers/handle-stop components request chat-id] + "rollback" [handlers/handle-rollback components request chat-id] + "clear" [handlers/handle-clear components request chat-id] + "model" [handlers/handle-change-model components request chat-id] + "agent" [handlers/handle-change-agent components request chat-id] + "variant" [handlers/handle-change-variant components request chat-id] + nil))) + 6 (let [action (nth segments 4) + tcid (nth segments 5)] + (when (= :post method) + (case action + "approve" [handlers/handle-approve components request chat-id tcid] + "reject" [handlers/handle-reject components request chat-id tcid] + nil))) + nil)) + + "mcp" + (when (and (= 5 (count segments)) (= :post method)) + (let [server-name (nth segments 3) + action (nth segments 4)] + (case action + "start" [handlers/handle-mcp-start components request server-name] + "stop" [handlers/handle-mcp-stop components request server-name] + "connect" [handlers/handle-mcp-connect components request server-name] + "logout" [handlers/handle-mcp-logout components request server-name] + nil))) + + nil)))))) + +(defn ^:private not-found-response [] + {:status 404 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body "{\"error\":{\"code\":\"not_found\",\"message\":\"Route not found\"}}"}) + +(defn create-handler + "Creates the Ring handler with middleware composition. + components: the ECA components map {:db* :messenger :metrics :server} + opts: {:token :host* :sse-connections*}" + [components opts] + (let [token (:token opts)] + (-> (fn [request] + (if-let [match (match-route components request opts)] + (let [[handler & args] match] + (try + (apply handler args) + (catch Exception e + {:status 500 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (json/generate-string + {:error {:code "internal_error" + :message (or (.getMessage e) "Unknown error")}})}))) + (not-found-response))) + (auth/wrap-bearer-auth token ["/" "/api/v1/health"]) + (middleware/wrap-cors)))) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj new file mode 100644 index 000000000..66c603654 --- /dev/null +++ b/src/eca/remote/server.clj @@ -0,0 +1,150 @@ +(ns eca.remote.server + "HTTP server lifecycle for the remote web control server." + (:require + [clojure.core.async :as async] + [eca.config :as config] + [eca.logger :as logger] + [eca.remote.auth :as auth] + [eca.remote.routes :as routes] + [eca.remote.sse :as sse] + [ring.adapter.jetty :as jetty]) + (:import + [java.io IOException] + [java.net BindException Inet4Address InetAddress NetworkInterface] + [org.eclipse.jetty.server NetworkConnector Server])) + +(set! *warn-on-reflection* true) + +(def ^:private logger-tag "[REMOTE]") + +(def ^:private default-port + "Base port for the remote server when no explicit port is configured." + 7777) + +(def ^:private max-port-attempts + "Number of sequential ports to try before giving up." + 20) + +(defn ^:private detect-lan-ip + "Enumerates network interfaces to find a site-local (private) IPv4 address. + Returns the IP string or nil when none is found." + [] + (try + (->> (enumeration-seq (NetworkInterface/getNetworkInterfaces)) + (filter (fn [^NetworkInterface ni] + (and (.isUp ni) + (not (.isLoopback ni))))) + (mapcat (fn [^NetworkInterface ni] + (enumeration-seq (.getInetAddresses ni)))) + (filter (fn [^InetAddress addr] + (and (instance? Inet4Address addr) + (.isSiteLocalAddress addr)))) + (some (fn [^InetAddress addr] (.getHostAddress addr)))) + (catch Exception _ nil))) + +(defn ^:private detect-host + "Auto-detects the LAN IP by scanning network interfaces for a private IPv4 address. + Falls back to InetAddress/getLocalHost, then 127.0.0.1." + [] + (or (detect-lan-ip) + (try + (let [addr (InetAddress/getLocalHost) + host (.getHostAddress addr)] + (when-not (.isLoopbackAddress addr) host)) + (catch Exception _ nil)) + (do (logger/warn logger-tag "Could not detect LAN IP. Consider setting remote.host in config.") + "127.0.0.1"))) + +(defn ^:private try-start-jetty + "Attempts to start Jetty on the given port. Returns the Server on success, + nil when the port is already in use (BindException or Jetty's wrapping IOException)." + ^Server [handler port] + (try + (jetty/run-jetty handler {:port port :host "0.0.0.0" :join? false}) + (catch BindException _ nil) + (catch IOException _ nil))) + +(defn ^:private start-with-retry + "Tries sequential ports starting from base-port up to max-port-attempts. + Returns [server actual-port] on success, nil if all attempts fail." + [handler base-port] + (loop [port base-port + attempts 0] + (when (< attempts max-port-attempts) + (if-let [server (try-start-jetty handler port)] + [server (.getLocalPort ^NetworkConnector (first (.getConnectors ^Server server)))] + (do (logger/debug logger-tag (str "Port " port " in use, trying " (inc port) "...")) + (recur (inc port) (inc attempts))))))) + +(defn start! + "Starts the remote HTTP server if enabled in config. + sse-connections* is the shared SSE connections atom (also used by BroadcastMessenger). + Returns a map with :server :sse-connections* :heartbeat-stop-ch :token :host + or nil if not enabled or port is in use." + [components sse-connections*] + (let [db @(:db* components) + config (config/all db) + remote-config (:remote config)] + (when (:enabled remote-config) + (let [token (or (:password remote-config) (auth/generate-token)) + host-base (or (:host remote-config) (detect-host)) + user-port (:port remote-config) + ;; Use atom so the handler sees host:port after Jetty resolves the actual port + host+port* (atom host-base) + handler (routes/create-handler components + {:token token + :host* host+port* + :sse-connections* sse-connections*})] + (try + (if-let [[^Server jetty-server actual-port] + (if user-port + ;; User-specified port: single attempt, no retry + (if-let [server (try-start-jetty handler user-port)] + [server (.getLocalPort ^NetworkConnector (first (.getConnectors ^Server server)))] + (do (logger/warn logger-tag "Port" user-port "is already in use." + "Remote server will not start.") + nil)) + ;; Default: try sequential ports starting from default-port + (or (start-with-retry handler default-port) + (do (logger/warn logger-tag + (str "Could not bind to ports " default-port "-" + (+ default-port (dec max-port-attempts)) + ". Remote server will not start.")) + nil)))] + (let [host-with-port (str host-base ":" actual-port) + _ (reset! host+port* host-with-port) + heartbeat-ch (sse/start-heartbeat! sse-connections*) + connect-url (str "https://web.eca.dev?host=" + host-with-port + "&pass=" token)] + (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) + (logger/info logger-tag (str "šŸ”— " connect-url)) + {:server jetty-server + :sse-connections* sse-connections* + :heartbeat-stop-ch heartbeat-ch + :token token + :host host-with-port + :connect-url connect-url}) + nil) + (catch Exception e + (logger/warn logger-tag "Failed to start remote server:" (.getMessage e)) + nil)))))) + +(defn stop! + "Stops the remote HTTP server, broadcasting disconnecting event first." + [{:keys [^Server server sse-connections* heartbeat-stop-ch]}] + (when server + (logger/info logger-tag "Stopping remote server...") + ;; Broadcast disconnecting event + (when sse-connections* + (sse/broadcast! sse-connections* "session:disconnecting" {:reason "shutdown"})) + ;; Stop heartbeat + (when heartbeat-stop-ch + (async/close! heartbeat-stop-ch)) + ;; Close all SSE connections + (when sse-connections* + (sse/close-all! sse-connections*)) + ;; Stop Jetty with 5s timeout + (.setStopTimeout server 5000) + (.stop server) + (logger/info logger-tag "Remote server stopped."))) diff --git a/src/eca/remote/sse.clj b/src/eca/remote/sse.clj new file mode 100644 index 000000000..eb2cd12e6 --- /dev/null +++ b/src/eca/remote/sse.clj @@ -0,0 +1,112 @@ +(ns eca.remote.sse + "SSE connection management for the remote web control server. + Each connected client gets a core.async channel with a dropping buffer. + A writer thread per client reads from the channel and writes SSE-formatted + text to the Ring async response." + (:require + [cheshire.core :as json] + [clojure.core.async :as async] + [eca.logger :as logger]) + (:import + [java.io IOException OutputStream])) + +(set! *warn-on-reflection* true) + +(def ^:private logger-tag "[REMOTE-SSE]") +(def ^:private buffer-size 256) +(def ^:private heartbeat-interval-ms 15000) + +(defn create-connections + "Creates a new SSE connections atom (set of client maps)." + [] + (atom #{})) + +(defn- write-sse! [^OutputStream os ^String data] + (.write os (.getBytes data "UTF-8")) + (.flush os)) + +(defn- format-sse-event [{:keys [event data]}] + (str "event: " event "\n" + "data: " (json/generate-string data) "\n\n")) + +(defn- start-writer-loop! + "Starts a thread that reads events from the client channel and writes + SSE-formatted text to the output stream. Removes the client on error. + Closes done-ch when the loop terminates (so callers can block on it). + Uses async/thread (not go-loop) because write-sse! performs blocking I/O." + [connections* {:keys [ch os done-ch] :as client}] + (async/thread + (try + (loop [] + (when-let [event (async/ {:ch ch :os os} + done-ch (assoc :done-ch done-ch))] + (swap! connections* conj client) + (start-writer-loop! connections* client) + client))) + +(defn remove-client! + "Removes a client from the connections set and closes its channel." + [connections* client] + (swap! connections* disj client) + (async/close! (:ch client))) + +(defn broadcast! + "Puts an event on all connected client channels. + event-type is a string like \"chat:content-received\". + data is the payload map." + [connections* event-type data] + (let [event {:event event-type :data data}] + (doseq [{:keys [ch]} @connections*] + (async/offer! ch event)))) + +(defn start-heartbeat! + "Starts a background thread that sends SSE comment lines to all clients + every 15 seconds. Returns a channel that can be closed to stop the loop. + Uses async/thread (not go-loop) because write-sse! performs blocking I/O." + [connections*] + (let [stop-ch (async/chan)] + (async/thread + (loop [] + (let [[_ port] (async/alts!! [stop-ch (async/timeout heartbeat-interval-ms)])] + (when-not (= port stop-ch) + (let [dead-clients (atom #{})] + (doseq [{:keys [^OutputStream os] :as client} @connections*] + (try + (write-sse! os ":\n\n") + (catch IOException _e + (swap! dead-clients conj client)))) + (when (seq @dead-clients) + (swap! connections* #(reduce disj % @dead-clients)) + (doseq [{:keys [ch]} @dead-clients] + (async/close! ch)))) + (recur))))) + stop-ch)) + +(defn close-all! + "Closes all client channels and clears the connections set." + [connections*] + (doseq [{:keys [ch]} @connections*] + (async/close! ch)) + (reset! connections* #{})) diff --git a/src/eca/server.clj b/src/eca/server.clj index 6b7753b0e..27194624f 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -9,6 +9,8 @@ [eca.metrics :as metrics] [eca.nrepl :as nrepl] [eca.opentelemetry :as opentelemetry] + [eca.remote.messenger :as remote.messenger] + [eca.remote.server :as remote.server] [eca.shared :as shared :refer [assoc-some]] [jsonrpc4clj.io-server :as io-server] [jsonrpc4clj.liveness-probe :as liveness-probe] @@ -20,9 +22,13 @@ [_level & args] (apply logger/info args)) +(def ^:private remote-server* (atom nil)) + (defn ^:private exit [server] (metrics/task :eca/exit + (when-let [rs @remote-server*] + (remote.server/stop! rs)) (jsonrpc.server/shutdown server) ;; blocks, waiting up to 10s for previously received messages to be processed (shutdown-agents) (System/exit 0))) @@ -140,6 +146,12 @@ (chat-cleared [_this params] (jsonrpc.server/discarding-stdout (jsonrpc.server/send-notification server "chat/cleared" params))) + (chat-status-changed [_this params] + (jsonrpc.server/discarding-stdout + (jsonrpc.server/send-notification server "chat/statusChanged" params))) + (chat-deleted [_this params] + (jsonrpc.server/discarding-stdout + (jsonrpc.server/send-notification server "chat/deleted" params))) (rewrite-content-received [_this content] (jsonrpc.server/discarding-stdout (jsonrpc.server/send-notification server "rewrite/contentReceived" content))) @@ -162,14 +174,27 @@ (metrics/->NoopMetrics))) (defn start-server! [server] - (let [db* (atom db/initial-db) + (let [db* (atom (assoc db/initial-db :started-at (System/currentTimeMillis))) metrics (->Metrics db*) + stdio-messenger (->ServerMessenger server db*) + ;; Read remote config from file-based sources (global/env/custom). + ;; Workspace-level config is not available yet (initialize hasn't been called). + remote-config (:remote (config/read-file-configs)) + sse-connections* (when (:enabled remote-config) + (atom #{})) + messenger (if sse-connections* + (remote.messenger/->BroadcastMessenger stdio-messenger sse-connections*) + stdio-messenger) components {:db* db* - :messenger (->ServerMessenger server db*) + :messenger messenger :metrics metrics :server server}] (logger/info "[server]" "Starting server...") (metrics/start! metrics) + (when sse-connections* + (when-let [rs (remote.server/start! components sse-connections*)] + (reset! remote-server* rs) + (swap! db* assoc :remote-connect-url (:connect-url rs)))) (monitor-server-logs (:log-ch server)) (setup-dev-environment db* components) (jsonrpc.server/start server components))) diff --git a/test/eca/remote/auth_test.clj b/test/eca/remote/auth_test.clj new file mode 100644 index 000000000..600ed57f2 --- /dev/null +++ b/test/eca/remote/auth_test.clj @@ -0,0 +1,51 @@ +(ns eca.remote.auth-test + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is testing]] + [eca.remote.auth :as auth])) + +(deftest generate-token-test + (testing "generates a 5-character alphanumeric string" + (let [token (auth/generate-token)] + (is (= 5 (count token))) + (is (re-matches #"[a-z0-9]{5}" token)))) + + (testing "generates unique tokens" + (let [tokens (repeatedly 10 auth/generate-token)] + (is (= 10 (count (set tokens))))))) + +(def ^:private test-token "test-secret-token") + +(defn- test-handler [_request] + {:status 200 :headers {} :body "ok"}) + +(defn- make-request + ([method uri] (make-request method uri nil)) + ([method uri token] + (cond-> {:request-method method + :uri uri + :headers {}} + token (assoc-in [:headers "authorization"] (str "Bearer " token))))) + +(deftest wrap-bearer-auth-test + (let [handler (auth/wrap-bearer-auth test-handler test-token ["/api/v1/health" "/"])] + + (testing "allows requests with valid token" + (let [response (handler (make-request :get "/api/v1/chats" test-token))] + (is (= 200 (:status response))))) + + (testing "rejects requests with missing token" + (let [response (handler (make-request :get "/api/v1/chats"))] + (is (= 401 (:status response))) + (let [body (json/parse-string (:body response) true)] + (is (= "unauthorized" (get-in body [:error :code])))))) + + (testing "rejects requests with wrong token" + (let [response (handler (make-request :get "/api/v1/chats" "wrong-token"))] + (is (= 401 (:status response))))) + + (testing "exempt paths skip auth" + (let [response (handler (make-request :get "/api/v1/health"))] + (is (= 200 (:status response)))) + (let [response (handler (make-request :get "/"))] + (is (= 200 (:status response))))))) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj new file mode 100644 index 000000000..4dbee00e5 --- /dev/null +++ b/test/eca/remote/handlers_test.clj @@ -0,0 +1,82 @@ +(ns eca.remote.handlers-test + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is testing]] + [eca.config :as config] + [eca.remote.handlers :as handlers] + [eca.test-helper :as h])) + +(h/reset-components-before-test) + +(defn- components [] + (let [c (h/components)] + (assoc c :config (config/all @(:db* c))))) + +(deftest handle-health-test + (testing "returns ok with version" + (let [response (handlers/handle-health nil nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= "ok" (:status body))) + (is (string? (:version body)))))) + +(deftest handle-root-test + (testing "redirects to web.eca.dev" + (let [response (handlers/handle-root nil nil {:host "192.168.1.1:7888" :password "abc123"})] + (is (= 302 (:status response))) + (is (= "https://web.eca.dev?host=192.168.1.1:7888&pass=abc123" + (get-in response [:headers "Location"])))))) + +(deftest handle-list-chats-test + (testing "returns empty list when no chats" + (let [response (handlers/handle-list-chats (components) nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= [] body)))) + + (testing "returns chats excluding subagents" + (swap! (h/db*) assoc :chats {"c1" {:id "c1" :title "Test" :status :idle :created-at 123} + "c2" {:id "c2" :title "Sub" :status :running :subagent true}}) + (let [response (handlers/handle-list-chats (components) nil) + body (json/parse-string (:body response) true)] + (is (= 1 (count body))) + (is (= "c1" (:id (first body))))))) + +(deftest handle-get-chat-test + (testing "returns 404 for missing chat" + (let [response (handlers/handle-get-chat (components) nil "nonexistent") + body (json/parse-string (:body response) true)] + (is (= 404 (:status response))) + (is (= "chat_not_found" (get-in body [:error :code]))))) + + (testing "returns chat data for existing chat" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "My Chat" :status :idle :messages []}) + (let [response (handlers/handle-get-chat (components) nil "c1") + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= "c1" (:id body))) + (is (= "My Chat" (:title body)))))) + +(deftest handle-stop-test + (testing "returns 404 for missing chat" + (let [response (handlers/handle-stop (components) nil "nonexistent")] + (is (= 404 (:status response))))) + + (testing "returns 409 for non-running chat" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :status :idle}) + (let [response (handlers/handle-stop (components) nil "c1")] + (is (= 409 (:status response)))))) + +(deftest handle-prompt-test + (testing "returns 400 for missing message" + (let [request {:body (java.io.ByteArrayInputStream. (.getBytes "{}" "UTF-8"))} + response (handlers/handle-prompt (components) request "c1")] + (is (= 400 (:status response)))))) + +(deftest handle-session-test + (testing "returns session info" + (let [response (handlers/handle-session (components) nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (string? (:version body))) + (is (= "1.0" (:protocolVersion body)))))) diff --git a/test/eca/remote/messenger_test.clj b/test/eca/remote/messenger_test.clj new file mode 100644 index 000000000..1b6d9e263 --- /dev/null +++ b/test/eca/remote/messenger_test.clj @@ -0,0 +1,62 @@ +(ns eca.remote.messenger-test + (:require + [clojure.test :refer [deftest is testing]] + [eca.messenger :as messenger] + [eca.remote.messenger :as remote.messenger] + [eca.remote.sse :as sse] + [eca.test-helper :as h])) + +(h/reset-components-before-test) + +(deftest broadcast-messenger-delegates-and-broadcasts-test + (let [inner (h/messenger) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/->BroadcastMessenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os)] + + (testing "chat-content-received delegates to inner and broadcasts camelCase" + (let [data {:chat-id "c1" :role :assistant :content {:type :text :text "hi"}}] + (messenger/chat-content-received broadcast-messenger data) + (Thread/sleep 100) + (is (seq (:chat-content-received (h/messages)))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:content-received")) + (is (.contains output "\"chatId\"") "SSE broadcast should use camelCase keys") + (is (not (.contains output "\"chat-id\"")) "SSE broadcast should not use kebab-case keys")))) + + (testing "chat-status-changed delegates and broadcasts camelCase" + (let [params {:chat-id "c1" :status :running}] + (messenger/chat-status-changed broadcast-messenger params) + (Thread/sleep 100) + (is (seq (:chat-status-changed (h/messages)))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:status-changed")) + (is (.contains output "\"chatId\""))))) + + (testing "chat-deleted delegates and broadcasts camelCase" + (let [params {:chat-id "c1"}] + (messenger/chat-deleted broadcast-messenger params) + (Thread/sleep 100) + (is (seq (:chat-deleted (h/messages)))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:deleted")) + (is (.contains output "\"chatId\""))))) + + (testing "editor-diagnostics delegates to inner only (no broadcast)" + (let [os2 (java.io.ByteArrayOutputStream.) + _client2 (sse/add-client! sse-connections* os2)] + (messenger/editor-diagnostics broadcast-messenger nil) + (Thread/sleep 100) + (is (not (.contains (.toString os2 "UTF-8") "editor"))))) + + (testing "rewrite-content-received delegates to inner only (no broadcast)" + (let [os3 (java.io.ByteArrayOutputStream.) + _client3 (sse/add-client! sse-connections* os3) + data {:chat-id "c1" :content {:type :text :text "rewritten"}}] + (messenger/rewrite-content-received broadcast-messenger data) + (Thread/sleep 100) + (is (seq (:rewrite-content-received (h/messages)))) + (is (not (.contains (.toString os3 "UTF-8") "rewrite"))))) + + (sse/close-all! sse-connections*))) diff --git a/test/eca/remote/sse_test.clj b/test/eca/remote/sse_test.clj new file mode 100644 index 000000000..e78fb2ca9 --- /dev/null +++ b/test/eca/remote/sse_test.clj @@ -0,0 +1,71 @@ +(ns eca.remote.sse-test + (:require + [clojure.core.async :as async] + [clojure.test :refer [deftest is testing]] + [eca.remote.sse :as sse]) + (:import + [java.io ByteArrayOutputStream])) + +(deftest create-connections-test + (testing "creates an atom with empty set" + (let [conns (sse/create-connections)] + (is (instance? clojure.lang.Atom conns)) + (is (= #{} @conns))))) + +(deftest add-and-remove-client-test + (testing "add-client! adds to connections, remove-client! removes" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + (is (= 1 (count @conns))) + (is (contains? @conns client)) + (sse/remove-client! conns client) + (is (= 0 (count @conns)))))) + +(deftest broadcast-test + (testing "broadcast! writes SSE-formatted event to client" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + (sse/broadcast! conns "chat:status-changed" {:chatId "abc" :status "running"}) + ;; Give writer loop time to process + (Thread/sleep 100) + (let [output (.toString os "UTF-8")] + (is (.contains output "event: chat:status-changed")) + (is (.contains output "data: "))) + (sse/remove-client! conns client)))) + +(deftest broadcast-backpressure-test + (testing "dropping buffer prevents blocking when client is slow" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + ;; Flood the channel — should not block + (dotimes [i 500] + (sse/broadcast! conns "test:event" {:i i})) + ;; Should still be connected (no exception) + (is (= 1 (count @conns))) + (sse/remove-client! conns client)))) + +(deftest close-all-test + (testing "close-all! removes all clients" + (let [conns (sse/create-connections) + os1 (ByteArrayOutputStream.) + os2 (ByteArrayOutputStream.)] + (sse/add-client! conns os1) + (sse/add-client! conns os2) + (is (= 2 (count @conns))) + (sse/close-all! conns) + (is (= 0 (count @conns)))))) + +(deftest heartbeat-test + (testing "heartbeat sends comment lines to clients" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + ;; Start heartbeat with very short interval for testing + ;; We just verify start-heartbeat! returns a channel + (let [stop-ch (sse/start-heartbeat! conns)] + (is (some? stop-ch)) + (async/close! stop-ch)) + (sse/remove-client! conns client)))) diff --git a/test/eca/test_helper.clj b/test/eca/test_helper.clj index e64df4b53..bc1945f11 100644 --- a/test/eca/test_helper.clj +++ b/test/eca/test_helper.clj @@ -30,6 +30,8 @@ messenger/IMessenger (chat-content-received [_ data] (swap! messages* update :chat-content-received (fnil conj []) data)) (chat-cleared [_ params] (swap! messages* update :chat-clear (fnil conj []) params)) + (chat-status-changed [_ params] (swap! messages* update :chat-status-changed (fnil conj []) params)) + (chat-deleted [_ params] (swap! messages* update :chat-deleted (fnil conj []) params)) (rewrite-content-received [_ data] (swap! messages* update :rewrite-content-received (fnil conj []) data)) (config-updated [_ data] (swap! messages* update :config-updated (fnil conj []) data)) (tool-server-updated [_ data] (swap! messages* update :tool-server-update (fnil conj []) data))