From cd4ade9fb5a083c6f483c5e6d1cd660cb8e5e8fc Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 09:38:06 -0300 Subject: [PATCH 01/13] Remote connection support via HTTP + SSE --- CHANGELOG.md | 1 + docs/config.json | 35 ++ docs/remote-server-plan.md | 493 ++++++++++++++++++++++++++++ src/eca/config.clj | 1 + src/eca/features/chat.clj | 6 +- src/eca/features/chat/lifecycle.clj | 3 +- src/eca/handlers.clj | 4 +- src/eca/messenger.clj | 2 + src/eca/remote/auth.clj | 41 +++ src/eca/remote/handlers.clj | 256 +++++++++++++++ src/eca/remote/messenger.clj | 45 +++ src/eca/remote/middleware.clj | 31 ++ src/eca/remote/routes.clj | 96 ++++++ src/eca/remote/server.clj | 100 ++++++ src/eca/remote/sse.clj | 100 ++++++ src/eca/server.clj | 20 +- test/eca/remote/auth_test.clj | 51 +++ test/eca/remote/handlers_test.clj | 83 +++++ test/eca/remote/messenger_test.clj | 54 +++ test/eca/remote/sse_test.clj | 72 ++++ test/eca/test_helper.clj | 2 + 21 files changed, 1490 insertions(+), 6 deletions(-) create mode 100644 docs/remote-server-plan.md create mode 100644 src/eca/remote/auth.clj create mode 100644 src/eca/remote/handlers.clj create mode 100644 src/eca/remote/messenger.clj create mode 100644 src/eca/remote/middleware.clj create mode 100644 src/eca/remote/routes.clj create mode 100644 src/eca/remote/server.clj create mode 100644 src/eca/remote/sse.clj create mode 100644 test/eca/remote/auth_test.clj create mode 100644 test/eca/remote/handlers_test.clj create mode 100644 test/eca/remote/messenger_test.clj create mode 100644 test/eca/remote/sse_test.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ab212db0..56cbd7a0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Fix MCP server auth being invalidated when only URL query parameters change. +- Add remote web control server for browser-based chat observation and control via `web.eca.dev`. #333 ## 0.115.2 diff --git a/docs/config.json b/docs/config.json index 74e340ce5..705cd6117 100644 --- a/docs/config.json +++ b/docs/config.json @@ -332,6 +332,41 @@ ] } }, + "remote": { + "type": "object", + "description": "Remote web control server configuration. When enabled, starts an embedded HTTP server allowing a web frontend (web.eca.dev) to observe and control chat sessions in real-time.", + "markdownDescription": "Remote web control server configuration. When enabled, starts an embedded HTTP server allowing a web frontend (`web.eca.dev`) to observe and control chat sessions in real-time.", + "additionalProperties": false, + "properties": { + "enabled": { + "type": "boolean", + "description": "Enables the remote HTTP server.", + "markdownDescription": "Enables the remote HTTP server." + }, + "host": { + "type": "string", + "description": "Host used in the logged URL for web.eca.dev to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL. When unset, auto-detected via InetAddress/getLocalHost.", + "markdownDescription": "Host used in the logged URL for `web.eca.dev` to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL. When unset, auto-detected via `InetAddress/getLocalHost`.", + "examples": [ + "192.168.1.42", + "myserver.example.com" + ] + }, + "port": { + "type": "integer", + "description": "Port the HTTP server listens on. When unset, a random free port is used.", + "markdownDescription": "Port the HTTP server listens on. When unset, a random free port is used.", + "examples": [ + 7888 + ] + }, + "password": { + "type": "string", + "description": "Auth token for Bearer authentication. When unset, a 32-byte hex token is auto-generated and logged to stderr.", + "markdownDescription": "Auth token for Bearer authentication. When unset, a 32-byte hex token is auto-generated and logged to stderr." + } + } + }, "network": { "type": "object", "description": "Network configuration for custom CA certificates and mTLS client certificates. Values support dynamic string interpolation (e.g. '${env:SSL_CERT_FILE}').", diff --git a/docs/remote-server-plan.md b/docs/remote-server-plan.md new file mode 100644 index 000000000..809f4660c --- /dev/null +++ b/docs/remote-server-plan.md @@ -0,0 +1,493 @@ +# Remote Web Control Server — Design Plan + +> GitHub Issue: [#333](https://github.com/editor-code-assistant/eca/issues/333) + +## Overview + +Allow a web frontend (hosted at `web.eca.dev`) to connect to a running ECA session +and observe/control chat sessions in real-time. Each ECA process optionally starts an +embedded HTTP server alongside the existing stdio JSON-RPC server. The web UI connects +via REST for commands and SSE for live updates. + +**Architecture:** Option A — Embedded HTTP server per ECA process. +User opens different browser tabs for different sessions (host:port) and different tabs +for each chat within a session. + +## Config + +```json +{ + "remote": { + "enabled": true, + "host": "myserver.example.com", + "port": 7888, + "password": "my-secret" + } +} +``` + +| Field | Required | Default | Description | +|------------|----------|-----------------------|------------------------------------------------| +| `enabled` | yes | — | Enables the remote HTTP server | +| `host` | no | auto-detected LAN IP | Host used in the logged URL for `web.eca.dev` to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL (e.g. ngrok, tailscale). Not set → auto-detect via `InetAddress/getLocalHost`. | +| `port` | no | random free port | Port the HTTP server listens on | +| `password` | no | auto-generated token | Auth token; auto-generated and logged if unset | + +Server always binds `0.0.0.0` on the configured port. The `host` field only affects the +logged URL — it tells the web frontend where to reach the server, it does not change what +address the server binds to. + +## Authentication Flow + +Token-based auth using `Authorization: Bearer` header on all requests. No cookies — +avoids `Secure`/`SameSite` issues that break non-localhost connections (LAN IPs, remote). + +1. ECA starts, logs to stderr: + ``` + 🌐 Remote server started on port 7888 + 🔗 https://web.eca.dev?host=192.168.1.42:7888&token=a3f8b2... + ``` + The `host` param tells `web.eca.dev` where to connect back to. Resolved as: + config `host` if set → otherwise auto-detected LAN IP via `InetAddress/getLocalHost`. + If `InetAddress/getLocalHost` fails or returns loopback, falls back to `127.0.0.1` + and logs a warning suggesting the user configure `remote.host`. +2. User clicks the URL → arrives at `web.eca.dev` with `host` and `token` in query params. +3. Frontend stores the token in JS memory (or `localStorage` to survive refresh). +4. Frontend strips token from URL via `history.replaceState`. +5. All REST requests include `Authorization: Bearer ` header. +6. SSE stream is consumed via `fetch()` + `ReadableStream` (not `EventSource`, which + doesn't support custom headers). The `Authorization` header is sent on the SSE + request. Wire format is identical SSE (`event:`/`data:`/`\n\n`). + +The auth token is either user-configured (`password` in config) or auto-generated as a +32-byte hex string via `java.security.SecureRandom` (64 characters). Stored in runtime +state only (not persisted to disk). + +**No CSRF protection needed** — CSRF is a cookie-only attack vector. Bearer tokens in +headers are not sent automatically by browsers. + +**Fallback:** Users can also go to `web.eca.dev` directly and manually enter host:port + +token in a connect form. + +## REST API + +All API routes are prefixed with `/api/v1/`. +All JSON keys in requests and responses use **camelCase** (matching the existing JSON-RPC +protocol convention). Internal Clojure kebab-case keywords are converted on +serialization. +All responses use `Content-Type: application/json; charset=utf-8` unless noted otherwise. + +### HTTP Status Codes + +| Status | Meaning | +|--------|-----------------------------------------------------------| +| `200` | Successful GET, or POST that returns data | +| `204` | Successful POST/DELETE with no response body | +| `302` | Redirect (`GET /`) | +| `400` | Malformed request body or missing required fields | +| `401` | Missing or invalid `Authorization: Bearer` token | +| `404` | Chat or tool-call ID not found | +| `409` | Chat in wrong state for operation (e.g. stop on idle) | +| `500` | Internal server error | + +### Health & Redirect + +| Method | Path | Auth | Description | +|--------|------------------|------|------------------------------------------------| +| `GET` | `/` | No | `302` redirect to `web.eca.dev` with host+token params | +| `GET` | `/api/v1/health` | No | `200` — `{"status": "ok", "version": "0.x.y"}` | + +### Session + +| Method | Path | Description | +|--------|--------------------|-----------------------------------------------------| +| `GET` | `/api/v1/session` | Session info (workspaces, models, agents, config) | + +Response: +```json +{ + "version": "0.x.y", + "protocolVersion": "1.0", + "workspaceFolders": ["/home/user/project"], + "models": [{"id": "...", "name": "...", "provider": "..."}], + "agents": [{"id": "...", "name": "...", "description": "..."}], + "mcpServers": [{"name": "...", "status": "running"}] +} +``` + +### Chats + +**List chats:** + +`GET /api/v1/chats` → `200` +```json +[{"id": "uuid", "title": "Fix login bug", "status": "idle", "createdAt": 1234567890}] +``` + +**Get chat:** + +`GET /api/v1/chats/:id` → `200` / `404` +```json +{ + "id": "uuid", + "title": "Fix login bug", + "status": "idle", + "createdAt": 1234567890, + "messages": [{"role": "user", "content": "...", "contentId": "uuid"}, ...], + "toolCalls": {"tc-id": {"name": "read_file", "status": "called", "arguments": "..."}}, + "task": {"nextId": 1, "tasks": [...]} +} +``` + +**Send prompt** (creates chat implicitly if `:id` is new — web UI generates UUID): + +`POST /api/v1/chats/:id/prompt` → `200` / `400` +```json +// Request +{"message": "fix the login bug", "model": "optional-model-id", "agent": "optional-agent-id"} + +// Response +{"chatId": "uuid", "model": "anthropic/claude-sonnet-4-20250514", "status": "running"} +``` + +**Stop generation:** + +`POST /api/v1/chats/:id/stop` → `204` / `404` / `409` + +**Approve tool call:** + +`POST /api/v1/chats/:id/approve/:tcid` → `204` / `404` / `409` + +**Reject tool call:** + +`POST /api/v1/chats/:id/reject/:tcid` → `204` / `404` / `409` + +**Rollback:** + +`POST /api/v1/chats/:id/rollback` → `204` / `404` +```json +// Request +{"contentId": "uuid-of-message-to-rollback-to"} +``` + +**Clear chat:** + +`POST /api/v1/chats/:id/clear` → `204` / `404` + +**Delete chat:** + +`DELETE /api/v1/chats/:id` → `204` / `404` + +**Change model:** + +`POST /api/v1/chats/:id/model` → `204` / `404` / `400` +```json +// Request +{"model": "anthropic/claude-sonnet-4-20250514"} +``` +Maps to `chat/selectedModelChanged` notification handler. + +**Change agent:** + +`POST /api/v1/chats/:id/agent` → `204` / `404` / `400` +```json +// Request +{"agent": "code"} +``` +Maps to `chat/selectedAgentChanged` notification handler. + +**Change variant:** + +`POST /api/v1/chats/:id/variant` → `204` / `404` / `400` +```json +// Request +{"variant": "high"} +``` + +### SSE + +`GET /api/v1/events` — `Content-Type: text/event-stream` + +Optional `?chat=` filter: when provided, `chat:*` events are filtered to only that +chat. `session:*`, `config:*`, and `tool:*` events are always sent regardless of filter. + +## SSE Events + +### Event Types + +``` +event: session:connected ← Full state dump on SSE connect +event: session:disconnecting ← Server shutting down +event: session:message ← showMessage notifications (errors, warnings, info) + +event: chat:content-received ← Text chunks, tool progress, usage (from IMessenger) +event: chat:status-changed ← idle/running/stopping transitions +event: chat:cleared ← Chat history cleared +event: chat:deleted ← Chat removed + +event: config:updated ← Model/agent/config changes +event: tool:server-updated ← MCP server status changes +``` + +### Event Payloads + +**`session:connected`** — Full state dump so the web client can bootstrap: +```json +{ + "version": "0.x.y", + "protocolVersion": "1.0", + "chats": [{"id": "...", "title": "...", "status": "idle", "createdAt": 123}], + "models": [{"id": "...", "name": "...", "provider": "..."}], + "agents": [{"id": "...", "name": "...", "description": "..."}], + "mcpServers": [{"name": "...", "status": "running"}], + "workspaceFolders": ["/home/user/project"] +} +``` + +**`session:disconnecting`** — Server shutting down: +```json +{"reason": "shutdown"} +``` + +**`session:message`** — from `showMessage`: +```json +{"type": "warning", "message": "Rate limit approaching"} +``` + +**`chat:content-received`** — the `data` field is the JSON serialization of the same +params passed to `IMessenger.chat-content-received`, with keys camelCased. Examples: +```json +{"chatId": "abc", "role": "assistant", "content": {"type": "text", "text": "Hello...", "contentId": "uuid"}} +{"chatId": "abc", "role": "system", "content": {"type": "progress", "state": "running", "text": "Thinking..."}} +{"chatId": "abc", "role": "system", "content": {"type": "toolCallPrepare", "toolCallId": "tc1", "name": "eca__read_file"}} +{"chatId": "abc", "role": "system", "content": {"type": "toolCalled", "toolCallId": "tc1", "result": "..."}} +``` + +**`chat:status-changed`**: +```json +{"chatId": "abc", "status": "running"} +``` + +**`chat:cleared`**: +```json +{"chatId": "abc"} +``` + +**`chat:deleted`**: +```json +{"chatId": "abc"} +``` + +**`config:updated`** — partial config fields that changed: +```json +{"models": [...], "agents": [...]} +``` + +**`tool:server-updated`**: +```json +{"name": "github-mcp", "status": "running"} +``` + +### Reliability + +- **Heartbeat:** Server sends `:` comment line every 15 seconds to detect dead clients. +- **Backpressure:** Each SSE client gets a `core.async` channel with a dropping buffer + (buffer size: 256 events). Slow clients drop events rather than blocking the main + messenger path. Clients can recover by re-fetching chat state via REST. +- **Stale cleanup:** Failed writes remove the client from the connection set. + +## CORS + +Required because `web.eca.dev` (different origin) connects to `localhost:`. + +``` +Access-Control-Allow-Origin: https://web.eca.dev +Access-Control-Allow-Methods: GET, POST, DELETE, OPTIONS +Access-Control-Allow-Headers: Content-Type, Authorization +``` + +No `Access-Control-Allow-Credentials` needed — auth is via `Authorization` header, not +cookies. + +## Error Responses + +All errors use a consistent envelope: + +```json +{ + "error": { + "code": "chat_not_found", + "message": "Chat abc-123 does not exist" + } +} +``` + +No raw exceptions or stack traces are returned to the web client. + +### Error Codes + +| Code | HTTP Status | Description | +|-----------------------|-------------|------------------------------------------| +| `unauthorized` | `401` | Missing or invalid Bearer token | +| `invalid_request` | `400` | Malformed JSON or missing required fields| +| `chat_not_found` | `404` | Chat ID doesn't exist in `db*` | +| `tool_call_not_found` | `404` | Tool call ID doesn't exist in chat | +| `chat_wrong_status` | `409` | Chat not in valid state for operation | +| `internal_error` | `500` | Unexpected server error | + +## Key Architecture Decisions + +### BroadcastMessenger + +Wraps the existing `ServerMessenger`, implements `IMessenger`. Every method: +1. Delegates to the inner `ServerMessenger` (sends to editor via stdio). +2. Broadcasts the event to all connected SSE clients (via `core.async` channels). + +**Exceptions (inner-only, no broadcast):** +- `editor-diagnostics` — web client cannot provide editor diagnostics. +- `rewrite-content-received` — editor-only feature (targets editor selections). + +**IMessenger method → SSE event type mapping:** + +| IMessenger method | SSE event type | Broadcast? | +|----------------------------|---------------------------|------------| +| `chat-content-received` | `chat:content-received` | ✅ Yes | +| `chat-cleared` | `chat:cleared` | ✅ Yes | +| `chat-status-changed` ★ | `chat:status-changed` | ✅ Yes | +| `chat-deleted` ★ | `chat:deleted` | ✅ Yes | +| `config-updated` | `config:updated` | ✅ Yes | +| `tool-server-updated` | `tool:server-updated` | ✅ Yes | +| `showMessage` | `session:message` | ✅ Yes | +| `editor-diagnostics` | — | ❌ Inner | +| `rewrite-content-received` | — | ❌ Inner | + +★ = **New `IMessenger` methods.** These state transitions currently happen via direct +`swap!` on `db*` without going through the messenger. Adding them to `IMessenger` gives +`BroadcastMessenger` a clean broadcast path. The `ServerMessenger` (stdio) implementation +of these new methods is a no-op — the editor JSON-RPC protocol has no equivalent +notification for these events. + +**Call sites for new methods:** +- `chat-status-changed` — called from `prompt` (→ `:running`), `finish-chat-prompt!` + (→ `:idle`/`:stopping`), and `promptStop` (→ `:stopping`). +- `chat-deleted` — called from `delete-chat` after removing the chat from `db*`. + +### Handler Reuse + +REST handlers bridge to the same feature functions used by JSON-RPC handlers +(`eca.handlers`, `eca.features.chat`). They receive the same `components` map +`{:db* :messenger :config :metrics}` — no business logic duplication. + +### Port Conflict Handling + +If the configured port is in use, the server logs a warning and continues without the +remote server. The ECA process does not crash. + +### Editor Remains Primary + +The editor (stdio) client is always the primary client. The web UI is a secondary client +that can observe and control chat but cannot fulfill editor-specific requests +(`editor/getDiagnostics`). Both clients can send commands concurrently — for tool +approvals, first response wins (promise `deliver` is idempotent). + +## Implementation Steps + +### Step 1 — Config schema +Add `remote` section to `docs/config.json` with `enabled`, `host`, `port`, and `password` +fields. Parse and validate in config loading. + +### Step 2 — SSE connection management (`eca.remote.sse`) +- Atom holding set of SSE client connections. +- Each client: Ring async response channel + `core.async` channel with dropping buffer. +- `broadcast!` function: puts event on all client channels. +- Writer loop per client: takes from channel, writes SSE-formatted string, removes client on error. +- Heartbeat loop: writes `:` comment to all clients every 15 seconds. + +### Step 3 — BroadcastMessenger (`eca.remote.messenger`) +- Implements `IMessenger`. +- Wraps inner `ServerMessenger`. +- Delegates all methods to inner + broadcasts to SSE. +- `editor-diagnostics` → inner only. + +### Step 4 — Auth middleware (`eca.remote.auth`) +- Ring middleware: reads `Authorization: Bearer ` header, validates against the + configured or auto-generated token. +- Returns `401 Unauthorized` with error envelope if missing or invalid. +- `/api/v1/health` and `GET /` are exempt from auth. + +### Step 5 — CORS middleware (`eca.remote.middleware`) +- Ring middleware adding CORS headers (`Authorization` in allowed headers). +- Handles OPTIONS preflight requests. +- No `Access-Control-Allow-Credentials` needed (Bearer auth, not cookies). + +### Step 6 — REST handlers (`eca.remote.handlers`) +- Read endpoints: pull from `db*` directly. +- Write endpoints: bridge to existing handler functions in `eca.features.chat`. +- Consistent error envelope on all error paths. +- `GET /` redirect to `web.eca.dev`. + +### Step 7 — Routes + HTTP server (`eca.remote.routes`, `eca.remote.server`) +- Ring route table with `/api/v1/` prefix. +- Middleware composition: CORS → Bearer auth → JSON parsing → routes. +- Jetty lifecycle: start on configured port, stop on shutdown. +- Graceful port conflict handling (catch `BindException`, log warning, continue without + remote server). + +### Step 8 — Integration into ECA startup/shutdown (`eca.server`) +- `start-server!`: if remote enabled: + 1. Create SSE connections atom. + 2. Create `BroadcastMessenger` wrapping `ServerMessenger`, passing SSE connections atom + (constructor: `(->BroadcastMessenger inner-messenger sse-connections-atom)`). + 3. Start Jetty on configured port, passing SSE connections atom and `components` to + route handlers. + 4. Log clickable `web.eca.dev` URL to stderr. +- `shutdown`: send `session:disconnecting` SSE event, wait up to 5 seconds for in-flight + responses, close SSE connections, stop Jetty. Running chat generations are not + interrupted — they continue on the stdio side. + +### Step 9 — CHANGELOG +Add feature entry under Unreleased referencing #333. + +### Step 10 — Tests +- `eca.remote.sse-test` — heartbeat, backpressure, stale client cleanup. +- `eca.remote.auth-test` — Bearer token validation, missing/invalid token rejection. +- `eca.remote.handlers-test` — REST endpoints with mock `db*`. +- `eca.remote.messenger-test` — BroadcastMessenger delegation + SSE broadcasting. + +## File Summary + +### Create + +| File | Purpose | +|------|---------| +| `src/eca/remote/server.clj` | HTTP server lifecycle (start/stop Jetty) | +| `src/eca/remote/routes.clj` | Ring route table, middleware composition | +| `src/eca/remote/handlers.clj` | REST API handlers | +| `src/eca/remote/sse.clj` | SSE connection management | +| `src/eca/remote/messenger.clj` | BroadcastMessenger | +| `src/eca/remote/auth.clj` | Bearer token auth middleware | +| `src/eca/remote/middleware.clj` | CORS middleware | +| `test/eca/remote/sse_test.clj` | SSE tests | +| `test/eca/remote/auth_test.clj` | Auth tests | +| `test/eca/remote/handlers_test.clj` | Handler tests | +| `test/eca/remote/messenger_test.clj` | BroadcastMessenger tests | + +### Modify + +| File | Change | +|------|--------| +| `src/eca/messenger.clj` | Add `chat-status-changed` and `chat-deleted` to `IMessenger` protocol | +| `src/eca/server.clj` | Implement new methods in `ServerMessenger` (no-op), start/stop remote server, wrap messenger | +| `src/eca/features/chat.clj` | Call `chat-status-changed` on status transitions | +| `src/eca/features/chat/lifecycle.clj` | Call `chat-status-changed` in `finish-chat-prompt!`, call `chat-deleted` in `delete-chat` | +| `docs/config.json` | Add `remote` config schema | +| `CHANGELOG.md` | Feature entry under Unreleased | + +## Future Improvements (out of scope for v1) + +- Request logging middleware +- Subagent chat filtering in list endpoint +- Source tagging on SSE events (editor vs web origin) +- SSE event batching for high-frequency streaming +- Rate limiting on auth failures +- Configurable CORS allowed origins diff --git a/src/eca/config.clj b/src/eca/config.clj index f4fec883c..07e904673 100644 --- a/src/eca/config.clj +++ b/src/eca/config.clj @@ -196,6 +196,7 @@ :netrcFile nil :autoCompactPercentage 75 :plugins {"eca" {:source "https://github.com/editor-code-assistant/eca-plugins.git"}} + :remote {:enabled false} :env "prod"}) (defn ^:private parse-dynamic-string-values diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index f903d09d8..397303440 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -383,7 +383,7 @@ Run preRequest hooks before any heavy lifting. Only :prompt-message supports rewrite, other only allow additionalContext append." [user-messages source-type - {:keys [db* config chat-id provider model full-model agent instructions metrics message] :as chat-ctx}] + {:keys [db* config chat-id provider model full-model agent instructions metrics message messenger] :as chat-ctx}] (when-not full-model (throw (ex-info llm-api/no-available-model-error-msg {}))) (let [original-text (or message (-> user-messages first :content first :text)) @@ -432,6 +432,7 @@ (logger/info logger-tag "Superseding active prompt" {:chat-id chat-id :status (get-in @db* [:chats chat-id :status])})) (swap! db* assoc-in [:chats chat-id :status] :running) + (messenger/chat-status-changed messenger {:chat-id chat-id :status :running}) (swap! db* assoc-in [:chats chat-id :prompt-id] prompt-id) (swap! db* assoc-in [:chats chat-id :model] full-model) (let [chat-ctx (assoc chat-ctx :prompt-id prompt-id) @@ -925,7 +926,7 @@ (lifecycle/finish-chat-prompt! :stopping (dissoc chat-ctx :on-finished-side-effect))))) (defn delete-chat - [{:keys [chat-id]} db* config metrics] + [{:keys [chat-id]} db* messenger config metrics] (when-let [chat (get-in @db* [:chats chat-id])] ;; Trigger chatEnd hook BEFORE deleting (chat still exists in cache) (f.hooks/trigger-if-matches! :chatEnd @@ -938,6 +939,7 @@ config)) ;; Delete chat from memory (swap! db* update :chats dissoc chat-id) + (messenger/chat-deleted messenger {:chat-id chat-id}) ;; Save updated cache (without this chat) (db/update-workspaces-cache! @db* metrics)) diff --git a/src/eca/features/chat/lifecycle.clj b/src/eca/features/chat/lifecycle.clj index 3febeccf8..70f58f8f3 100644 --- a/src/eca/features/chat/lifecycle.clj +++ b/src/eca/features/chat/lifecycle.clj @@ -67,10 +67,11 @@ (name from) content)) -(defn finish-chat-prompt! [status {:keys [message chat-id db* metrics config on-finished-side-effect prompt-id] :as chat-ctx}] +(defn finish-chat-prompt! [status {:keys [message chat-id db* messenger metrics config on-finished-side-effect prompt-id] :as chat-ctx}] (when-not (and prompt-id (not= prompt-id (get-in @db* [:chats chat-id :prompt-id]))) (when-not (get-in @db* [:chats chat-id :auto-compacting?]) (swap! db* assoc-in [:chats chat-id :status] status) + (messenger/chat-status-changed messenger {:chat-id chat-id :status status}) (let [db @db* subagent? (some? (get-in db [:chats chat-id :subagent])) hook-type (if subagent? :subagentPostRequest :postRequest) diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index 38a744a86..ed81af548 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -182,9 +182,9 @@ (metrics/task metrics :eca/chat-prompt-stop (f.chat/prompt-stop params db* messenger metrics))) -(defn chat-delete [{:keys [db* config metrics]} params] +(defn chat-delete [{:keys [db* messenger config metrics]} params] (metrics/task metrics :eca/chat-delete - (f.chat/delete-chat params db* config metrics) + (f.chat/delete-chat params db* messenger config metrics) {})) (defn chat-clear [{:keys [db* metrics]} params] diff --git a/src/eca/messenger.clj b/src/eca/messenger.clj index 46d60e83c..78656386e 100644 --- a/src/eca/messenger.clj +++ b/src/eca/messenger.clj @@ -7,6 +7,8 @@ (defprotocol IMessenger (chat-content-received [this data]) (chat-cleared [this params]) + (chat-status-changed [this params]) + (chat-deleted [this params]) (rewrite-content-received [this data]) (tool-server-updated [this params]) (config-updated [this params]) diff --git a/src/eca/remote/auth.clj b/src/eca/remote/auth.clj new file mode 100644 index 000000000..47ff968df --- /dev/null +++ b/src/eca/remote/auth.clj @@ -0,0 +1,41 @@ +(ns eca.remote.auth + "Bearer token authentication middleware for the remote server." + (:require + [cheshire.core :as json]) + (:import + [java.security SecureRandom])) + +(set! *warn-on-reflection* true) + +(defn generate-token + "Generates a cryptographically random 32-byte hex token (64 characters)." + [] + (let [random (SecureRandom.) + bytes (byte-array 32)] + (.nextBytes random bytes) + (apply str (map #(format "%02x" (bit-and % 0xff)) bytes)))) + +(def ^:private unauthorized-response + {:status 401 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (json/generate-string + {:error {:code "unauthorized" + :message "Missing or invalid Authorization Bearer token"}})}) + +(defn- extract-bearer-token [request] + (when-let [auth-header (get-in request [:headers "authorization"])] + (when (.startsWith ^String auth-header "Bearer ") + (subs auth-header 7)))) + +(defn wrap-bearer-auth + "Ring middleware that validates Bearer token auth. + Exempt paths (like /api/v1/health and GET /) skip auth." + [handler token exempt-paths] + (let [exempt-set (set exempt-paths)] + (fn [request] + (if (contains? exempt-set (:uri request)) + (handler request) + (let [request-token (extract-bearer-token request)] + (if (= token request-token) + (handler request) + unauthorized-response)))))) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj new file mode 100644 index 000000000..6750eea52 --- /dev/null +++ b/src/eca/remote/handlers.clj @@ -0,0 +1,256 @@ +(ns eca.remote.handlers + "REST API handlers for the remote web control server. + Each handler receives components map and Ring request, delegates to + the same feature functions used by JSON-RPC handlers." + (:require + [cheshire.core :as json] + [eca.config :as config] + [eca.features.chat :as f.chat] + [eca.handlers :as handlers] + [eca.messenger :as messenger] + [eca.remote.sse :as sse] + [eca.shared :as shared]) + (:import + [java.io InputStream PipedInputStream PipedOutputStream])) + +(set! *warn-on-reflection* true) + +(defn- parse-body [request] + (when-let [body (:body request)] + (try + (json/parse-string + (if (instance? InputStream body) + (slurp ^InputStream body) + (str body)) + true) + (catch Exception _e nil)))) + +(defn- json-response + ([status body] + {:status status + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (json/generate-string body)}) + ([body] (json-response 200 body))) + +(defn- error-response [status code message] + (json-response status {:error {:code code :message message}})) + +(defn- no-content [] + {:status 204 :headers {} :body nil}) + +(defn- chat-or-404 [db* chat-id] + (get-in @db* [:chats chat-id])) + +(defn- camel-keys [m] + (shared/map->camel-cased-map m)) + +;; --- Health & Redirect --- + +(defn handle-root [_components _request {:keys [host token]}] + {:status 302 + :headers {"Location" (str "https://web.eca.dev?host=" host "&token=" token)} + :body nil}) + +(defn handle-health [_components _request] + (json-response {:status "ok" :version (config/eca-version)})) + +;; --- Session --- + +(defn handle-session [{:keys [db*]} _request] + (let [db @db* + config (config/all db)] + (json-response + {:version (config/eca-version) + :protocolVersion "1.0" + :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) + :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) + (:models db)) + :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) + (config/primary-agent-names config)) + :mcpServers (mapv (fn [[name client]] + {:name name :status (or (:status client) "unknown")}) + (:mcp-clients db))}))) + +;; --- Chats --- + +(defn handle-list-chats [{:keys [db*]} _request] + (let [chats (->> (vals (:chats @db*)) + (remove :subagent) + (mapv (fn [{:keys [id title status created-at]}] + {:id id + :title title + :status (or status :idle) + :createdAt created-at})))] + (json-response chats))) + +(defn handle-get-chat [{:keys [db*]} _request chat-id] + (if-let [chat (chat-or-404 db* chat-id)] + (json-response + (camel-keys + {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :messages (or (:messages chat) []) + :tool-calls (or (:tool-calls chat) {}) + :task (:task chat)})) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")))) + +;; --- Chat Actions --- + +(defn handle-prompt [{:keys [db* messenger metrics] :as components} request chat-id] + (let [body (parse-body request)] + (if-not (:message body) + (error-response 400 "invalid_request" "Missing required field: message") + (let [config (config/all @db*) + params (shared/map->camel-cased-map + (cond-> {:chat-id chat-id + :message (:message body)} + (:model body) (assoc :model (:model body)) + (:agent body) (assoc :agent (:agent body)) + (:variant body) (assoc :variant (:variant body)))) + result (handlers/chat-prompt (assoc components :config config) params)] + (json-response (camel-keys result)))))) + +(defn handle-stop [{:keys [db* messenger metrics] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (identical? :running (get-in @db* [:chats chat-id :status])) + (error-response 409 "chat_wrong_status" "Chat is not running") + (let [config (config/all @db*)] + (handlers/chat-prompt-stop (assoc components :config config) {:chat-id chat-id}) + (no-content))))) + +(defn handle-approve [{:keys [db*] :as components} _request chat-id tool-call-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (get-in @db* [:chats chat-id :tool-calls tool-call-id]) + (error-response 404 "tool_call_not_found" (str "Tool call " tool-call-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-tool-call-approve + (assoc components :config config) + {:chat-id chat-id :tool-call-id tool-call-id}) + (no-content))))) + +(defn handle-reject [{:keys [db*] :as components} _request chat-id tool-call-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (if-not (get-in @db* [:chats chat-id :tool-calls tool-call-id]) + (error-response 404 "tool_call_not_found" (str "Tool call " tool-call-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-tool-call-reject + (assoc components :config config) + {:chat-id chat-id :tool-call-id tool-call-id}) + (no-content))))) + +(defn handle-rollback [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request) + config (config/all @db*)] + (handlers/chat-rollback + (assoc components :config config) + {:chat-id chat-id :content-id (:contentId body)}) + (no-content)))) + +(defn handle-clear [{:keys [db*] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-clear + (assoc components :config config) + {:chat-id chat-id :messages true}) + (messenger/chat-cleared (:messenger components) {:chat-id chat-id :messages true}) + (no-content)))) + +(defn handle-delete-chat [{:keys [db*] :as components} _request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [config (config/all @db*)] + (handlers/chat-delete + (assoc components :config config) + {:chat-id chat-id}) + (no-content)))) + +(defn handle-change-model [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:model body) + (error-response 400 "invalid_request" "Missing required field: model") + (let [config (config/all @db*)] + (handlers/chat-selected-model-changed + (assoc components :config config) + {:model (:model body)}) + (no-content)))))) + +(defn handle-change-agent [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:agent body) + (error-response 400 "invalid_request" "Missing required field: agent") + (let [config (config/all @db*)] + (handlers/chat-selected-agent-changed + (assoc components :config config) + {:agent (:agent body)}) + (no-content)))))) + +(defn handle-change-variant [{:keys [db*] :as components} request chat-id] + (if-not (chat-or-404 db* chat-id) + (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) + (let [body (parse-body request)] + (if-not (:variant body) + (error-response 400 "invalid_request" "Missing required field: variant") + (let [config (config/all @db*)] + (handlers/chat-selected-model-changed + (assoc components :config config) + {:model (get-in @db* [:chats chat-id :model]) + :variant (:variant body)}) + (no-content)))))) + +;; --- SSE Events --- + +(defn handle-events [{:keys [db*]} _request {:keys [sse-connections*]}] + (let [pipe-out (PipedOutputStream.) + pipe-in (PipedInputStream. pipe-out)] + ;; Start the SSE writer on a separate thread + (future + (try + (let [client (sse/add-client! sse-connections* pipe-out) + db @db* + config (config/all db) + state-dump {:version (config/eca-version) + :protocolVersion "1.0" + :chats (->> (vals (:chats db)) + (remove :subagent) + (mapv (fn [{:keys [id title status created-at]}] + {:id id + :title title + :status (or status :idle) + :createdAt created-at}))) + :models (mapv (fn [[id _]] {:id id :name id}) + (:models db)) + :agents (mapv (fn [name] {:id name :name name}) + (config/primary-agent-names config)) + :mcpServers (mapv (fn [[name client-info]] + {:name name :status (or (:status client-info) "unknown")}) + (:mcp-clients db)) + :workspaceFolders (mapv #(shared/uri->filename (:uri %)) + (:workspace-folders db))}] + ;; Send initial state dump + (sse/broadcast! (atom #{client}) "session:connected" state-dump) + ;; Block to keep the connection open — writer loop handles events + (try + (while true + (Thread/sleep 60000)) + (catch InterruptedException _e nil) + (catch Exception _e nil)) + (sse/remove-client! sse-connections* client)) + (catch Exception _e nil))) + {:status 200 + :headers {"Content-Type" "text/event-stream" + "Cache-Control" "no-cache" + "Connection" "keep-alive" + "X-Accel-Buffering" "no"} + :body pipe-in})) diff --git a/src/eca/remote/messenger.clj b/src/eca/remote/messenger.clj new file mode 100644 index 000000000..ae99c6625 --- /dev/null +++ b/src/eca/remote/messenger.clj @@ -0,0 +1,45 @@ +(ns eca.remote.messenger + "BroadcastMessenger wraps an inner IMessenger (typically ServerMessenger) + and broadcasts events to all connected SSE clients." + (:require + [eca.messenger :as messenger] + [eca.remote.sse :as sse])) + +(set! *warn-on-reflection* true) + +(defrecord BroadcastMessenger [inner sse-connections*] + messenger/IMessenger + + (chat-content-received [_this data] + (messenger/chat-content-received inner data) + (sse/broadcast! sse-connections* "chat:content-received" data)) + + (chat-cleared [_this params] + (messenger/chat-cleared inner params) + (sse/broadcast! sse-connections* "chat:cleared" params)) + + (chat-status-changed [_this params] + (messenger/chat-status-changed inner params) + (sse/broadcast! sse-connections* "chat:status-changed" params)) + + (chat-deleted [_this params] + (messenger/chat-deleted inner params) + (sse/broadcast! sse-connections* "chat:deleted" params)) + + (rewrite-content-received [_this data] + (messenger/rewrite-content-received inner data)) + + (tool-server-updated [_this params] + (messenger/tool-server-updated inner params) + (sse/broadcast! sse-connections* "tool:server-updated" params)) + + (config-updated [_this params] + (messenger/config-updated inner params) + (sse/broadcast! sse-connections* "config:updated" params)) + + (showMessage [_this msg] + (messenger/showMessage inner msg) + (sse/broadcast! sse-connections* "session:message" msg)) + + (editor-diagnostics [_this uri] + (messenger/editor-diagnostics inner uri))) diff --git a/src/eca/remote/middleware.clj b/src/eca/remote/middleware.clj new file mode 100644 index 000000000..b725b214e --- /dev/null +++ b/src/eca/remote/middleware.clj @@ -0,0 +1,31 @@ +(ns eca.remote.middleware + "CORS and JSON middleware for the remote server.") + +(set! *warn-on-reflection* true) + +(def ^:private cors-headers + {"Access-Control-Allow-Origin" "https://web.eca.dev" + "Access-Control-Allow-Methods" "GET, POST, DELETE, OPTIONS" + "Access-Control-Allow-Headers" "Content-Type, Authorization"}) + +(defn wrap-cors + "Ring middleware adding CORS headers for web.eca.dev. + Handles OPTIONS preflight with 204." + [handler] + (fn [request] + (if (= :options (:request-method request)) + {:status 204 + :headers cors-headers + :body nil} + (let [response (handler request)] + (update response :headers merge cors-headers))))) + +(defn wrap-json-content-type + "Ring middleware that sets default JSON content-type on responses + that don't already have one." + [handler] + (fn [request] + (let [response (handler request)] + (if (get-in response [:headers "Content-Type"]) + response + (assoc-in response [:headers "Content-Type"] "application/json; charset=utf-8"))))) diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj new file mode 100644 index 000000000..90cbcc682 --- /dev/null +++ b/src/eca/remote/routes.clj @@ -0,0 +1,96 @@ +(ns eca.remote.routes + "Ring route table and middleware composition for the remote server." + (:require + [clojure.string :as string] + [eca.remote.auth :as auth] + [eca.remote.handlers :as handlers] + [eca.remote.middleware :as middleware])) + +(set! *warn-on-reflection* true) + +(defn- path-segments + "Splits a URI path into segments, e.g. /api/v1/chats/abc → [\"api\" \"v1\" \"chats\" \"abc\"]" + [^String uri] + (filterv (complement string/blank?) (string/split uri #"/"))) + +(defn- match-route + "Simple path-based router. Returns [handler-fn & args] or nil." + [components request {:keys [token host sse-connections*]}] + (let [method (:request-method request) + segments (path-segments (:uri request))] + (case segments + [] (when (= :get method) + [handlers/handle-root components request {:host host :token token}]) + + ["api" "v1" "health"] + (when (= :get method) + [handlers/handle-health components request]) + + ["api" "v1" "session"] + (when (= :get method) + [handlers/handle-session components request]) + + ["api" "v1" "chats"] + (when (= :get method) + [handlers/handle-list-chats components request]) + + ["api" "v1" "events"] + (when (= :get method) + [handlers/handle-events components request {:sse-connections* sse-connections*}]) + + ;; Dynamic routes with path params + (when (and (>= (count segments) 4) + (= "api" (nth segments 0)) + (= "v1" (nth segments 1)) + (= "chats" (nth segments 2))) + (let [chat-id (nth segments 3)] + (case (count segments) + 4 (case method + :get [handlers/handle-get-chat components request chat-id] + :delete [handlers/handle-delete-chat components request chat-id] + nil) + 5 (let [action (nth segments 4)] + (when (= :post method) + (case action + "prompt" [handlers/handle-prompt components request chat-id] + "stop" [handlers/handle-stop components request chat-id] + "rollback" [handlers/handle-rollback components request chat-id] + "clear" [handlers/handle-clear components request chat-id] + "model" [handlers/handle-change-model components request chat-id] + "agent" [handlers/handle-change-agent components request chat-id] + "variant" [handlers/handle-change-variant components request chat-id] + nil))) + 6 (let [action (nth segments 4) + tcid (nth segments 5)] + (when (= :post method) + (case action + "approve" [handlers/handle-approve components request chat-id tcid] + "reject" [handlers/handle-reject components request chat-id tcid] + nil))) + nil)))))) + +(defn- not-found-response [] + {:status 404 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body "{\"error\":{\"code\":\"not_found\",\"message\":\"Route not found\"}}"}) + +(defn create-handler + "Creates the Ring handler with middleware composition. + components: the ECA components map {:db* :messenger :metrics :server} + opts: {:token :host :sse-connections*}" + [components opts] + (let [token (:token opts)] + (-> (fn [request] + (if-let [match (match-route components request opts)] + (let [[handler & args] match] + (try + (apply handler args) + (catch Exception e + {:status 500 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (str "{\"error\":{\"code\":\"internal_error\"," + "\"message\":\"" (.getMessage e) "\"}}")}))) + (not-found-response))) + (middleware/wrap-json-content-type) + (auth/wrap-bearer-auth token ["/" "/api/v1/health"]) + (middleware/wrap-cors)))) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj new file mode 100644 index 000000000..2d6b8cd37 --- /dev/null +++ b/src/eca/remote/server.clj @@ -0,0 +1,100 @@ +(ns eca.remote.server + "HTTP server lifecycle for the remote web control server." + (:require + [eca.config :as config] + [eca.logger :as logger] + [eca.remote.auth :as auth] + [eca.remote.routes :as routes] + [eca.remote.sse :as sse] + [ring.adapter.jetty :as jetty]) + (:import + [java.net BindException InetAddress] + [org.eclipse.jetty.server NetworkConnector Server])) + +(set! *warn-on-reflection* true) + +(def ^:private logger-tag "[REMOTE]") + +(defn- detect-host + "Auto-detects the LAN IP via InetAddress/getLocalHost. + Falls back to 127.0.0.1 if detection fails or returns loopback." + [] + (try + (let [addr (InetAddress/getLocalHost) + host (.getHostAddress addr)] + (if (.isLoopbackAddress addr) + (do (logger/warn logger-tag "Auto-detected loopback address. Consider setting remote.host in config.") + "127.0.0.1") + host)) + (catch Exception e + (logger/warn logger-tag "Failed to detect LAN IP:" (.getMessage e) + "Consider setting remote.host in config.") + "127.0.0.1"))) + +(defn- resolve-port + "Returns the configured port, or 0 for auto-assignment." + [remote-config] + (or (:port remote-config) 0)) + +(defn start! + "Starts the remote HTTP server if enabled in config. + Returns a map with :server :sse-connections* :heartbeat-stop-ch :token :host + or nil if not enabled or port is in use." + [components] + (let [db @(:db* components) + config (config/all db) + remote-config (:remote config)] + (when (:enabled remote-config) + (let [sse-connections* (sse/create-connections) + token (or (:password remote-config) (auth/generate-token)) + host (or (:host remote-config) (detect-host)) + port (resolve-port remote-config) + handler (routes/create-handler components + {:token token + :host host + :sse-connections* sse-connections*})] + (try + (let [jetty-server ^Server (jetty/run-jetty handler + {:port port + :host "0.0.0.0" + :join? false}) + actual-port (.getLocalPort ^NetworkConnector (first (.getConnectors jetty-server))) + heartbeat-ch (sse/start-heartbeat! sse-connections*) + connect-url (str "https://web.eca.dev?host=" + host ":" actual-port + "&token=" token)] + (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) + (logger/info logger-tag (str "🔗 " connect-url)) + {:server jetty-server + :sse-connections* sse-connections* + :heartbeat-stop-ch heartbeat-ch + :token token + :host (str host ":" actual-port)}) + (catch BindException e + (logger/warn logger-tag "Port" port "is already in use:" (.getMessage e) + "Remote server will not start.") + nil) + (catch Exception e + (logger/warn logger-tag "Failed to start remote server:" (.getMessage e)) + nil)))))) + +(defn stop! + "Stops the remote HTTP server, broadcasting disconnecting event first." + [{:keys [^Server server sse-connections* heartbeat-stop-ch]}] + (when server + (logger/info logger-tag "Stopping remote server...") + ;; Broadcast disconnecting event + (when sse-connections* + (sse/broadcast! sse-connections* "session:disconnecting" {:reason "shutdown"})) + ;; Stop heartbeat + (when heartbeat-stop-ch + (clojure.core.async/close! heartbeat-stop-ch)) + ;; Give in-flight responses a moment + (Thread/sleep 1000) + ;; Close all SSE connections + (when sse-connections* + (sse/close-all! sse-connections*)) + ;; Stop Jetty with 5s timeout + (.setStopTimeout server 5000) + (.stop server) + (logger/info logger-tag "Remote server stopped."))) diff --git a/src/eca/remote/sse.clj b/src/eca/remote/sse.clj new file mode 100644 index 000000000..1d1b9eed5 --- /dev/null +++ b/src/eca/remote/sse.clj @@ -0,0 +1,100 @@ +(ns eca.remote.sse + "SSE connection management for the remote web control server. + Each connected client gets a core.async channel with a dropping buffer. + A writer loop per client reads from the channel and writes SSE-formatted + text to the Ring async response." + (:require + [cheshire.core :as json] + [clojure.core.async :as async] + [eca.logger :as logger]) + (:import + [java.io OutputStream])) + +(set! *warn-on-reflection* true) + +(def ^:private logger-tag "[REMOTE-SSE]") +(def ^:private buffer-size 256) +(def ^:private heartbeat-interval-ms 15000) + +(defn create-connections + "Creates a new SSE connections atom (set of client maps)." + [] + (atom #{})) + +(defn- write-sse! [^OutputStream os ^String data] + (.write os (.getBytes data "UTF-8")) + (.flush os)) + +(defn- format-sse-event [{:keys [event data]}] + (str "event: " event "\n" + "data: " (json/generate-string data) "\n\n")) + +(defn- start-writer-loop! + "Starts a go-loop that reads events from the client channel and writes + SSE-formatted text to the output stream. Removes the client on error." + [connections* {:keys [ch os] :as client}] + (async/go-loop [] + (when-let [event (async/Metrics db*) + stdio-messenger (->ServerMessenger server db*) + remote-config (:remote (config/read-file-configs)) + sse-connections* (when (:enabled remote-config) + (atom #{})) + messenger (if sse-connections* + (remote.messenger/->BroadcastMessenger stdio-messenger sse-connections*) + stdio-messenger) components {:db* db* - :messenger (->ServerMessenger server db*) + :messenger messenger :metrics metrics :server server}] (logger/info "[server]" "Starting server...") (metrics/start! metrics) + (when sse-connections* + (when-let [rs (remote.server/start! components)] + (reset! remote-server* rs))) (monitor-server-logs (:log-ch server)) (setup-dev-environment db* components) (jsonrpc.server/start server components))) diff --git a/test/eca/remote/auth_test.clj b/test/eca/remote/auth_test.clj new file mode 100644 index 000000000..e22a4c634 --- /dev/null +++ b/test/eca/remote/auth_test.clj @@ -0,0 +1,51 @@ +(ns eca.remote.auth-test + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is testing]] + [eca.remote.auth :as auth])) + +(deftest generate-token-test + (testing "generates a 64-character hex string" + (let [token (auth/generate-token)] + (is (= 64 (count token))) + (is (re-matches #"[0-9a-f]{64}" token)))) + + (testing "generates unique tokens" + (let [tokens (repeatedly 10 auth/generate-token)] + (is (= 10 (count (set tokens))))))) + +(def ^:private test-token "test-secret-token") + +(defn- test-handler [_request] + {:status 200 :headers {} :body "ok"}) + +(defn- make-request + ([method uri] (make-request method uri nil)) + ([method uri token] + (cond-> {:request-method method + :uri uri + :headers {}} + token (assoc-in [:headers "authorization"] (str "Bearer " token))))) + +(deftest wrap-bearer-auth-test + (let [handler (auth/wrap-bearer-auth test-handler test-token ["/api/v1/health" "/"])] + + (testing "allows requests with valid token" + (let [response (handler (make-request :get "/api/v1/chats" test-token))] + (is (= 200 (:status response))))) + + (testing "rejects requests with missing token" + (let [response (handler (make-request :get "/api/v1/chats"))] + (is (= 401 (:status response))) + (let [body (json/parse-string (:body response) true)] + (is (= "unauthorized" (get-in body [:error :code])))))) + + (testing "rejects requests with wrong token" + (let [response (handler (make-request :get "/api/v1/chats" "wrong-token"))] + (is (= 401 (:status response))))) + + (testing "exempt paths skip auth" + (let [response (handler (make-request :get "/api/v1/health"))] + (is (= 200 (:status response)))) + (let [response (handler (make-request :get "/"))] + (is (= 200 (:status response))))))) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj new file mode 100644 index 000000000..bf5a84c46 --- /dev/null +++ b/test/eca/remote/handlers_test.clj @@ -0,0 +1,83 @@ +(ns eca.remote.handlers-test + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is testing use-fixtures]] + [eca.config :as config] + [eca.db :as db] + [eca.remote.handlers :as handlers] + [eca.test-helper :as h])) + +(h/reset-components-before-test) + +(defn- components [] + (let [c (h/components)] + (assoc c :config (config/all @(:db* c))))) + +(deftest handle-health-test + (testing "returns ok with version" + (let [response (handlers/handle-health nil nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= "ok" (:status body))) + (is (string? (:version body)))))) + +(deftest handle-root-test + (testing "redirects to web.eca.dev" + (let [response (handlers/handle-root nil nil {:host "192.168.1.1:7888" :token "abc123"})] + (is (= 302 (:status response))) + (is (= "https://web.eca.dev?host=192.168.1.1:7888&token=abc123" + (get-in response [:headers "Location"])))))) + +(deftest handle-list-chats-test + (testing "returns empty list when no chats" + (let [response (handlers/handle-list-chats (components) nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= [] body)))) + + (testing "returns chats excluding subagents" + (swap! (h/db*) assoc :chats {"c1" {:id "c1" :title "Test" :status :idle :created-at 123} + "c2" {:id "c2" :title "Sub" :status :running :subagent true}}) + (let [response (handlers/handle-list-chats (components) nil) + body (json/parse-string (:body response) true)] + (is (= 1 (count body))) + (is (= "c1" (:id (first body))))))) + +(deftest handle-get-chat-test + (testing "returns 404 for missing chat" + (let [response (handlers/handle-get-chat (components) nil "nonexistent") + body (json/parse-string (:body response) true)] + (is (= 404 (:status response))) + (is (= "chat_not_found" (get-in body [:error :code]))))) + + (testing "returns chat data for existing chat" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :title "My Chat" :status :idle :messages []}) + (let [response (handlers/handle-get-chat (components) nil "c1") + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (= "c1" (:id body))) + (is (= "My Chat" (:title body)))))) + +(deftest handle-stop-test + (testing "returns 404 for missing chat" + (let [response (handlers/handle-stop (components) nil "nonexistent")] + (is (= 404 (:status response))))) + + (testing "returns 409 for non-running chat" + (swap! (h/db*) assoc-in [:chats "c1"] {:id "c1" :status :idle}) + (let [response (handlers/handle-stop (components) nil "c1")] + (is (= 409 (:status response)))))) + +(deftest handle-prompt-test + (testing "returns 400 for missing message" + (let [request {:body (java.io.ByteArrayInputStream. (.getBytes "{}" "UTF-8"))} + response (handlers/handle-prompt (components) request "c1")] + (is (= 400 (:status response)))))) + +(deftest handle-session-test + (testing "returns session info" + (let [response (handlers/handle-session (components) nil) + body (json/parse-string (:body response) true)] + (is (= 200 (:status response))) + (is (string? (:version body))) + (is (= "1.0" (:protocolVersion body)))))) diff --git a/test/eca/remote/messenger_test.clj b/test/eca/remote/messenger_test.clj new file mode 100644 index 000000000..a35538435 --- /dev/null +++ b/test/eca/remote/messenger_test.clj @@ -0,0 +1,54 @@ +(ns eca.remote.messenger-test + (:require + [clojure.test :refer [deftest is testing use-fixtures]] + [eca.remote.messenger :as remote.messenger] + [eca.remote.sse :as sse] + [eca.test-helper :as h])) + +(h/reset-components-before-test) + +(deftest broadcast-messenger-delegates-and-broadcasts-test + (let [inner (h/messenger) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/->BroadcastMessenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os)] + + (testing "chat-content-received delegates to inner and broadcasts" + (let [data {:chat-id "c1" :role :assistant :content {:type :text :text "hi"}}] + (eca.messenger/chat-content-received broadcast-messenger data) + (Thread/sleep 100) + (is (seq (:chat-content-received (h/messages)))) + (is (.contains (.toString os "UTF-8") "chat:content-received")))) + + (testing "chat-status-changed delegates and broadcasts" + (let [params {:chat-id "c1" :status :running}] + (eca.messenger/chat-status-changed broadcast-messenger params) + (Thread/sleep 100) + (is (seq (:chat-status-changed (h/messages)))) + (is (.contains (.toString os "UTF-8") "chat:status-changed")))) + + (testing "chat-deleted delegates and broadcasts" + (let [params {:chat-id "c1"}] + (eca.messenger/chat-deleted broadcast-messenger params) + (Thread/sleep 100) + (is (seq (:chat-deleted (h/messages)))) + (is (.contains (.toString os "UTF-8") "chat:deleted")))) + + (testing "editor-diagnostics delegates to inner only (no broadcast)" + (let [os2 (java.io.ByteArrayOutputStream.) + _client2 (sse/add-client! sse-connections* os2)] + (eca.messenger/editor-diagnostics broadcast-messenger nil) + (Thread/sleep 100) + (is (not (.contains (.toString os2 "UTF-8") "editor"))))) + + (testing "rewrite-content-received delegates to inner only (no broadcast)" + (let [os3 (java.io.ByteArrayOutputStream.) + _client3 (sse/add-client! sse-connections* os3) + data {:chat-id "c1" :content {:type :text :text "rewritten"}}] + (eca.messenger/rewrite-content-received broadcast-messenger data) + (Thread/sleep 100) + (is (seq (:rewrite-content-received (h/messages)))) + (is (not (.contains (.toString os3 "UTF-8") "rewrite"))))) + + (sse/close-all! sse-connections*))) diff --git a/test/eca/remote/sse_test.clj b/test/eca/remote/sse_test.clj new file mode 100644 index 000000000..de3c82ed4 --- /dev/null +++ b/test/eca/remote/sse_test.clj @@ -0,0 +1,72 @@ +(ns eca.remote.sse-test + (:require + [cheshire.core :as json] + [clojure.core.async :as async] + [clojure.test :refer [deftest is testing]] + [eca.remote.sse :as sse]) + (:import + [java.io ByteArrayOutputStream])) + +(deftest create-connections-test + (testing "creates an atom with empty set" + (let [conns (sse/create-connections)] + (is (instance? clojure.lang.Atom conns)) + (is (= #{} @conns))))) + +(deftest add-and-remove-client-test + (testing "add-client! adds to connections, remove-client! removes" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + (is (= 1 (count @conns))) + (is (contains? @conns client)) + (sse/remove-client! conns client) + (is (= 0 (count @conns)))))) + +(deftest broadcast-test + (testing "broadcast! writes SSE-formatted event to client" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + (sse/broadcast! conns "chat:status-changed" {:chatId "abc" :status "running"}) + ;; Give writer loop time to process + (Thread/sleep 100) + (let [output (.toString os "UTF-8")] + (is (.contains output "event: chat:status-changed")) + (is (.contains output "data: "))) + (sse/remove-client! conns client)))) + +(deftest broadcast-backpressure-test + (testing "dropping buffer prevents blocking when client is slow" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + ;; Flood the channel — should not block + (dotimes [i 500] + (sse/broadcast! conns "test:event" {:i i})) + ;; Should still be connected (no exception) + (is (= 1 (count @conns))) + (sse/remove-client! conns client)))) + +(deftest close-all-test + (testing "close-all! removes all clients" + (let [conns (sse/create-connections) + os1 (ByteArrayOutputStream.) + os2 (ByteArrayOutputStream.)] + (sse/add-client! conns os1) + (sse/add-client! conns os2) + (is (= 2 (count @conns))) + (sse/close-all! conns) + (is (= 0 (count @conns)))))) + +(deftest heartbeat-test + (testing "heartbeat sends comment lines to clients" + (let [conns (sse/create-connections) + os (ByteArrayOutputStream.) + client (sse/add-client! conns os)] + ;; Start heartbeat with very short interval for testing + ;; We just verify start-heartbeat! returns a channel + (let [stop-ch (sse/start-heartbeat! conns)] + (is (some? stop-ch)) + (async/close! stop-ch)) + (sse/remove-client! conns client)))) diff --git a/test/eca/test_helper.clj b/test/eca/test_helper.clj index e64df4b53..bc1945f11 100644 --- a/test/eca/test_helper.clj +++ b/test/eca/test_helper.clj @@ -30,6 +30,8 @@ messenger/IMessenger (chat-content-received [_ data] (swap! messages* update :chat-content-received (fnil conj []) data)) (chat-cleared [_ params] (swap! messages* update :chat-clear (fnil conj []) params)) + (chat-status-changed [_ params] (swap! messages* update :chat-status-changed (fnil conj []) params)) + (chat-deleted [_ params] (swap! messages* update :chat-deleted (fnil conj []) params)) (rewrite-content-received [_ data] (swap! messages* update :rewrite-content-received (fnil conj []) data)) (config-updated [_ data] (swap! messages* update :config-updated (fnil conj []) data)) (tool-server-updated [_ data] (swap! messages* update :tool-server-update (fnil conj []) data)) From 8494ad63dfa1c7aec664532df00f3a26c9d716e5 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 10:31:54 -0300 Subject: [PATCH 02/13] Fixes --- src/eca/features/chat.clj | 3 +- src/eca/handlers.clj | 4 +- src/eca/remote/auth.clj | 10 ++- src/eca/remote/handlers.clj | 120 +++++++++++++---------------- src/eca/remote/messenger.clj | 20 +++-- src/eca/remote/routes.clj | 13 ++-- src/eca/remote/server.clj | 23 +++--- src/eca/remote/sse.clj | 94 ++++++++++++---------- src/eca/server.clj | 4 +- test/eca/remote/messenger_test.clj | 19 +++-- 10 files changed, 168 insertions(+), 142 deletions(-) diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index 397303440..f64079c3a 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -945,13 +945,14 @@ (defn clear-chat "Clear specific aspects of a chat. Currently supports clearing :messages." - [{:keys [chat-id messages]} db* metrics] + [{:keys [chat-id messages]} db* messenger metrics] (when (get-in @db* [:chats chat-id]) (swap! db* update-in [:chats chat-id] (fn [chat] (cond-> chat messages (-> (assoc :messages []) (dissoc :tool-calls :last-api :usage :task))))) + (messenger/chat-cleared messenger {:chat-id chat-id :messages messages}) (db/update-workspaces-cache! @db* metrics))) (defn rollback-chat diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index ed81af548..142620887 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -187,9 +187,9 @@ (f.chat/delete-chat params db* messenger config metrics) {})) -(defn chat-clear [{:keys [db* metrics]} params] +(defn chat-clear [{:keys [db* messenger metrics]} params] (metrics/task metrics :eca/chat-clear - (f.chat/clear-chat params db* metrics) + (f.chat/clear-chat params db* messenger metrics) {})) (defn chat-rollback [{:keys [db* metrics messenger]} params] diff --git a/src/eca/remote/auth.clj b/src/eca/remote/auth.clj index 47ff968df..d6a8a4f80 100644 --- a/src/eca/remote/auth.clj +++ b/src/eca/remote/auth.clj @@ -3,7 +3,7 @@ (:require [cheshire.core :as json]) (:import - [java.security SecureRandom])) + [java.security MessageDigest SecureRandom])) (set! *warn-on-reflection* true) @@ -27,6 +27,12 @@ (when (.startsWith ^String auth-header "Bearer ") (subs auth-header 7)))) +(defn- constant-time-equals? + "Constant-time comparison to prevent timing side-channel attacks." + [^String a ^String b] + (and a b + (MessageDigest/isEqual (.getBytes a "UTF-8") (.getBytes b "UTF-8")))) + (defn wrap-bearer-auth "Ring middleware that validates Bearer token auth. Exempt paths (like /api/v1/health and GET /) skip auth." @@ -36,6 +42,6 @@ (if (contains? exempt-set (:uri request)) (handler request) (let [request-token (extract-bearer-token request)] - (if (= token request-token) + (if (constant-time-equals? token request-token) (handler request) unauthorized-response)))))) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 6750eea52..7c1214bb2 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -4,14 +4,16 @@ the same feature functions used by JSON-RPC handlers." (:require [cheshire.core :as json] + [clojure.core.async :as async] [eca.config :as config] [eca.features.chat :as f.chat] [eca.handlers :as handlers] [eca.messenger :as messenger] [eca.remote.sse :as sse] - [eca.shared :as shared]) + [eca.shared :as shared] + [ring.core.protocols :as ring.protocols]) (:import - [java.io InputStream PipedInputStream PipedOutputStream])) + [java.io InputStream])) (set! *warn-on-reflection* true) @@ -44,6 +46,22 @@ (defn- camel-keys [m] (shared/map->camel-cased-map m)) +;; --- Shared --- + +(defn- session-state + "Builds the session state map used for both GET /session and SSE session:connected." + [db config] + {:version (config/eca-version) + :protocolVersion "1.0" + :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) + :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) + (:models db)) + :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) + (config/primary-agent-names config)) + :mcpServers (mapv (fn [[name client-info]] + {:name name :status (or (:status client-info) "unknown")}) + (:mcp-clients db))}) + ;; --- Health & Redirect --- (defn handle-root [_components _request {:keys [host token]}] @@ -59,17 +77,7 @@ (defn handle-session [{:keys [db*]} _request] (let [db @db* config (config/all db)] - (json-response - {:version (config/eca-version) - :protocolVersion "1.0" - :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) - :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) - (:models db)) - :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) - (config/primary-agent-names config)) - :mcpServers (mapv (fn [[name client]] - {:name name :status (or (:status client) "unknown")}) - (:mcp-clients db))}))) + (json-response (session-state db config)))) ;; --- Chats --- @@ -98,21 +106,20 @@ ;; --- Chat Actions --- -(defn handle-prompt [{:keys [db* messenger metrics] :as components} request chat-id] +(defn handle-prompt [{:keys [db*] :as components} request chat-id] (let [body (parse-body request)] (if-not (:message body) (error-response 400 "invalid_request" "Missing required field: message") (let [config (config/all @db*) - params (shared/map->camel-cased-map - (cond-> {:chat-id chat-id - :message (:message body)} - (:model body) (assoc :model (:model body)) - (:agent body) (assoc :agent (:agent body)) - (:variant body) (assoc :variant (:variant body)))) + params (cond-> {:chat-id chat-id + :message (:message body)} + (:model body) (assoc :model (:model body)) + (:agent body) (assoc :agent (:agent body)) + (:variant body) (assoc :variant (:variant body))) result (handlers/chat-prompt (assoc components :config config) params)] (json-response (camel-keys result)))))) -(defn handle-stop [{:keys [db* messenger metrics] :as components} _request chat-id] +(defn handle-stop [{:keys [db*] :as components} _request chat-id] (if-not (chat-or-404 db* chat-id) (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")) (if-not (identical? :running (get-in @db* [:chats chat-id :status])) @@ -160,7 +167,6 @@ (handlers/chat-clear (assoc components :config config) {:chat-id chat-id :messages true}) - (messenger/chat-cleared (:messenger components) {:chat-id chat-id :messages true}) (no-content)))) (defn handle-delete-chat [{:keys [db*] :as components} _request chat-id] @@ -202,55 +208,39 @@ (let [body (parse-body request)] (if-not (:variant body) (error-response 400 "invalid_request" "Missing required field: variant") - (let [config (config/all @db*)] + (let [config (config/all @db*) + model (or (get-in @db* [:chats chat-id :model]) + (f.chat/default-model @db* config))] (handlers/chat-selected-model-changed (assoc components :config config) - {:model (get-in @db* [:chats chat-id :model]) + {:model model :variant (:variant body)}) (no-content)))))) ;; --- SSE Events --- -(defn handle-events [{:keys [db*]} _request {:keys [sse-connections*]}] - (let [pipe-out (PipedOutputStream.) - pipe-in (PipedInputStream. pipe-out)] - ;; Start the SSE writer on a separate thread - (future +(deftype SSEBody [db* sse-connections*] + ring.protocols/StreamableResponseBody + (write-body-to-stream [_ _response os] + ;; Jetty calls this on its thread with the raw servlet output stream. + ;; We register it as an SSE client and block until the connection closes. + (let [done-ch (async/chan) + client (sse/add-client! sse-connections* os done-ch)] (try - (let [client (sse/add-client! sse-connections* pipe-out) - db @db* + (let [db @db* config (config/all db) - state-dump {:version (config/eca-version) - :protocolVersion "1.0" - :chats (->> (vals (:chats db)) - (remove :subagent) - (mapv (fn [{:keys [id title status created-at]}] - {:id id - :title title - :status (or status :idle) - :createdAt created-at}))) - :models (mapv (fn [[id _]] {:id id :name id}) - (:models db)) - :agents (mapv (fn [name] {:id name :name name}) - (config/primary-agent-names config)) - :mcpServers (mapv (fn [[name client-info]] - {:name name :status (or (:status client-info) "unknown")}) - (:mcp-clients db)) - :workspaceFolders (mapv #(shared/uri->filename (:uri %)) - (:workspace-folders db))}] - ;; Send initial state dump - (sse/broadcast! (atom #{client}) "session:connected" state-dump) - ;; Block to keep the connection open — writer loop handles events - (try - (while true - (Thread/sleep 60000)) - (catch InterruptedException _e nil) - (catch Exception _e nil)) - (sse/remove-client! sse-connections* client)) - (catch Exception _e nil))) - {:status 200 - :headers {"Content-Type" "text/event-stream" - "Cache-Control" "no-cache" - "Connection" "keep-alive" - "X-Accel-Buffering" "no"} - :body pipe-in})) + state-dump (session-state db config)] + (sse/broadcast! (atom #{client}) "session:connected" state-dump)) + ;; Block this Jetty thread until the writer loop terminates + ;; (client disconnect, write error, or server shutdown) + (async/SSEBody db* sse-connections*)}) diff --git a/src/eca/remote/messenger.clj b/src/eca/remote/messenger.clj index ae99c6625..d3af2067c 100644 --- a/src/eca/remote/messenger.clj +++ b/src/eca/remote/messenger.clj @@ -3,43 +3,47 @@ and broadcasts events to all connected SSE clients." (:require [eca.messenger :as messenger] - [eca.remote.sse :as sse])) + [eca.remote.sse :as sse] + [eca.shared :as shared])) (set! *warn-on-reflection* true) +(defn- ->camel [data] + (shared/map->camel-cased-map data)) + (defrecord BroadcastMessenger [inner sse-connections*] messenger/IMessenger (chat-content-received [_this data] (messenger/chat-content-received inner data) - (sse/broadcast! sse-connections* "chat:content-received" data)) + (sse/broadcast! sse-connections* "chat:content-received" (->camel data))) (chat-cleared [_this params] (messenger/chat-cleared inner params) - (sse/broadcast! sse-connections* "chat:cleared" params)) + (sse/broadcast! sse-connections* "chat:cleared" (->camel params))) (chat-status-changed [_this params] (messenger/chat-status-changed inner params) - (sse/broadcast! sse-connections* "chat:status-changed" params)) + (sse/broadcast! sse-connections* "chat:status-changed" (->camel params))) (chat-deleted [_this params] (messenger/chat-deleted inner params) - (sse/broadcast! sse-connections* "chat:deleted" params)) + (sse/broadcast! sse-connections* "chat:deleted" (->camel params))) (rewrite-content-received [_this data] (messenger/rewrite-content-received inner data)) (tool-server-updated [_this params] (messenger/tool-server-updated inner params) - (sse/broadcast! sse-connections* "tool:server-updated" params)) + (sse/broadcast! sse-connections* "tool:server-updated" (->camel params))) (config-updated [_this params] (messenger/config-updated inner params) - (sse/broadcast! sse-connections* "config:updated" params)) + (sse/broadcast! sse-connections* "config:updated" (->camel params))) (showMessage [_this msg] (messenger/showMessage inner msg) - (sse/broadcast! sse-connections* "session:message" msg)) + (sse/broadcast! sse-connections* "session:message" (->camel msg))) (editor-diagnostics [_this uri] (messenger/editor-diagnostics inner uri))) diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj index 90cbcc682..3dc7c84e3 100644 --- a/src/eca/remote/routes.clj +++ b/src/eca/remote/routes.clj @@ -1,6 +1,7 @@ (ns eca.remote.routes "Ring route table and middleware composition for the remote server." (:require + [cheshire.core :as json] [clojure.string :as string] [eca.remote.auth :as auth] [eca.remote.handlers :as handlers] @@ -15,12 +16,12 @@ (defn- match-route "Simple path-based router. Returns [handler-fn & args] or nil." - [components request {:keys [token host sse-connections*]}] + [components request {:keys [token host* sse-connections*]}] (let [method (:request-method request) segments (path-segments (:uri request))] (case segments [] (when (= :get method) - [handlers/handle-root components request {:host host :token token}]) + [handlers/handle-root components request {:host @host* :token token}]) ["api" "v1" "health"] (when (= :get method) @@ -77,7 +78,7 @@ (defn create-handler "Creates the Ring handler with middleware composition. components: the ECA components map {:db* :messenger :metrics :server} - opts: {:token :host :sse-connections*}" + opts: {:token :host* :sse-connections*}" [components opts] (let [token (:token opts)] (-> (fn [request] @@ -88,9 +89,9 @@ (catch Exception e {:status 500 :headers {"Content-Type" "application/json; charset=utf-8"} - :body (str "{\"error\":{\"code\":\"internal_error\"," - "\"message\":\"" (.getMessage e) "\"}}")}))) + :body (json/generate-string + {:error {:code "internal_error" + :message (or (.getMessage e) "Unknown error")}})}))) (not-found-response))) - (middleware/wrap-json-content-type) (auth/wrap-bearer-auth token ["/" "/api/v1/health"]) (middleware/wrap-cors)))) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index 2d6b8cd37..aa6e83a0d 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -1,6 +1,7 @@ (ns eca.remote.server "HTTP server lifecycle for the remote web control server." (:require + [clojure.core.async :as async] [eca.config :as config] [eca.logger :as logger] [eca.remote.auth :as auth] @@ -38,20 +39,22 @@ (defn start! "Starts the remote HTTP server if enabled in config. + sse-connections* is the shared SSE connections atom (also used by BroadcastMessenger). Returns a map with :server :sse-connections* :heartbeat-stop-ch :token :host or nil if not enabled or port is in use." - [components] + [components sse-connections*] (let [db @(:db* components) config (config/all db) remote-config (:remote config)] (when (:enabled remote-config) - (let [sse-connections* (sse/create-connections) - token (or (:password remote-config) (auth/generate-token)) - host (or (:host remote-config) (detect-host)) + (let [token (or (:password remote-config) (auth/generate-token)) + host-base (or (:host remote-config) (detect-host)) port (resolve-port remote-config) + ;; Use atom so the handler sees host:port after Jetty resolves the actual port + host+port* (atom host-base) handler (routes/create-handler components {:token token - :host host + :host* host+port* :sse-connections* sse-connections*})] (try (let [jetty-server ^Server (jetty/run-jetty handler @@ -59,9 +62,11 @@ :host "0.0.0.0" :join? false}) actual-port (.getLocalPort ^NetworkConnector (first (.getConnectors jetty-server))) + host-with-port (str host-base ":" actual-port) + _ (reset! host+port* host-with-port) heartbeat-ch (sse/start-heartbeat! sse-connections*) connect-url (str "https://web.eca.dev?host=" - host ":" actual-port + host-with-port "&token=" token)] (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) (logger/info logger-tag (str "🔗 " connect-url)) @@ -69,7 +74,7 @@ :sse-connections* sse-connections* :heartbeat-stop-ch heartbeat-ch :token token - :host (str host ":" actual-port)}) + :host host-with-port}) (catch BindException e (logger/warn logger-tag "Port" port "is already in use:" (.getMessage e) "Remote server will not start.") @@ -88,9 +93,7 @@ (sse/broadcast! sse-connections* "session:disconnecting" {:reason "shutdown"})) ;; Stop heartbeat (when heartbeat-stop-ch - (clojure.core.async/close! heartbeat-stop-ch)) - ;; Give in-flight responses a moment - (Thread/sleep 1000) + (async/close! heartbeat-stop-ch)) ;; Close all SSE connections (when sse-connections* (sse/close-all! sse-connections*)) diff --git a/src/eca/remote/sse.clj b/src/eca/remote/sse.clj index 1d1b9eed5..eb2cd12e6 100644 --- a/src/eca/remote/sse.clj +++ b/src/eca/remote/sse.clj @@ -1,14 +1,14 @@ (ns eca.remote.sse "SSE connection management for the remote web control server. Each connected client gets a core.async channel with a dropping buffer. - A writer loop per client reads from the channel and writes SSE-formatted + A writer thread per client reads from the channel and writes SSE-formatted text to the Ring async response." (:require [cheshire.core :as json] [clojure.core.async :as async] [eca.logger :as logger]) (:import - [java.io OutputStream])) + [java.io IOException OutputStream])) (set! *warn-on-reflection* true) @@ -30,31 +30,41 @@ "data: " (json/generate-string data) "\n\n")) (defn- start-writer-loop! - "Starts a go-loop that reads events from the client channel and writes - SSE-formatted text to the output stream. Removes the client on error." - [connections* {:keys [ch os] :as client}] - (async/go-loop [] - (when-let [event (async/ {:ch ch :os os} + done-ch (assoc :done-ch done-ch))] + (swap! connections* conj client) + (start-writer-loop! connections* client) + client))) (defn remove-client! "Removes a client from the connections set and closes its channel." @@ -72,24 +82,26 @@ (async/offer! ch event)))) (defn start-heartbeat! - "Starts a background loop that sends SSE comment lines to all clients - every 15 seconds. Returns a channel that can be closed to stop the loop." + "Starts a background thread that sends SSE comment lines to all clients + every 15 seconds. Returns a channel that can be closed to stop the loop. + Uses async/thread (not go-loop) because write-sse! performs blocking I/O." [connections*] (let [stop-ch (async/chan)] - (async/go-loop [] - (let [[_ port] (async/alts! [stop-ch (async/timeout heartbeat-interval-ms)])] - (when-not (= port stop-ch) - (let [dead-clients (atom #{})] - (doseq [{:keys [^OutputStream os] :as client} @connections*] - (try - (write-sse! os ":\n\n") - (catch Exception _e - (swap! dead-clients conj client)))) - (when (seq @dead-clients) - (swap! connections* #(reduce disj % @dead-clients)) - (doseq [{:keys [ch]} @dead-clients] - (async/close! ch)))) - (recur)))) + (async/thread + (loop [] + (let [[_ port] (async/alts!! [stop-ch (async/timeout heartbeat-interval-ms)])] + (when-not (= port stop-ch) + (let [dead-clients (atom #{})] + (doseq [{:keys [^OutputStream os] :as client} @connections*] + (try + (write-sse! os ":\n\n") + (catch IOException _e + (swap! dead-clients conj client)))) + (when (seq @dead-clients) + (swap! connections* #(reduce disj % @dead-clients)) + (doseq [{:keys [ch]} @dead-clients] + (async/close! ch)))) + (recur))))) stop-ch)) (defn close-all! diff --git a/src/eca/server.clj b/src/eca/server.clj index d385bf091..ae6fb1292 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -173,6 +173,8 @@ (let [db* (atom db/initial-db) metrics (->Metrics db*) stdio-messenger (->ServerMessenger server db*) + ;; Read remote config from file-based sources (global/env/custom). + ;; Workspace-level config is not available yet (initialize hasn't been called). remote-config (:remote (config/read-file-configs)) sse-connections* (when (:enabled remote-config) (atom #{})) @@ -186,7 +188,7 @@ (logger/info "[server]" "Starting server...") (metrics/start! metrics) (when sse-connections* - (when-let [rs (remote.server/start! components)] + (when-let [rs (remote.server/start! components sse-connections*)] (reset! remote-server* rs))) (monitor-server-logs (:log-ch server)) (setup-dev-environment db* components) diff --git a/test/eca/remote/messenger_test.clj b/test/eca/remote/messenger_test.clj index a35538435..4188ee021 100644 --- a/test/eca/remote/messenger_test.clj +++ b/test/eca/remote/messenger_test.clj @@ -14,26 +14,33 @@ os (java.io.ByteArrayOutputStream.) _client (sse/add-client! sse-connections* os)] - (testing "chat-content-received delegates to inner and broadcasts" + (testing "chat-content-received delegates to inner and broadcasts camelCase" (let [data {:chat-id "c1" :role :assistant :content {:type :text :text "hi"}}] (eca.messenger/chat-content-received broadcast-messenger data) (Thread/sleep 100) (is (seq (:chat-content-received (h/messages)))) - (is (.contains (.toString os "UTF-8") "chat:content-received")))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:content-received")) + (is (.contains output "\"chatId\"") "SSE broadcast should use camelCase keys") + (is (not (.contains output "\"chat-id\"")) "SSE broadcast should not use kebab-case keys")))) - (testing "chat-status-changed delegates and broadcasts" + (testing "chat-status-changed delegates and broadcasts camelCase" (let [params {:chat-id "c1" :status :running}] (eca.messenger/chat-status-changed broadcast-messenger params) (Thread/sleep 100) (is (seq (:chat-status-changed (h/messages)))) - (is (.contains (.toString os "UTF-8") "chat:status-changed")))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:status-changed")) + (is (.contains output "\"chatId\""))))) - (testing "chat-deleted delegates and broadcasts" + (testing "chat-deleted delegates and broadcasts camelCase" (let [params {:chat-id "c1"}] (eca.messenger/chat-deleted broadcast-messenger params) (Thread/sleep 100) (is (seq (:chat-deleted (h/messages)))) - (is (.contains (.toString os "UTF-8") "chat:deleted")))) + (let [output (.toString os "UTF-8")] + (is (.contains output "chat:deleted")) + (is (.contains output "\"chatId\""))))) (testing "editor-diagnostics delegates to inner only (no broadcast)" (let [os2 (java.io.ByteArrayOutputStream.) From 30a74eb15c32e2d3ba644d0ab12b39ac9dec99c6 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 10:41:23 -0300 Subject: [PATCH 03/13] Update plan --- docs/remote-server-plan.md | 261 ++++++++++++++++++++++++++----------- 1 file changed, 182 insertions(+), 79 deletions(-) diff --git a/docs/remote-server-plan.md b/docs/remote-server-plan.md index 809f4660c..04ecdc96f 100644 --- a/docs/remote-server-plan.md +++ b/docs/remote-server-plan.md @@ -1,6 +1,10 @@ -# Remote Web Control Server — Design Plan +# Remote Web Control Server — Design & Implementation > GitHub Issue: [#333](https://github.com/editor-code-assistant/eca/issues/333) +> +> **Status: v1 implemented.** This document reflects the actual implementation. Sections +> marked with 🔧 contain implementation notes that diverge from or extend the original +> design. ## Overview @@ -28,7 +32,7 @@ for each chat within a session. | Field | Required | Default | Description | |------------|----------|-----------------------|------------------------------------------------| -| `enabled` | yes | — | Enables the remote HTTP server | +| `enabled` | no | `false` | Enables the remote HTTP server | | `host` | no | auto-detected LAN IP | Host used in the logged URL for `web.eca.dev` to connect back to. Can be a LAN IP, public IP, domain, or tunnel URL (e.g. ngrok, tailscale). Not set → auto-detect via `InetAddress/getLocalHost`. | | `port` | no | random free port | Port the HTTP server listens on | | `password` | no | auto-generated token | Auth token; auto-generated and logged if unset | @@ -63,6 +67,9 @@ The auth token is either user-configured (`password` in config) or auto-generate 32-byte hex string via `java.security.SecureRandom` (64 characters). Stored in runtime state only (not persisted to disk). +🔧 Token validation uses constant-time comparison (`MessageDigest/isEqual`) to prevent +timing side-channel attacks. + **No CSRF protection needed** — CSRF is a cookie-only attack vector. Bearer tokens in headers are not sent automatically by browsers. @@ -208,8 +215,8 @@ Maps to `chat/selectedAgentChanged` notification handler. `GET /api/v1/events` — `Content-Type: text/event-stream` -Optional `?chat=` filter: when provided, `chat:*` events are filtered to only that -chat. `session:*`, `config:*`, and `tool:*` events are always sent regardless of filter. +🔧 The `?chat=` filter is **not yet implemented** in v1 — all events are sent to all +clients. Planned for a future iteration. ## SSE Events @@ -292,10 +299,15 @@ params passed to `IMessenger.chat-content-received`, with keys camelCased. Examp - **Heartbeat:** Server sends `:` comment line every 15 seconds to detect dead clients. - **Backpressure:** Each SSE client gets a `core.async` channel with a dropping buffer - (buffer size: 256 events). Slow clients drop events rather than blocking the main - messenger path. Clients can recover by re-fetching chat state via REST. + (buffer size: 256 events). `broadcast!` uses `async/offer!` (non-blocking, drops if + full). Slow clients drop events rather than blocking the main messenger path. Clients + can recover by re-fetching chat state via REST. - **Stale cleanup:** Failed writes remove the client from the connection set. +🔧 **Threading model:** Writer loops and heartbeat use `async/thread` (real threads with +blocking ``. @@ -341,7 +353,13 @@ No raw exceptions or stack traces are returned to the web client. Wraps the existing `ServerMessenger`, implements `IMessenger`. Every method: 1. Delegates to the inner `ServerMessenger` (sends to editor via stdio). -2. Broadcasts the event to all connected SSE clients (via `core.async` channels). +2. Converts data to camelCase via `shared/map->camel-cased-map`. +3. Broadcasts the event to all connected SSE clients (via `core.async` channels). + +🔧 The camelCase conversion is essential because `IMessenger` methods receive internal +kebab-case Clojure maps. The JSON-RPC layer (stdio) handles its own key conversion, but +SSE broadcasts serialize directly via Cheshire — without explicit conversion, SSE payloads +would have kebab-case keys like `chat-id` instead of `chatId`. **Exceptions (inner-only, no broadcast):** - `editor-diagnostics` — web client cannot provide editor diagnostics. @@ -390,104 +408,189 @@ that can observe and control chat but cannot fulfill editor-specific requests (`editor/getDiagnostics`). Both clients can send commands concurrently — for tool approvals, first response wins (promise `deliver` is idempotent). -## Implementation Steps - -### Step 1 — Config schema -Add `remote` section to `docs/config.json` with `enabled`, `host`, `port`, and `password` -fields. Parse and validate in config loading. - -### Step 2 — SSE connection management (`eca.remote.sse`) -- Atom holding set of SSE client connections. -- Each client: Ring async response channel + `core.async` channel with dropping buffer. -- `broadcast!` function: puts event on all client channels. -- Writer loop per client: takes from channel, writes SSE-formatted string, removes client on error. -- Heartbeat loop: writes `:` comment to all clients every 15 seconds. - -### Step 3 — BroadcastMessenger (`eca.remote.messenger`) -- Implements `IMessenger`. -- Wraps inner `ServerMessenger`. -- Delegates all methods to inner + broadcasts to SSE. -- `editor-diagnostics` → inner only. - -### Step 4 — Auth middleware (`eca.remote.auth`) -- Ring middleware: reads `Authorization: Bearer ` header, validates against the - configured or auto-generated token. -- Returns `401 Unauthorized` with error envelope if missing or invalid. -- `/api/v1/health` and `GET /` are exempt from auth. +## Implementation Notes + +> The steps below reflect what was actually implemented. -### Step 5 — CORS middleware (`eca.remote.middleware`) -- Ring middleware adding CORS headers (`Authorization` in allowed headers). -- Handles OPTIONS preflight requests. -- No `Access-Control-Allow-Credentials` needed (Bearer auth, not cookies). - -### Step 6 — REST handlers (`eca.remote.handlers`) -- Read endpoints: pull from `db*` directly. -- Write endpoints: bridge to existing handler functions in `eca.features.chat`. -- Consistent error envelope on all error paths. -- `GET /` redirect to `web.eca.dev`. - -### Step 7 — Routes + HTTP server (`eca.remote.routes`, `eca.remote.server`) -- Ring route table with `/api/v1/` prefix. -- Middleware composition: CORS → Bearer auth → JSON parsing → routes. -- Jetty lifecycle: start on configured port, stop on shutdown. -- Graceful port conflict handling (catch `BindException`, log warning, continue without - remote server). - -### Step 8 — Integration into ECA startup/shutdown (`eca.server`) -- `start-server!`: if remote enabled: - 1. Create SSE connections atom. - 2. Create `BroadcastMessenger` wrapping `ServerMessenger`, passing SSE connections atom - (constructor: `(->BroadcastMessenger inner-messenger sse-connections-atom)`). - 3. Start Jetty on configured port, passing SSE connections atom and `components` to - route handlers. - 4. Log clickable `web.eca.dev` URL to stderr. -- `shutdown`: send `session:disconnecting` SSE event, wait up to 5 seconds for in-flight - responses, close SSE connections, stop Jetty. Running chat generations are not - interrupted — they continue on the stdio side. - -### Step 9 — CHANGELOG -Add feature entry under Unreleased referencing #333. - -### Step 10 — Tests -- `eca.remote.sse-test` — heartbeat, backpressure, stale client cleanup. -- `eca.remote.auth-test` — Bearer token validation, missing/invalid token rejection. -- `eca.remote.handlers-test` — REST endpoints with mock `db*`. -- `eca.remote.messenger-test` — BroadcastMessenger delegation + SSE broadcasting. +### Config schema +Added `remote` section to `docs/config.json` with `enabled`, `host`, `port`, and +`password` fields. Added `:remote {:enabled false}` to `initial-config*` in `config.clj`. + +🔧 Remote config is read via `config/read-file-configs` at startup (before `initialize`), +so only global/env/custom-file configs are available — workspace-level `.eca/config.json` +is not included. This is intentional: workspace folders aren't known until `initialize`. + +### SSE connection management (`eca.remote.sse`) +- Atom holding set of SSE client maps `{:ch :os :done-ch}`. +- `add-client!` accepts optional `done-ch` — closed when writer loop terminates, allowing + callers to block on it for lifecycle management. +- `broadcast!` uses `async/offer!` (non-blocking put, drops if full). +- Writer loop per client: `async/thread` with blocking `` filter** — specified in the API but not yet implemented; all events + go to all clients - Request logging middleware - Subagent chat filtering in list endpoint - Source tagging on SSE events (editor vs web origin) - SSE event batching for high-frequency streaming - Rate limiting on auth failures - Configurable CORS allowed origins +- Route-level integration tests (current tests call handlers directly) +- `GET /api/v1/chats/:id` message content filtering (messages can be very large) + +## Web Frontend Notes + +> Notes for implementing the `web.eca.dev` frontend. + +### Consuming SSE + +The SSE stream **must** be consumed via `fetch()` + `ReadableStream`, not `EventSource`, +because `EventSource` does not support custom `Authorization` headers: + +```javascript +const response = await fetch(`http://${host}/api/v1/events`, { + headers: { 'Authorization': `Bearer ${token}` } +}); +const reader = response.body.getReader(); +const decoder = new TextDecoder(); +// Parse SSE wire format: "event: \ndata: \n\n" +``` + +### Connection Lifecycle + +1. On connect, the first SSE event is `session:connected` with a full state dump + (chats, models, agents, MCP servers, workspace folders). +2. The frontend should use this to bootstrap its state — no separate REST call needed. +3. On disconnect, re-fetch state via `GET /api/v1/session` + `GET /api/v1/chats` to + recover any missed events, then reconnect to SSE. +4. A heartbeat (`:` comment) arrives every 15 seconds — if no data for >30s, assume + disconnected and reconnect. + +### Chat Content Events + +`chat:content-received` events carry the same payloads as the editor JSON-RPC protocol. +Key content types the frontend should handle: + +| `content.type` | Description | Key fields | +|--------------------|-------------------------------------|---------------------------------------| +| `text` | Streamed text chunk | `text`, `contentId` | +| `progress` | Status indicator | `state` (running/finished), `text` | +| `metadata` | Chat metadata update | `title` | +| `usage` | Token usage stats | `inputTokens`, `outputTokens` | +| `toolCallPrepare` | Tool call starting | `name`, `id`, `argumentsText` | +| `toolCallRun` | Tool call approved, about to run | `name`, `id`, `arguments` | +| `toolCallRunning` | Tool call executing | `name`, `id` | +| `toolCalled` | Tool call finished | `name`, `id`, `error`, `outputs` | +| `reasonStarted` | Thinking/reasoning started | `id` | +| `reasonText` | Thinking text chunk | `id`, `text` | +| `reasonFinished` | Thinking finished | `id`, `totalTimeMs` | +| `hookActionStarted` | Hook action started | `id`, `name` | +| `hookActionFinished` | Hook action finished | `id`, `name`, `status`, `output` | + +### Model/Agent/Variant Changes + +`POST /api/v1/chats/:id/model`, `/agent`, `/variant` are **session-wide** operations +despite the chat-scoped URL. They change the selected model/agent for new prompts across +the entire session, matching the editor behavior. The chat-id in the URL is validated +(404 if not found) but not used for scoping. + +### Tool Call Approval + +When a tool call requires approval, its status in the chat's `toolCalls` map will be +`"waiting-approval"`. The frontend can show an approve/reject UI and call: +- `POST /api/v1/chats/:id/approve/:tcid` — allow the tool call +- `POST /api/v1/chats/:id/reject/:tcid` — deny the tool call + +First response wins — if the editor user approves before the web user (or vice versa), +the second approval is a no-op (`promise deliver` is idempotent). From e7ffe8c0d720a6038d2468fa8b6da65bb854a3fb Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 14:56:47 -0300 Subject: [PATCH 04/13] Small fixes --- src/eca/remote/middleware.clj | 39 +++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/src/eca/remote/middleware.clj b/src/eca/remote/middleware.clj index b725b214e..ee3bd3cca 100644 --- a/src/eca/remote/middleware.clj +++ b/src/eca/remote/middleware.clj @@ -3,22 +3,39 @@ (set! *warn-on-reflection* true) -(def ^:private cors-headers - {"Access-Control-Allow-Origin" "https://web.eca.dev" - "Access-Control-Allow-Methods" "GET, POST, DELETE, OPTIONS" - "Access-Control-Allow-Headers" "Content-Type, Authorization"}) +(def ^:private allowed-origin "https://web.eca.dev") + +(defn- allowed-origin? + "Returns the origin if allowed, nil otherwise. + Allows web.eca.dev and localhost for development." + [origin] + (when origin + (cond + (= origin allowed-origin) origin + (re-matches #"http://localhost(:\d+)?" origin) origin + (re-matches #"http://127\.0\.0\.1(:\d+)?" origin) origin + :else nil))) + +(defn- cors-headers-for + [request] + (let [origin (get-in request [:headers "origin"]) + resolved (or (allowed-origin? origin) allowed-origin)] + {"Access-Control-Allow-Origin" resolved + "Access-Control-Allow-Methods" "GET, POST, DELETE, OPTIONS" + "Access-Control-Allow-Headers" "Content-Type, Authorization"})) (defn wrap-cors - "Ring middleware adding CORS headers for web.eca.dev. + "Ring middleware adding CORS headers for web.eca.dev and localhost. Handles OPTIONS preflight with 204." [handler] (fn [request] - (if (= :options (:request-method request)) - {:status 204 - :headers cors-headers - :body nil} - (let [response (handler request)] - (update response :headers merge cors-headers))))) + (let [headers (cors-headers-for request)] + (if (= :options (:request-method request)) + {:status 204 + :headers headers + :body nil} + (let [response (handler request)] + (update response :headers merge headers)))))) (defn wrap-json-content-type "Ring middleware that sets default JSON content-type on responses From 392e18cfa9a9030a182c2f50e40a9b802c40ead2 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 18:33:30 -0300 Subject: [PATCH 05/13] Improve --- src/eca/remote/handlers.clj | 54 ++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 670708333..8ab85f4fc 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -50,25 +50,41 @@ (defn- session-state "Builds the session state map used for both GET /session and SSE session:connected." [db config] - {:version (config/eca-version) - :protocolVersion "1.0" - :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) - :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) - (:models db)) - :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) - (config/primary-agent-names config)) - :mcpServers (mapv (fn [[name client-info]] - {:name name :status (or (:status client-info) "unknown")}) - (:mcp-clients db)) - :chats (->> (vals (:chats db)) - (remove :subagent) - (mapv (fn [chat] - (camel-keys - {:id (:id chat) - :title (:title chat) - :status (or (:status chat) :idle) - :created-at (:created-at chat) - :messages (or (:messages chat) [])}))))}) + (let [last-config (:last-config-notified db) + default-model (or (get-in last-config [:chat :select-model]) + (f.chat/default-model db config)) + default-agent-name (or (get-in last-config [:chat :select-agent]) + (config/validate-agent-name + (or (:defaultAgent (:chat config)) + (:defaultAgent config)) + config)) + variants (or (get-in last-config [:chat :variants]) []) + selected-variant (get-in last-config [:chat :select-variant])] + {:version (config/eca-version) + :protocolVersion "1.0" + :workspaceFolders (mapv #(shared/uri->filename (:uri %)) (:workspace-folders db)) + :models (mapv (fn [[id _]] {:id id :name id :provider (first (shared/full-model->provider+model id))}) + (:models db)) + :agents (mapv (fn [name] {:id name :name name :description (get-in config [:agent name :description])}) + (config/primary-agent-names config)) + :mcpServers (mapv (fn [[name client-info]] + {:name name :status (or (:status client-info) "unknown")}) + (:mcp-clients db)) + :chats (->> (vals (:chats db)) + (remove :subagent) + (mapv (fn [chat] + (camel-keys + {:id (:id chat) + :title (:title chat) + :status (or (:status chat) :idle) + :created-at (:created-at chat) + :messages (or (:messages chat) [])})))) + :welcomeMessage (or (:welcomeMessage (:chat config)) + (:welcomeMessage config)) + :selectModel default-model + :selectAgent default-agent-name + :variants variants + :selectedVariant selected-variant})) ;; --- Health & Redirect --- From e54ab3c3babf4b929fd65176765aa428e066bb32 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Thu, 19 Mar 2026 23:34:40 -0300 Subject: [PATCH 06/13] Improvements to sync --- CHANGELOG.md | 1 + src/eca/handlers.clj | 15 ++++++-- src/eca/remote/handlers.clj | 58 ++++++++++++++++++---------- src/eca/remote/routes.clj | 77 +++++++++++++++++++++++-------------- src/eca/remote/server.clj | 47 ++++++++++++++-------- src/eca/server.clj | 11 ++++-- 6 files changed, 136 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbc3a1f79..89e788878 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Add remote web control server for browser-based chat observation and control via `web.eca.dev`. #333 +- Show remote connect URL in welcome message when remote server is enabled. ## 0.115.5 diff --git a/src/eca/handlers.clj b/src/eca/handlers.clj index 142620887..4407aed34 100644 --- a/src/eca/handlers.clj +++ b/src/eca/handlers.clj @@ -42,6 +42,15 @@ (when (and agent-variant variants (some #{agent-variant} variants)) agent-variant))) +(defn welcome-message + "Builds the welcome message from config, appending the remote URL when available." + [db config] + (let [base (or (:welcomeMessage (:chat config)) ;;legacy + (:welcomeMessage config))] + (if-let [url (:remote-connect-url db)] + (str base "\n🌐 Remote: " url "\n") + base))) + (defn initialize [{:keys [db* metrics]} params] (metrics/task metrics :eca/initialize (reset! config/initialization-config* (shared/map->camel-cased-map (:initialization-options params))) @@ -54,8 +63,7 @@ (when-not (:pureConfig config) (db/load-db-from-cache! db* config metrics)) - {:chat-welcome-message (or (:welcomeMessage (:chat config)) ;;legacy - (:welcomeMessage config))}))) + {:chat-welcome-message (welcome-message @db* config)}))) (defn initialized [{:keys [db* messenger config metrics]}] (metrics/task metrics :eca/initialized @@ -81,8 +89,7 @@ :select-agent default-agent-name :variants (or variants []) :select-variant (select-variant default-agent-config variants) - :welcome-message (or (:welcomeMessage (:chat config)) ;;legacy - (:welcomeMessage config)) + :welcome-message (welcome-message @db* config) ;; Deprecated, remove after changing emacs, vscode and intellij. :default-model default-model :default-agent default-agent-name diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 8ab85f4fc..6eee81dde 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -16,7 +16,7 @@ (set! *warn-on-reflection* true) -(defn- parse-body [request] +(defn ^:private parse-body [request] (when-let [body (:body request)] (try (json/parse-string @@ -26,28 +26,26 @@ true) (catch Exception _e nil)))) -(defn- json-response +(defn ^:private json-response ([status body] {:status status :headers {"Content-Type" "application/json; charset=utf-8"} :body (json/generate-string body)}) ([body] (json-response 200 body))) -(defn- error-response [status code message] +(defn ^:private error-response [status code message] (json-response status {:error {:code code :message message}})) -(defn- no-content [] +(defn ^:private no-content [] {:status 204 :headers {} :body nil}) -(defn- chat-or-404 [db* chat-id] +(defn ^:private chat-or-404 [db* chat-id] (get-in @db* [:chats chat-id])) -(defn- camel-keys [m] +(defn ^:private camel-keys [m] (shared/map->camel-cased-map m)) -;; --- Shared --- - -(defn- session-state +(defn ^:private session-state "Builds the session state map used for both GET /session and SSE session:connected." [db config] (let [last-config (:last-config-notified db) @@ -79,15 +77,13 @@ :status (or (:status chat) :idle) :created-at (:created-at chat) :messages (or (:messages chat) [])})))) - :welcomeMessage (or (:welcomeMessage (:chat config)) - (:welcomeMessage config)) + :welcomeMessage (handlers/welcome-message db config) :selectModel default-model :selectAgent default-agent-name :variants variants + :trust (boolean (:trust db)) :selectedVariant selected-variant})) -;; --- Health & Redirect --- - (defn handle-root [_components _request {:keys [host token]}] {:status 302 :headers {"Location" (str "https://web.eca.dev?host=" host "&token=" token)} @@ -96,15 +92,11 @@ (defn handle-health [_components _request] (json-response {:status "ok" :version (config/eca-version)})) -;; --- Session --- - (defn handle-session [{:keys [db*]} _request] (let [db @db* config (config/all db)] (json-response (session-state db config)))) -;; --- Chats --- - (defn handle-list-chats [{:keys [db*]} _request] (let [chats (->> (vals (:chats @db*)) (remove :subagent) @@ -128,8 +120,6 @@ :task (:task chat)})) (error-response 404 "chat_not_found" (str "Chat " chat-id " does not exist")))) -;; --- Chat Actions --- - (defn handle-prompt [{:keys [db*] :as components} request chat-id] (let [body (parse-body request)] (if-not (:message body) @@ -139,7 +129,8 @@ :message (:message body)} (:model body) (assoc :model (:model body)) (:agent body) (assoc :agent (:agent body)) - (:variant body) (assoc :variant (:variant body))) + (:variant body) (assoc :variant (:variant body)) + (:trust body) (assoc :trust (:trust body))) result (handlers/chat-prompt (assoc components :config config) params)] (json-response (camel-keys result)))))) @@ -241,7 +232,32 @@ :variant (:variant body)}) (no-content)))))) -;; --- SSE Events --- +(defn handle-set-trust [{:keys [db*]} request {:keys [sse-connections*]}] + (let [body (parse-body request) + trust (boolean (:trust body))] + (swap! db* assoc :trust trust) + (sse/broadcast! sse-connections* "trust:updated" {:trust trust}) + (json-response {:trust trust}))) + +(defn handle-mcp-start [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-start-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-stop [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-stop-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-connect [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-connect-server (assoc components :config config) {:name server-name}) + (no-content))) + +(defn handle-mcp-logout [{:keys [db*] :as components} _request server-name] + (let [config (config/all @db*)] + (handlers/mcp-logout-server (assoc components :config config) {:name server-name}) + (no-content))) (deftype SSEBody [db* sse-connections*] ring.protocols/StreamableResponseBody diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj index 3dc7c84e3..0e0ac0847 100644 --- a/src/eca/remote/routes.clj +++ b/src/eca/remote/routes.clj @@ -9,12 +9,12 @@ (set! *warn-on-reflection* true) -(defn- path-segments +(defn ^:private path-segments "Splits a URI path into segments, e.g. /api/v1/chats/abc → [\"api\" \"v1\" \"chats\" \"abc\"]" [^String uri] (filterv (complement string/blank?) (string/split uri #"/"))) -(defn- match-route +(defn ^:private match-route "Simple path-based router. Returns [handler-fn & args] or nil." [components request {:keys [token host* sse-connections*]}] (let [method (:request-method request) @@ -39,38 +39,57 @@ (when (= :get method) [handlers/handle-events components request {:sse-connections* sse-connections*}]) + ["api" "v1" "trust"] + (when (= :post method) + [handlers/handle-set-trust components request {:sse-connections* sse-connections*}]) + ;; Dynamic routes with path params (when (and (>= (count segments) 4) (= "api" (nth segments 0)) - (= "v1" (nth segments 1)) - (= "chats" (nth segments 2))) - (let [chat-id (nth segments 3)] - (case (count segments) - 4 (case method - :get [handlers/handle-get-chat components request chat-id] - :delete [handlers/handle-delete-chat components request chat-id] - nil) - 5 (let [action (nth segments 4)] - (when (= :post method) - (case action - "prompt" [handlers/handle-prompt components request chat-id] - "stop" [handlers/handle-stop components request chat-id] - "rollback" [handlers/handle-rollback components request chat-id] - "clear" [handlers/handle-clear components request chat-id] - "model" [handlers/handle-change-model components request chat-id] - "agent" [handlers/handle-change-agent components request chat-id] - "variant" [handlers/handle-change-variant components request chat-id] - nil))) - 6 (let [action (nth segments 4) - tcid (nth segments 5)] - (when (= :post method) - (case action - "approve" [handlers/handle-approve components request chat-id tcid] - "reject" [handlers/handle-reject components request chat-id tcid] - nil))) + (= "v1" (nth segments 1))) + (let [resource (nth segments 2)] + (case resource + "chats" + (let [chat-id (nth segments 3)] + (case (count segments) + 4 (case method + :get [handlers/handle-get-chat components request chat-id] + :delete [handlers/handle-delete-chat components request chat-id] + nil) + 5 (let [action (nth segments 4)] + (when (= :post method) + (case action + "prompt" [handlers/handle-prompt components request chat-id] + "stop" [handlers/handle-stop components request chat-id] + "rollback" [handlers/handle-rollback components request chat-id] + "clear" [handlers/handle-clear components request chat-id] + "model" [handlers/handle-change-model components request chat-id] + "agent" [handlers/handle-change-agent components request chat-id] + "variant" [handlers/handle-change-variant components request chat-id] + nil))) + 6 (let [action (nth segments 4) + tcid (nth segments 5)] + (when (= :post method) + (case action + "approve" [handlers/handle-approve components request chat-id tcid] + "reject" [handlers/handle-reject components request chat-id tcid] + nil))) + nil)) + + "mcp" + (when (and (= 5 (count segments)) (= :post method)) + (let [server-name (nth segments 3) + action (nth segments 4)] + (case action + "start" [handlers/handle-mcp-start components request server-name] + "stop" [handlers/handle-mcp-stop components request server-name] + "connect" [handlers/handle-mcp-connect components request server-name] + "logout" [handlers/handle-mcp-logout components request server-name] + nil))) + nil)))))) -(defn- not-found-response [] +(defn ^:private not-found-response [] {:status 404 :headers {"Content-Type" "application/json; charset=utf-8"} :body "{\"error\":{\"code\":\"not_found\",\"message\":\"Route not found\"}}"}) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index aa6e83a0d..bcdac25a8 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -9,30 +9,44 @@ [eca.remote.sse :as sse] [ring.adapter.jetty :as jetty]) (:import - [java.net BindException InetAddress] + [java.net BindException Inet4Address InetAddress NetworkInterface] [org.eclipse.jetty.server NetworkConnector Server])) (set! *warn-on-reflection* true) (def ^:private logger-tag "[REMOTE]") -(defn- detect-host - "Auto-detects the LAN IP via InetAddress/getLocalHost. - Falls back to 127.0.0.1 if detection fails or returns loopback." +(defn ^:private detect-lan-ip + "Enumerates network interfaces to find a site-local (private) IPv4 address. + Returns the IP string or nil when none is found." [] (try - (let [addr (InetAddress/getLocalHost) - host (.getHostAddress addr)] - (if (.isLoopbackAddress addr) - (do (logger/warn logger-tag "Auto-detected loopback address. Consider setting remote.host in config.") - "127.0.0.1") - host)) - (catch Exception e - (logger/warn logger-tag "Failed to detect LAN IP:" (.getMessage e) - "Consider setting remote.host in config.") - "127.0.0.1"))) + (->> (enumeration-seq (NetworkInterface/getNetworkInterfaces)) + (filter (fn [^NetworkInterface ni] + (and (.isUp ni) + (not (.isLoopback ni))))) + (mapcat (fn [^NetworkInterface ni] + (enumeration-seq (.getInetAddresses ni)))) + (filter (fn [^InetAddress addr] + (and (instance? Inet4Address addr) + (.isSiteLocalAddress addr)))) + (some (fn [^InetAddress addr] (.getHostAddress addr)))) + (catch Exception _ nil))) -(defn- resolve-port +(defn ^:private detect-host + "Auto-detects the LAN IP by scanning network interfaces for a private IPv4 address. + Falls back to InetAddress/getLocalHost, then 127.0.0.1." + [] + (or (detect-lan-ip) + (try + (let [addr (InetAddress/getLocalHost) + host (.getHostAddress addr)] + (when-not (.isLoopbackAddress addr) host)) + (catch Exception _ nil)) + (do (logger/warn logger-tag "Could not detect LAN IP. Consider setting remote.host in config.") + "127.0.0.1"))) + +(defn ^:private resolve-port "Returns the configured port, or 0 for auto-assignment." [remote-config] (or (:port remote-config) 0)) @@ -74,7 +88,8 @@ :sse-connections* sse-connections* :heartbeat-stop-ch heartbeat-ch :token token - :host host-with-port}) + :host host-with-port + :connect-url connect-url}) (catch BindException e (logger/warn logger-tag "Port" port "is already in use:" (.getMessage e) "Remote server will not start.") diff --git a/src/eca/server.clj b/src/eca/server.clj index ae6fb1292..940c0289c 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -146,8 +146,12 @@ (chat-cleared [_this params] (jsonrpc.server/discarding-stdout (jsonrpc.server/send-notification server "chat/cleared" params))) - (chat-status-changed [_this _params]) - (chat-deleted [_this _params]) + (chat-status-changed [_this params] + (jsonrpc.server/discarding-stdout + (jsonrpc.server/send-notification server "chat/statusChanged" params))) + (chat-deleted [_this params] + (jsonrpc.server/discarding-stdout + (jsonrpc.server/send-notification server "chat/deleted" params))) (rewrite-content-received [_this content] (jsonrpc.server/discarding-stdout (jsonrpc.server/send-notification server "rewrite/contentReceived" content))) @@ -189,7 +193,8 @@ (metrics/start! metrics) (when sse-connections* (when-let [rs (remote.server/start! components sse-connections*)] - (reset! remote-server* rs))) + (reset! remote-server* rs) + (swap! db* assoc :remote-connect-url (:connect-url rs)))) (monitor-server-logs (:log-ch server)) (setup-dev-environment db* components) (jsonrpc.server/start server components))) From a99fa1ec61144e91e1e82f371a6e281800f83d70 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 09:17:00 -0300 Subject: [PATCH 07/13] Return datetimes of server --- src/eca/features/chat.clj | 1 + src/eca/remote/handlers.clj | 13 +++++++++---- src/eca/server.clj | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index af61536a6..081dec0e0 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -432,6 +432,7 @@ (logger/info logger-tag "Superseding active prompt" {:chat-id chat-id :status (get-in @db* [:chats chat-id :status])})) (swap! db* assoc-in [:chats chat-id :status] :running) + (swap! db* assoc-in [:chats chat-id :updated-at] (System/currentTimeMillis)) (messenger/chat-status-changed messenger {:chat-id chat-id :status :running}) (swap! db* assoc-in [:chats chat-id :prompt-id] prompt-id) (swap! db* assoc-in [:chats chat-id :model] full-model) diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 6eee81dde..17fee8185 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -12,7 +12,8 @@ [eca.shared :as shared] [ring.core.protocols :as ring.protocols]) (:import - [java.io InputStream])) + [java.io InputStream] + [java.time Instant])) (set! *warn-on-reflection* true) @@ -76,7 +77,9 @@ :title (:title chat) :status (or (:status chat) :idle) :created-at (:created-at chat) - :messages (or (:messages chat) [])})))) + :updated-at (:updated-at chat)})))) + :startedAt (when-let [ms (:started-at db)] + (.toString (Instant/ofEpochMilli ^long ms))) :welcomeMessage (handlers/welcome-message db config) :selectModel default-model :selectAgent default-agent-name @@ -100,11 +103,12 @@ (defn handle-list-chats [{:keys [db*]} _request] (let [chats (->> (vals (:chats @db*)) (remove :subagent) - (mapv (fn [{:keys [id title status created-at]}] + (mapv (fn [{:keys [id title status created-at updated-at]}] {:id id :title title :status (or status :idle) - :createdAt created-at})))] + :createdAt created-at + :updatedAt updated-at})))] (json-response chats))) (defn handle-get-chat [{:keys [db*]} _request chat-id] @@ -115,6 +119,7 @@ :title (:title chat) :status (or (:status chat) :idle) :created-at (:created-at chat) + :updated-at (:updated-at chat) :messages (or (:messages chat) []) :tool-calls (or (:tool-calls chat) {}) :task (:task chat)})) diff --git a/src/eca/server.clj b/src/eca/server.clj index 940c0289c..27194624f 100644 --- a/src/eca/server.clj +++ b/src/eca/server.clj @@ -174,7 +174,7 @@ (metrics/->NoopMetrics))) (defn start-server! [server] - (let [db* (atom db/initial-db) + (let [db* (atom (assoc db/initial-db :started-at (System/currentTimeMillis))) metrics (->Metrics db*) stdio-messenger (->ServerMessenger server db*) ;; Read remote config from file-based sources (global/env/custom). From 4f9df56e9a3f53dd84ba729bd1c5622ae5eeee1e Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 09:33:53 -0300 Subject: [PATCH 08/13] Use pass instead of token --- src/eca/remote/auth.clj | 9 +++++---- src/eca/remote/handlers.clj | 4 ++-- src/eca/remote/routes.clj | 2 +- src/eca/remote/server.clj | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/eca/remote/auth.clj b/src/eca/remote/auth.clj index d6a8a4f80..09a1eaf25 100644 --- a/src/eca/remote/auth.clj +++ b/src/eca/remote/auth.clj @@ -7,13 +7,14 @@ (set! *warn-on-reflection* true) +(def ^:private alphanumeric "abcdefghijklmnopqrstuvwxyz0123456789") + (defn generate-token - "Generates a cryptographically random 32-byte hex token (64 characters)." + "Generates a random 5-character alphanumeric password." [] (let [random (SecureRandom.) - bytes (byte-array 32)] - (.nextBytes random bytes) - (apply str (map #(format "%02x" (bit-and % 0xff)) bytes)))) + len (count alphanumeric)] + (apply str (repeatedly 5 #(nth alphanumeric (.nextInt random len)))))) (def ^:private unauthorized-response {:status 401 diff --git a/src/eca/remote/handlers.clj b/src/eca/remote/handlers.clj index 17fee8185..a5478bb30 100644 --- a/src/eca/remote/handlers.clj +++ b/src/eca/remote/handlers.clj @@ -87,9 +87,9 @@ :trust (boolean (:trust db)) :selectedVariant selected-variant})) -(defn handle-root [_components _request {:keys [host token]}] +(defn handle-root [_components _request {:keys [host password]}] {:status 302 - :headers {"Location" (str "https://web.eca.dev?host=" host "&token=" token)} + :headers {"Location" (str "https://web.eca.dev?host=" host "&pass=" password)} :body nil}) (defn handle-health [_components _request] diff --git a/src/eca/remote/routes.clj b/src/eca/remote/routes.clj index 0e0ac0847..edc9f6066 100644 --- a/src/eca/remote/routes.clj +++ b/src/eca/remote/routes.clj @@ -21,7 +21,7 @@ segments (path-segments (:uri request))] (case segments [] (when (= :get method) - [handlers/handle-root components request {:host @host* :token token}]) + [handlers/handle-root components request {:host @host* :password token}]) ["api" "v1" "health"] (when (= :get method) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index bcdac25a8..e5a94e720 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -81,7 +81,7 @@ heartbeat-ch (sse/start-heartbeat! sse-connections*) connect-url (str "https://web.eca.dev?host=" host-with-port - "&token=" token)] + "&pass=" token)] (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) (logger/info logger-tag (str "🔗 " connect-url)) {:server jetty-server From 2cfd3fe380b6185879c83986d6948fac54aac833 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 09:49:12 -0300 Subject: [PATCH 09/13] Fix tests --- test/eca/remote/auth_test.clj | 6 +++--- test/eca/remote/handlers_test.clj | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/eca/remote/auth_test.clj b/test/eca/remote/auth_test.clj index e22a4c634..600ed57f2 100644 --- a/test/eca/remote/auth_test.clj +++ b/test/eca/remote/auth_test.clj @@ -5,10 +5,10 @@ [eca.remote.auth :as auth])) (deftest generate-token-test - (testing "generates a 64-character hex string" + (testing "generates a 5-character alphanumeric string" (let [token (auth/generate-token)] - (is (= 64 (count token))) - (is (re-matches #"[0-9a-f]{64}" token)))) + (is (= 5 (count token))) + (is (re-matches #"[a-z0-9]{5}" token)))) (testing "generates unique tokens" (let [tokens (repeatedly 10 auth/generate-token)] diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index bf5a84c46..3a5158404 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -23,9 +23,9 @@ (deftest handle-root-test (testing "redirects to web.eca.dev" - (let [response (handlers/handle-root nil nil {:host "192.168.1.1:7888" :token "abc123"})] + (let [response (handlers/handle-root nil nil {:host "192.168.1.1:7888" :password "abc123"})] (is (= 302 (:status response))) - (is (= "https://web.eca.dev?host=192.168.1.1:7888&token=abc123" + (is (= "https://web.eca.dev?host=192.168.1.1:7888&pass=abc123" (get-in response [:headers "Location"])))))) (deftest handle-list-chats-test From 1bf9b38fd2e0b4db1e0967f0e45323ce4b280e65 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 10:26:04 -0300 Subject: [PATCH 10/13] Improve port finding --- CHANGELOG.md | 1 - docs/config.json | 6 +-- src/eca/remote/server.clj | 85 ++++++++++++++++++++++++++------------- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89e788878..bbc3a1f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,6 @@ ## Unreleased - Add remote web control server for browser-based chat observation and control via `web.eca.dev`. #333 -- Show remote connect URL in welcome message when remote server is enabled. ## 0.115.5 diff --git a/docs/config.json b/docs/config.json index 705cd6117..59b1dc09a 100644 --- a/docs/config.json +++ b/docs/config.json @@ -354,10 +354,10 @@ }, "port": { "type": "integer", - "description": "Port the HTTP server listens on. When unset, a random free port is used.", - "markdownDescription": "Port the HTTP server listens on. When unset, a random free port is used.", + "description": "Port the HTTP server listens on. When unset, tries port 6666, then 6667, 6668, etc. up to 10 attempts.", + "markdownDescription": "Port the HTTP server listens on. When unset, tries port `6666`, then `6667`, `6668`, etc. up to 10 attempts.", "examples": [ - 7888 + 6666 ] }, "password": { diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index e5a94e720..f82595497 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -16,6 +16,14 @@ (def ^:private logger-tag "[REMOTE]") +(def ^:private default-port + "Base port for the remote server when no explicit port is configured." + 6666) + +(def ^:private max-port-attempts + "Number of sequential ports to try before giving up." + 20) + (defn ^:private detect-lan-ip "Enumerates network interfaces to find a site-local (private) IPv4 address. Returns the IP string or nil when none is found." @@ -46,10 +54,26 @@ (do (logger/warn logger-tag "Could not detect LAN IP. Consider setting remote.host in config.") "127.0.0.1"))) -(defn ^:private resolve-port - "Returns the configured port, or 0 for auto-assignment." - [remote-config] - (or (:port remote-config) 0)) +(defn ^:private try-start-jetty + "Attempts to start Jetty on the given port. Returns the Server on success, + nil on BindException." + ^Server [handler port] + (try + (jetty/run-jetty handler {:port port :host "0.0.0.0" :join? false}) + (catch BindException _ + nil))) + +(defn ^:private start-with-retry + "Tries sequential ports starting from base-port up to max-port-attempts. + Returns [server actual-port] on success, nil if all attempts fail." + [handler base-port] + (loop [port base-port + attempts 0] + (when (< attempts max-port-attempts) + (if-let [server (try-start-jetty handler port)] + [server (.getLocalPort ^NetworkConnector (first (.getConnectors ^Server server)))] + (do (logger/debug logger-tag (str "Port " port " in use, trying " (inc port) "...")) + (recur (inc port) (inc attempts))))))) (defn start! "Starts the remote HTTP server if enabled in config. @@ -63,7 +87,7 @@ (when (:enabled remote-config) (let [token (or (:password remote-config) (auth/generate-token)) host-base (or (:host remote-config) (detect-host)) - port (resolve-port remote-config) + user-port (:port remote-config) ;; Use atom so the handler sees host:port after Jetty resolves the actual port host+port* (atom host-base) handler (routes/create-handler components @@ -71,28 +95,35 @@ :host* host+port* :sse-connections* sse-connections*})] (try - (let [jetty-server ^Server (jetty/run-jetty handler - {:port port - :host "0.0.0.0" - :join? false}) - actual-port (.getLocalPort ^NetworkConnector (first (.getConnectors jetty-server))) - host-with-port (str host-base ":" actual-port) - _ (reset! host+port* host-with-port) - heartbeat-ch (sse/start-heartbeat! sse-connections*) - connect-url (str "https://web.eca.dev?host=" - host-with-port - "&pass=" token)] - (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) - (logger/info logger-tag (str "🔗 " connect-url)) - {:server jetty-server - :sse-connections* sse-connections* - :heartbeat-stop-ch heartbeat-ch - :token token - :host host-with-port - :connect-url connect-url}) - (catch BindException e - (logger/warn logger-tag "Port" port "is already in use:" (.getMessage e) - "Remote server will not start.") + (if-let [[^Server jetty-server actual-port] + (if user-port + ;; User-specified port: single attempt, no retry + (if-let [server (try-start-jetty handler user-port)] + [server (.getLocalPort ^NetworkConnector (first (.getConnectors ^Server server)))] + (do (logger/warn logger-tag "Port" user-port "is already in use." + "Remote server will not start.") + nil)) + ;; Default: try sequential ports starting from default-port + (or (start-with-retry handler default-port) + (do (logger/warn logger-tag + (str "Could not bind to ports " default-port "-" + (+ default-port (dec max-port-attempts)) + ". Remote server will not start.")) + nil)))] + (let [host-with-port (str host-base ":" actual-port) + _ (reset! host+port* host-with-port) + heartbeat-ch (sse/start-heartbeat! sse-connections*) + connect-url (str "https://web.eca.dev?host=" + host-with-port + "&pass=" token)] + (logger/info logger-tag (str "🌐 Remote server started on port " actual-port)) + (logger/info logger-tag (str "🔗 " connect-url)) + {:server jetty-server + :sse-connections* sse-connections* + :heartbeat-stop-ch heartbeat-ch + :token token + :host host-with-port + :connect-url connect-url}) nil) (catch Exception e (logger/warn logger-tag "Failed to start remote server:" (.getMessage e)) From 6798b96a7573dd94b67a0ebceee105757f0d34c9 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 10:36:13 -0300 Subject: [PATCH 11/13] Fix port --- docs/config.json | 6 +++--- src/eca/remote/server.clj | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/config.json b/docs/config.json index 59b1dc09a..a790f1675 100644 --- a/docs/config.json +++ b/docs/config.json @@ -354,10 +354,10 @@ }, "port": { "type": "integer", - "description": "Port the HTTP server listens on. When unset, tries port 6666, then 6667, 6668, etc. up to 10 attempts.", - "markdownDescription": "Port the HTTP server listens on. When unset, tries port `6666`, then `6667`, `6668`, etc. up to 10 attempts.", + "description": "Port the HTTP server listens on. When unset, tries port 7777, then 7778, 7779, etc. up to 10 attempts.", + "markdownDescription": "Port the HTTP server listens on. When unset, tries port `7777`, then `7778`, `7779`, etc. up to 10 attempts.", "examples": [ - 6666 + 7777 ] }, "password": { diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index f82595497..da8016181 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -18,7 +18,7 @@ (def ^:private default-port "Base port for the remote server when no explicit port is configured." - 6666) + 7777) (def ^:private max-port-attempts "Number of sequential ports to try before giving up." From 64b630a6ad00889c7602aa3c85bb961b75859dab Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 14:37:04 -0300 Subject: [PATCH 12/13] Fix port connection --- src/eca/remote/server.clj | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/eca/remote/server.clj b/src/eca/remote/server.clj index da8016181..66c603654 100644 --- a/src/eca/remote/server.clj +++ b/src/eca/remote/server.clj @@ -9,6 +9,7 @@ [eca.remote.sse :as sse] [ring.adapter.jetty :as jetty]) (:import + [java.io IOException] [java.net BindException Inet4Address InetAddress NetworkInterface] [org.eclipse.jetty.server NetworkConnector Server])) @@ -56,12 +57,12 @@ (defn ^:private try-start-jetty "Attempts to start Jetty on the given port. Returns the Server on success, - nil on BindException." + nil when the port is already in use (BindException or Jetty's wrapping IOException)." ^Server [handler port] (try (jetty/run-jetty handler {:port port :host "0.0.0.0" :join? false}) - (catch BindException _ - nil))) + (catch BindException _ nil) + (catch IOException _ nil))) (defn ^:private start-with-retry "Tries sequential ports starting from base-port up to max-port-attempts. From 0fbcaeb868ae07b6b0749bf3bbf9f8f0971d2482 Mon Sep 17 00:00:00 2001 From: Eric Dallo Date: Fri, 20 Mar 2026 15:48:41 -0300 Subject: [PATCH 13/13] lint --- src/eca/remote/middleware.clj | 10 ---------- test/eca/remote/handlers_test.clj | 3 +-- test/eca/remote/messenger_test.clj | 13 +++++++------ test/eca/remote/sse_test.clj | 1 - 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/eca/remote/middleware.clj b/src/eca/remote/middleware.clj index ee3bd3cca..eabb75731 100644 --- a/src/eca/remote/middleware.clj +++ b/src/eca/remote/middleware.clj @@ -36,13 +36,3 @@ :body nil} (let [response (handler request)] (update response :headers merge headers)))))) - -(defn wrap-json-content-type - "Ring middleware that sets default JSON content-type on responses - that don't already have one." - [handler] - (fn [request] - (let [response (handler request)] - (if (get-in response [:headers "Content-Type"]) - response - (assoc-in response [:headers "Content-Type"] "application/json; charset=utf-8"))))) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index 3a5158404..4dbee00e5 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -1,9 +1,8 @@ (ns eca.remote.handlers-test (:require [cheshire.core :as json] - [clojure.test :refer [deftest is testing use-fixtures]] + [clojure.test :refer [deftest is testing]] [eca.config :as config] - [eca.db :as db] [eca.remote.handlers :as handlers] [eca.test-helper :as h])) diff --git a/test/eca/remote/messenger_test.clj b/test/eca/remote/messenger_test.clj index 4188ee021..1b6d9e263 100644 --- a/test/eca/remote/messenger_test.clj +++ b/test/eca/remote/messenger_test.clj @@ -1,6 +1,7 @@ (ns eca.remote.messenger-test (:require - [clojure.test :refer [deftest is testing use-fixtures]] + [clojure.test :refer [deftest is testing]] + [eca.messenger :as messenger] [eca.remote.messenger :as remote.messenger] [eca.remote.sse :as sse] [eca.test-helper :as h])) @@ -16,7 +17,7 @@ (testing "chat-content-received delegates to inner and broadcasts camelCase" (let [data {:chat-id "c1" :role :assistant :content {:type :text :text "hi"}}] - (eca.messenger/chat-content-received broadcast-messenger data) + (messenger/chat-content-received broadcast-messenger data) (Thread/sleep 100) (is (seq (:chat-content-received (h/messages)))) (let [output (.toString os "UTF-8")] @@ -26,7 +27,7 @@ (testing "chat-status-changed delegates and broadcasts camelCase" (let [params {:chat-id "c1" :status :running}] - (eca.messenger/chat-status-changed broadcast-messenger params) + (messenger/chat-status-changed broadcast-messenger params) (Thread/sleep 100) (is (seq (:chat-status-changed (h/messages)))) (let [output (.toString os "UTF-8")] @@ -35,7 +36,7 @@ (testing "chat-deleted delegates and broadcasts camelCase" (let [params {:chat-id "c1"}] - (eca.messenger/chat-deleted broadcast-messenger params) + (messenger/chat-deleted broadcast-messenger params) (Thread/sleep 100) (is (seq (:chat-deleted (h/messages)))) (let [output (.toString os "UTF-8")] @@ -45,7 +46,7 @@ (testing "editor-diagnostics delegates to inner only (no broadcast)" (let [os2 (java.io.ByteArrayOutputStream.) _client2 (sse/add-client! sse-connections* os2)] - (eca.messenger/editor-diagnostics broadcast-messenger nil) + (messenger/editor-diagnostics broadcast-messenger nil) (Thread/sleep 100) (is (not (.contains (.toString os2 "UTF-8") "editor"))))) @@ -53,7 +54,7 @@ (let [os3 (java.io.ByteArrayOutputStream.) _client3 (sse/add-client! sse-connections* os3) data {:chat-id "c1" :content {:type :text :text "rewritten"}}] - (eca.messenger/rewrite-content-received broadcast-messenger data) + (messenger/rewrite-content-received broadcast-messenger data) (Thread/sleep 100) (is (seq (:rewrite-content-received (h/messages)))) (is (not (.contains (.toString os3 "UTF-8") "rewrite"))))) diff --git a/test/eca/remote/sse_test.clj b/test/eca/remote/sse_test.clj index de3c82ed4..e78fb2ca9 100644 --- a/test/eca/remote/sse_test.clj +++ b/test/eca/remote/sse_test.clj @@ -1,6 +1,5 @@ (ns eca.remote.sse-test (:require - [cheshire.core :as json] [clojure.core.async :as async] [clojure.test :refer [deftest is testing]] [eca.remote.sse :as sse])