Skip to content

Commit 7c3cfc9

Browse files
committed
Add multi-database support: connections UI + db.list/db.use MCP tools
Phase 0: settings_db schema for multiple DB connections with CRUD operations (add/activate/delete), Settings UI with connection list, type/mode/active tags, and separated query settings panel. Phase 1: db.list and db.use MCP tools for per-session database switching. SessionDBManager caches engines by URL and tracks per-session overrides. All existing tools (sql.query, sql.schema, db.apply, etc.) now resolve executor/introspector dynamically based on session context. Session DB state is cleaned up on expiry via GC.
1 parent d2999dc commit 7c3cfc9

9 files changed

Lines changed: 621 additions & 48 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ SSE endpoint: http://localhost:8123/mcp (GET for Server-Sent Events)
6868
| `sql.query` | Execute read-only SQL queries (SELECT, WITH, EXPLAIN, SHOW, DESCRIBE) |
6969
| `sql.schema` | Introspect tables, columns, types, and indexes |
7070
| `sql.explain` | Get EXPLAIN plan for a query |
71+
| `db.list` | List registered database connections (name, type, host, mode) |
72+
| `db.use` | Switch active database for this MCP session |
7173
| `db.design` | Generate a desired schema template |
7274
| `db.schema.diff` | Compare desired schema against current database |
7375
| `db.migrate.plan` | Generate migration SQL from a schema diff |

app/main.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515
from app.mcp.registry import ToolRegistry
1616
from app.mcp.resources import ResourceRegistry
1717
from app.mcp.tools import build_tools
18+
from app.session_db import SessionDBManager
1819
from app.web.routes import build_router
1920

2021
logger = QueryLogger()
22+
session_db_mgr = SessionDBManager("")
2123

2224

2325
def _build_app_state() -> tuple[Config, ToolRegistry]:
26+
global session_db_mgr
2427
cfg = Config.load()
28+
session_db_mgr = SessionDBManager(cfg.db_url)
2529
reg = ToolRegistry()
26-
for tool_def, handler in build_tools(cfg, logger):
30+
for tool_def, handler in build_tools(cfg, logger, session_db_mgr):
2731
reg.register(tool_def, handler)
2832
return cfg, reg
2933

@@ -75,6 +79,7 @@ async def _get_session(session_id: str) -> asyncio.Queue[str] | None:
7579
async def _remove_session(session_id: str) -> None:
7680
async with _sessions_lock:
7781
_sessions.pop(session_id, None)
82+
session_db_mgr.clear_session(session_id)
7883

7984

8085
async def _enqueue(session_id: str, payload: Dict[str, Any]) -> bool:
@@ -97,6 +102,7 @@ async def _gc_sessions() -> None:
97102
]
98103
for session_id in expired:
99104
_sessions.pop(session_id, None)
105+
session_db_mgr.clear_session(session_id)
100106

101107

102108
@asynccontextmanager
@@ -246,6 +252,7 @@ async def mcp(request: Request) -> Response:
246252
if session_id and await _enqueue(session_id, err):
247253
return Response(status_code=202)
248254
return _response(err)
255+
arguments["_context"] = {"session_id": session_id}
249256
tool_result = registry.call(tool_name, arguments)
250257
is_error = bool(tool_result.get("error"))
251258
content = [

app/mcp/tools.py

Lines changed: 179 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,63 @@
33
from app.config import Config
44
from app.logging import QueryLogEntry, QueryLogger, now_iso
55
from app.mcp.registry import ToolAnnotations, ToolDef
6+
from app.session_db import SessionDBManager
67
from app.sql.executor import SQLExecutor
78
from app.sql.schema import SchemaIntrospector
89

910

10-
def build_tools(config: Config, logger: QueryLogger) -> List[Tuple[ToolDef, Any]]:
11-
executor = SQLExecutor(config.db_url)
12-
introspector = SchemaIntrospector(config.db_url)
11+
def build_tools(
12+
config: Config, logger: QueryLogger, session_mgr: SessionDBManager | None = None
13+
) -> List[Tuple[ToolDef, Any]]:
14+
# Default executor/introspector for non-session-aware calls
15+
default_executor = SQLExecutor(config.db_url)
16+
default_introspector = SchemaIntrospector(config.db_url)
17+
18+
def _executor(payload: Dict[str, Any]) -> SQLExecutor:
19+
"""Return per-session executor or default."""
20+
ctx = payload.get("_context") or {}
21+
sid = ctx.get("session_id")
22+
if session_mgr and sid:
23+
engine = session_mgr.get_engine_for_session(sid)
24+
ex = SQLExecutor.__new__(SQLExecutor)
25+
ex.engine = engine
26+
return ex
27+
return default_executor
28+
29+
def _introspector(payload: Dict[str, Any]) -> SchemaIntrospector:
30+
"""Return per-session introspector or default."""
31+
ctx = payload.get("_context") or {}
32+
sid = ctx.get("session_id")
33+
if session_mgr and sid:
34+
engine = session_mgr.get_engine_for_session(sid)
35+
si = SchemaIntrospector.__new__(SchemaIntrospector)
36+
si.engine = engine
37+
return si
38+
return default_introspector
39+
40+
def _effective_mode(payload: Dict[str, Any]) -> str:
41+
"""Return per-session mode or default."""
42+
ctx = payload.get("_context") or {}
43+
sid = ctx.get("session_id")
44+
if session_mgr and sid:
45+
return session_mgr.get_mode(sid, config.mode)
46+
return config.mode
47+
48+
def _db_type(payload: Dict[str, Any]) -> str:
49+
"""Return per-session db_type or default."""
50+
ctx = payload.get("_context") or {}
51+
sid = ctx.get("session_id")
52+
if session_mgr and sid:
53+
return session_mgr.get_db_type(sid)
54+
return config.db_type
1355

1456
def sql_query(payload: Dict[str, Any]) -> Dict[str, Any]:
1557
sql = payload.get("sql", "")
1658
limit_override = payload.get("limit")
1759
effective_limit = config.limit_default
1860
if isinstance(limit_override, int) and limit_override > 0:
1961
effective_limit = min(limit_override, config.limit_default)
62+
executor = _executor(payload)
2063
rows, columns, elapsed_ms, error = executor.execute(
2164
sql,
2265
mode="read-only",
@@ -49,12 +92,14 @@ def sql_query(payload: Dict[str, Any]) -> Dict[str, Any]:
4992

5093
def sql_schema(payload: Dict[str, Any]) -> Dict[str, Any]:
5194
table = payload.get("table")
95+
introspector = _introspector(payload)
5296
schema = introspector.get_schema(table=table)
5397
return {"schema": schema}
5498

5599
def sql_explain(payload: Dict[str, Any]) -> Dict[str, Any]:
56100
sql = payload.get("sql", "")
57101
plan_sql = f"EXPLAIN {sql}"
102+
executor = _executor(payload)
58103
rows, columns, elapsed_ms, error = executor.execute(
59104
plan_sql,
60105
mode="read-only",
@@ -93,6 +138,7 @@ def db_schema_diff(payload: Dict[str, Any]) -> Dict[str, Any]:
93138
desired = payload.get("desired_schema")
94139
if not isinstance(desired, dict):
95140
return {"error": "desired_schema must be an object"}
141+
introspector = _introspector(payload)
96142
current = introspector.get_schema_simple()
97143
desired_tables = desired.get("tables", {})
98144
current_tables = current.get("tables", {})
@@ -228,9 +274,11 @@ def db_apply(payload: Dict[str, Any]) -> Dict[str, Any]:
228274
statements = [s.strip() for s in sql.split(";") if s.strip()]
229275
if len(statements) != 1:
230276
return {"error": "Only a single SQL statement is allowed"}
277+
executor = _executor(payload)
278+
mode = _effective_mode(payload)
231279
_, _, elapsed_ms, error = executor.execute(
232280
statements[0],
233-
mode=config.mode,
281+
mode=mode,
234282
limit_default=config.limit_default,
235283
timeout_ms=config.timeout_ms,
236284
)
@@ -246,8 +294,10 @@ def db_migrate(payload: Dict[str, Any]) -> Dict[str, Any]:
246294
statements = [s.strip() for s in sql.split(";") if s.strip()]
247295
if not statements:
248296
return {"error": "No SQL statements found"}
249-
if config.mode != "execute":
297+
mode = _effective_mode(payload)
298+
if mode != "execute":
250299
return {"error": "Migration requires MODE=execute"}
300+
executor = _executor(payload)
251301

252302
results = []
253303
for stmt in statements:
@@ -256,7 +306,7 @@ def db_migrate(payload: Dict[str, Any]) -> Dict[str, Any]:
256306
continue
257307
_, _, elapsed_ms, error = executor.execute(
258308
stmt,
259-
mode=config.mode,
309+
mode=mode,
260310
limit_default=config.limit_default,
261311
timeout_ms=config.timeout_ms,
262312
)
@@ -279,13 +329,14 @@ def db_migrate_plan(payload: Dict[str, Any]) -> Dict[str, Any]:
279329
if destructive and not config.allow_destructive:
280330
return {"error": "Destructive operations are disabled by config"}
281331

282-
diff = db_schema_diff({"desired_schema": desired})
332+
diff = db_schema_diff({"desired_schema": desired, "_context": payload.get("_context")})
283333
if diff.get("error"):
284334
return diff
285335

286336
statements: List[str] = []
287337
warnings: List[str] = []
288338
desired_tables = desired.get("tables", {})
339+
executor = _executor(payload)
289340
dialect = executor.engine.dialect.name
290341

291342
for table in diff.get("missing_tables", []):
@@ -369,17 +420,71 @@ def db_migrate_plan_apply(payload: Dict[str, Any]) -> Dict[str, Any]:
369420
desired = payload.get("desired_schema")
370421
destructive = bool(payload.get("destructive", False))
371422
dry_run = bool(payload.get("dry_run", False))
372-
plan = db_migrate_plan({"desired_schema": desired, "destructive": destructive})
423+
plan = db_migrate_plan(
424+
{
425+
"desired_schema": desired,
426+
"destructive": destructive,
427+
"_context": payload.get("_context"),
428+
}
429+
)
373430
if plan.get("error"):
374431
return plan
375432
sql = ";\n".join(plan.get("statements", []))
376433
if not sql.strip():
377434
return {"error": "Plan produced no statements"}
378-
migrate_result = db_migrate({"sql": sql, "dry_run": dry_run})
435+
migrate_result = db_migrate(
436+
{"sql": sql, "dry_run": dry_run, "_context": payload.get("_context")}
437+
)
379438
migrate_result["plan"] = plan
380439
return migrate_result
381440

382-
return [
441+
# ── db.list / db.use ─────────────────────────────────────────────────────
442+
443+
def db_list(payload: Dict[str, Any]) -> Dict[str, Any]:
444+
if not session_mgr:
445+
return {"connections": [], "note": "Multi-database not enabled"}
446+
connections = session_mgr.list_connections()
447+
ctx = payload.get("_context") or {}
448+
sid = ctx.get("session_id")
449+
# Mark which one this session is using
450+
session_conn = session_mgr.get_session_connection(sid)
451+
session_conn_id = session_conn["id"] if session_conn else None
452+
for c in connections:
453+
c["current"] = c["id"] == session_conn_id if session_conn_id else c["is_active"]
454+
return {"connections": connections}
455+
456+
def db_use(payload: Dict[str, Any]) -> Dict[str, Any]:
457+
if not session_mgr:
458+
return {"error": "Multi-database not enabled"}
459+
ctx = payload.get("_context") or {}
460+
sid = ctx.get("session_id")
461+
if not sid:
462+
return {"error": "No MCP session — db.use requires a session"}
463+
connection_id = payload.get("connection_id")
464+
connection_name = payload.get("name")
465+
if connection_id is None and not connection_name:
466+
return {"error": "Provide connection_id or name"}
467+
connections = session_mgr.list_connections()
468+
target = None
469+
if connection_id is not None:
470+
target = next((c for c in connections if c["id"] == connection_id), None)
471+
elif connection_name:
472+
target = next((c for c in connections if c["name"] == connection_name), None)
473+
if not target:
474+
return {"error": f"Connection not found: {connection_id or connection_name}"}
475+
session_mgr.set_session_db(sid, target["id"])
476+
return {
477+
"ok": True,
478+
"active": {
479+
"id": target["id"],
480+
"name": target["name"],
481+
"db_type": target["db_type"],
482+
"mode": target["mode"],
483+
"host": target["host"],
484+
},
485+
}
486+
487+
tools: list[tuple[ToolDef, Any]] = [
383488
(
384489
ToolDef(
385490
name="sql.query",
@@ -462,6 +567,68 @@ def db_migrate_plan_apply(payload: Dict[str, Any]) -> Dict[str, Any]:
462567
),
463568
sql_explain,
464569
),
570+
(
571+
ToolDef(
572+
name="db.list",
573+
title="List Databases",
574+
description=(
575+
"List all registered database connections. "
576+
"Shows name, type, host, mode, and which one is "
577+
"currently active for this session."
578+
),
579+
input_schema={"type": "object", "properties": {}},
580+
output_schema={
581+
"type": "object",
582+
"properties": {
583+
"connections": {
584+
"type": "array",
585+
"items": {
586+
"type": "object",
587+
"properties": {
588+
"id": {"type": "integer"},
589+
"name": {"type": "string"},
590+
"db_type": {"type": "string"},
591+
"mode": {"type": "string"},
592+
"host": {"type": "string"},
593+
"is_active": {"type": "boolean"},
594+
"current": {"type": "boolean"},
595+
},
596+
},
597+
},
598+
},
599+
},
600+
annotations=ToolAnnotations(read_only_hint=True, idempotent_hint=True),
601+
),
602+
db_list,
603+
),
604+
(
605+
ToolDef(
606+
name="db.use",
607+
title="Switch Database",
608+
description=(
609+
"Switch the active database for this MCP session. "
610+
"Provide either connection_id or name from db.list. "
611+
"Only affects this session — other sessions keep their own database."
612+
),
613+
input_schema={
614+
"type": "object",
615+
"properties": {
616+
"connection_id": {"type": "integer"},
617+
"name": {"type": "string"},
618+
},
619+
},
620+
output_schema={
621+
"type": "object",
622+
"properties": {
623+
"ok": {"type": "boolean"},
624+
"active": {"type": "object"},
625+
"error": {"type": "string"},
626+
},
627+
},
628+
annotations=ToolAnnotations(read_only_hint=False, idempotent_hint=True),
629+
),
630+
db_use,
631+
),
465632
(
466633
ToolDef(
467634
name="db.design",
@@ -636,3 +803,5 @@ def db_migrate_plan_apply(payload: Dict[str, Any]) -> Dict[str, Any]:
636803
db_migrate_plan_apply,
637804
),
638805
]
806+
807+
return tools

0 commit comments

Comments
 (0)