Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3e152cf
docs(openai-passthrough): add implementation plan
xiehust May 25, 2026
14a0b8f
docs(design): add OpenAI passthrough endpoints design doc
xiehust May 25, 2026
b2cfff2
feat(openai-passthrough): add ENABLE_OPENAI_PASSTHROUGH flag and resp…
xiehust May 25, 2026
e9f43fd
feat(auth): accept Authorization: Bearer alongside x-api-key
xiehust May 25, 2026
24d91ae
feat(openai-passthrough): add usage normalization and SSE extraction …
xiehust May 25, 2026
e3b8dac
feat(openai-passthrough): add model mapping resolver with passthrough…
xiehust May 25, 2026
8422684
feat(usage): record api_surface and reasoning_tokens on usage rows
xiehust May 25, 2026
c3cf90d
feat(openai-passthrough): add httpx singleton client and header helper
xiehust May 25, 2026
5aac233
feat(openai-passthrough): add SSE passthrough generator with usage tee
xiehust May 25, 2026
93408d5
feat(openai-passthrough): non-streaming /chat/completions endpoint
xiehust May 25, 2026
888eabd
test(openai-passthrough): streaming /chat/completions integration tests
xiehust May 25, 2026
e091bd7
feat(openai-passthrough): /responses endpoint (POST, streaming + non-…
xiehust May 25, 2026
8abab65
feat(openai-passthrough): /responses CRUD passthrough (GET, DELETE, c…
xiehust May 25, 2026
bb2b1b3
feat(openai-passthrough): /models endpoint passthrough
xiehust May 25, 2026
8048cc3
test(openai-passthrough): pin guardrail header forwarding behavior
xiehust May 25, 2026
6fb932f
chore(openai-passthrough): lint and type cleanup
xiehust May 25, 2026
d7c7d37
docs(openai-passthrough): document new feature in env.example, CLAUDE…
xiehust May 25, 2026
fc827c7
fix(openai-passthrough): yield structured SSE error on upstream timeo…
xiehust May 25, 2026
20b5897
fix(openai-passthrough): build absolute upstream URLs to preserve /v1…
xiehust May 25, 2026
c88eb51
fix(cdk): propagate ENABLE_OPENAI_PASSTHROUGH env var to ECS task def…
xiehust May 25, 2026
b926b42
fix(openai-passthrough): synthesize event: lines for Responses API SSE
xiehust May 25, 2026
e950e8a
fix(openai-passthrough): surface upstream HTTP errors with real statu…
xiehust May 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ black app tests && ruff check app tests && mypy app
- **InvokeModel API** (Claude models): Native Anthropic format, minimal conversion, full beta feature support
- **Converse API** (non-Claude models): Requires format conversion, unified API for all Bedrock models
- **OpenAI Chat Completions API** (non-Claude models, optional): When `ENABLE_OPENAI_COMPAT=True`, non-Claude models use Bedrock's OpenAI-compatible endpoint via bedrock-mantle instead of Converse API
- **OpenAI Passthrough** (any model bedrock-mantle accepts, optional): When `ENABLE_OPENAI_PASSTHROUGH=True`, mounts `/openai/v1/{chat/completions,responses,responses/{id},models}` for clients using OpenAI-format directly.

**Routing**: If model ID contains "anthropic" or "claude" → InvokeModel; else if `ENABLE_OPENAI_COMPAT` → OpenAI Chat Completions; else → Converse.
**Routing**: If model ID contains "anthropic" or "claude" → InvokeModel; else if `ENABLE_OPENAI_COMPAT` → OpenAI Chat Completions; else → Converse. OpenAI Passthrough routes are independent and mount at `/openai/v1/*`.

> **Detailed conversion flows, content block mapping, and streaming implementation**: see [docs/architecture/detailed-flows.md](docs/architecture/detailed-flows.md)

Expand Down Expand Up @@ -104,6 +105,7 @@ Each feature has detailed docs in [docs/architecture/features.md](docs/architect
- **OpenTelemetry Tracing**: OTEL GenAI semantic conventions, session-based trace grouping. Zero overhead when disabled.
- **Admin Portal**: Separate FastAPI app for API key/usage/pricing management with Cognito auth.
- **OpenAI-Compatible API**: Non-Claude models can optionally use Bedrock's OpenAI Chat Completions API via bedrock-mantle endpoint instead of Converse API. Controlled by `ENABLE_OPENAI_COMPAT` flag. Maps `thinking` to OpenAI `reasoning` with configurable effort thresholds.
- **OpenAI Passthrough**: New `/openai/v1/*` endpoints accept OpenAI-native Chat Completions and Responses API requests and forward them to bedrock-mantle. Distinct from `ENABLE_OPENAI_COMPAT` (which routes Anthropic-format requests on `/v1/messages`). Reuses proxy API key auth, rate limits, budgets, and usage tracking. Controlled by `ENABLE_OPENAI_PASSTHROUGH`.

## Common Development Tasks

Expand Down
6 changes: 6 additions & 0 deletions app/api/openai_passthrough/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""OpenAI Passthrough — accepts OpenAI Chat Completions and Responses API
calls from clients and forwards them to AWS bedrock-mantle.
"""
from app.api.openai_passthrough.router import router

__all__ = ["router"]
69 changes: 69 additions & 0 deletions app/api/openai_passthrough/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Async httpx client to bedrock-mantle, lazily constructed and reused.

Headers are NOT set on the client itself; they're added per-request in the
router so we can include the proxy's Bedrock API key in Authorization.

URL building note: we deliberately do NOT set ``base_url`` on the AsyncClient.
httpx follows RFC 3986 path-merging, which means a request path starting with
``/`` REPLACES the path component of the base_url. With
``OPENAI_BASE_URL=https://bedrock-mantle.us-west-2.api.aws/v1``, calling
``client.post("/chat/completions")`` would produce
``https://bedrock-mantle.us-west-2.api.aws/chat/completions`` (the ``/v1`` is
dropped). To avoid this footgun we build full URLs explicitly via
``upstream_url(path)``.
"""
from __future__ import annotations

import httpx

from app.core.config import settings

_client: httpx.AsyncClient | None = None


def get_client() -> httpx.AsyncClient:
global _client
if _client is None:
_client = httpx.AsyncClient(
timeout=httpx.Timeout(settings.bedrock_timeout, connect=10.0),
limits=httpx.Limits(max_connections=200, max_keepalive_connections=50),
)
return _client


def reset_client_for_testing() -> None:
"""Reset the singleton — only call this from test fixtures."""
global _client
if _client is not None:
# AsyncClient.aclose() is async; tests will close the loop after, so we
# null it here and let the GC clean up the underlying transport.
_client = None


def upstream_url(path: str) -> str:
"""Build a full upstream URL by appending ``path`` to ``OPENAI_BASE_URL``.

Avoids httpx's RFC 3986 path-replacement behaviour by always producing a
fully-qualified URL.

Examples:
OPENAI_BASE_URL=https://bedrock-mantle.us-west-2.api.aws/v1
upstream_url("/chat/completions") -> https://bedrock-mantle.us-west-2.api.aws/v1/chat/completions
upstream_url("models") -> https://bedrock-mantle.us-west-2.api.aws/v1/models
"""
base = settings.openai_base_url.rstrip("/")
if not path.startswith("/"):
path = "/" + path
return base + path


def upstream_headers(extra: dict[str, str] | None = None) -> dict[str, str]:
"""Build the Authorization + standard headers for an upstream call."""
headers = {
"Authorization": f"Bearer {settings.openai_api_key}",
"Content-Type": "application/json",
"User-Agent": "bedrock-api-proxy/openai-passthrough",
}
if extra:
headers.update(extra)
return headers
33 changes: 33 additions & 0 deletions app/api/openai_passthrough/model_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Model ID resolution for the OpenAI passthrough endpoints.

Looks up the client-supplied model in the existing model_mapping table; if a
mapping exists, substitute it. Otherwise, pass through unchanged so callers
can use Bedrock-native IDs (e.g. ``openai.gpt-oss-120b``) directly without
needing to register them.
"""
from __future__ import annotations

import logging

logger = logging.getLogger(__name__)


def resolve_model_id(model: str, model_mapping_manager) -> str:
"""Resolve a client-supplied model ID via the mapping table, with fallback.

Args:
model: The ``model`` field from the client request.
model_mapping_manager: An app.db.dynamodb.ModelMappingManager instance.

Returns:
The resolved Bedrock model ID, or the original string if no mapping
exists or the lookup fails.
"""
if not model:
return model
try:
mapped = model_mapping_manager.get_mapping(model)
except Exception as exc:
logger.warning("[OPENAI-PASSTHROUGH] model mapping lookup failed for %r: %s", model, exc)
return model
return mapped or model
239 changes: 239 additions & 0 deletions app/api/openai_passthrough/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
"""FastAPI routes for the OpenAI passthrough endpoints.

Mounted at /openai/v1 only when settings.enable_openai_passthrough is True.
"""
from __future__ import annotations

import logging
from typing import Any, cast
from uuid import uuid4

from fastapi import APIRouter, Depends, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse

from app.api.openai_passthrough.client import get_client, upstream_headers, upstream_url
from app.api.openai_passthrough.model_mapping import resolve_model_id
from app.api.openai_passthrough.streaming import (
UpstreamConnectionError,
open_upstream_stream,
stream_passthrough_response,
)
from app.api.openai_passthrough.usage_extractor import normalize_usage
from app.db.dynamodb import DynamoDBClient, ModelMappingManager, UsageTracker
from app.middleware.auth import get_api_key_info

logger = logging.getLogger(__name__)
router = APIRouter()

_ddb: DynamoDBClient | None = None
_mapping: ModelMappingManager | None = None
_usage: UsageTracker | None = None


def _managers() -> tuple[ModelMappingManager, UsageTracker]:
"""Lazily build DDB managers — keeps import-time side effects out of tests."""
global _ddb, _mapping, _usage
if _ddb is None or _mapping is None or _usage is None:
_ddb = DynamoDBClient()
_mapping = ModelMappingManager(_ddb)
_usage = UsageTracker(_ddb)
return _mapping, _usage


def _record_usage(api_key_info: dict[str, Any], raw_usage: dict[str, Any], model: str, api_surface: str) -> None:
_, usage = _managers()
norm = normalize_usage(raw_usage, api_surface)
try:
usage.record_usage(
api_key=api_key_info.get("api_key", ""),
request_id=str(uuid4()),
model=model,
input_tokens=norm["input_tokens"],
output_tokens=norm["output_tokens"],
cached_tokens=norm["cache_read_input_tokens"],
cache_write_input_tokens=norm["cache_creation_input_tokens"],
api_surface=api_surface,
reasoning_tokens=norm["reasoning_tokens"],
)
except Exception as exc:
logger.warning("[OPENAI-PASSTHROUGH] usage recording failed: %s", exc)


def _passthrough_extra_headers(request: Request) -> dict[str, str]:
"""Forward Bedrock-specific headers from the client to upstream (e.g. guardrails)."""
extra: dict[str, str] = {}
for name, value in request.headers.items():
if name.lower().startswith("x-amzn-bedrock-"):
extra[name] = value
return extra


@router.post("/chat/completions")
async def chat_completions(
request: Request,
api_key_info: dict[str, Any] = Depends(get_api_key_info),
):
body = await request.json()
mapping, _ = _managers()
body["model"] = resolve_model_id(body.get("model", ""), mapping)
extra = _passthrough_extra_headers(request)

if body.get("stream"):
try:
upstream_resp, error_body = await open_upstream_stream(
"POST", "/chat/completions", body, extra
)
except UpstreamConnectionError as exc:
return JSONResponse(
{"error": {"message": exc.message, "type": "upstream_error"}},
status_code=exc.status_code,
)
if error_body is not None:
return JSONResponse(
_decode_error_body(error_body),
status_code=upstream_resp.status_code,
)

async def on_complete(usage: dict[str, Any]) -> None:
_record_usage(api_key_info, usage, body["model"], "chat_completions")

return StreamingResponse(
stream_passthrough_response(upstream_resp, "chat_completions", on_complete),
media_type="text/event-stream",
)

resp = await get_client().post(
upstream_url("/chat/completions"), json=body, headers=upstream_headers(extra)
)
if resp.status_code >= 400:
return JSONResponse(_safe_json(resp), status_code=resp.status_code)

data = resp.json()
if isinstance(data, dict) and isinstance(data.get("usage"), dict):
_record_usage(api_key_info, data["usage"], body["model"], "chat_completions")
return JSONResponse(data, status_code=resp.status_code)


@router.post("/responses")
async def responses_create(
request: Request,
api_key_info: dict[str, Any] = Depends(get_api_key_info),
):
body = await request.json()
mapping, _ = _managers()
body["model"] = resolve_model_id(body.get("model", ""), mapping)
extra = _passthrough_extra_headers(request)

if body.get("stream"):
try:
upstream_resp, error_body = await open_upstream_stream(
"POST", "/responses", body, extra
)
except UpstreamConnectionError as exc:
return JSONResponse(
{"error": {"message": exc.message, "type": "upstream_error"}},
status_code=exc.status_code,
)
if error_body is not None:
return JSONResponse(
_decode_error_body(error_body),
status_code=upstream_resp.status_code,
)

async def on_complete(usage: dict[str, Any]) -> None:
_record_usage(api_key_info, usage, body["model"], "responses")

return StreamingResponse(
stream_passthrough_response(upstream_resp, "responses", on_complete),
media_type="text/event-stream",
)

resp = await get_client().post(
upstream_url("/responses"), json=body, headers=upstream_headers(extra)
)
if resp.status_code >= 400:
return JSONResponse(_safe_json(resp), status_code=resp.status_code)

data = resp.json()
if isinstance(data, dict) and isinstance(data.get("usage"), dict):
_record_usage(api_key_info, data["usage"], body["model"], "responses")
return JSONResponse(data, status_code=resp.status_code)


async def _passthrough_request(request: Request, path: str) -> Response:
"""Forward request to upstream and mirror the upstream response."""
extra = _passthrough_extra_headers(request)
body = None
if request.method in ("POST", "PUT", "PATCH"):
try:
body = await request.json()
except Exception:
body = None
resp = await get_client().request(
request.method, upstream_url(path), json=body, headers=upstream_headers(extra)
)
return Response(
content=resp.content,
status_code=resp.status_code,
media_type=resp.headers.get("content-type"),
)


@router.api_route("/responses/{response_id}", methods=["GET", "DELETE"])
async def responses_get_or_delete(
response_id: str,
request: Request,
_: dict[str, Any] = Depends(get_api_key_info),
):
return await _passthrough_request(request, f"/responses/{response_id}")


@router.post("/responses/{response_id}/cancel")
async def responses_cancel(
response_id: str,
request: Request,
_: dict[str, Any] = Depends(get_api_key_info),
):
return await _passthrough_request(request, f"/responses/{response_id}/cancel")


@router.get("/responses/{response_id}/input_items")
async def responses_input_items(
response_id: str,
request: Request,
_: dict[str, Any] = Depends(get_api_key_info),
):
return await _passthrough_request(request, f"/responses/{response_id}/input_items")


@router.get("/models")
async def list_models(
request: Request,
_: dict[str, Any] = Depends(get_api_key_info),
):
return await _passthrough_request(request, "/models")


def _safe_json(resp) -> dict[str, Any]:
try:
return cast(dict[str, Any], resp.json())
except ValueError:
return {"error": {"message": resp.text, "type": "upstream_error"}}


def _decode_error_body(body: bytes) -> dict[str, Any]:
"""Parse a non-2xx upstream body as JSON, falling back to a wrapped string."""
import json as _json

try:
decoded = _json.loads(body)
except (ValueError, TypeError):
return {
"error": {
"message": body.decode("utf-8", "replace"),
"type": "upstream_error",
}
}
if isinstance(decoded, dict):
return cast(dict[str, Any], decoded)
return {"error": {"message": str(decoded), "type": "upstream_error"}}
Loading