-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy path_server.py
More file actions
1306 lines (1116 loc) · 60.9 KB
/
_server.py
File metadata and controls
1306 lines (1116 loc) · 60.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) Microsoft. All rights reserved.
"""FastAPI server implementation."""
import asyncio
import importlib.metadata
import inspect
import json
import logging
import os
import secrets
import uuid
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Any, cast
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from ._deployment import DeploymentManager
from ._discovery import EntityDiscovery
from ._executor import AgentFrameworkExecutor
from ._mapper import MessageMapper
from ._openai import OpenAIExecutor
from .models import AgentFrameworkRequest, MetaResponse, OpenAIError
from .models._discovery_models import Deployment, DeploymentConfig, DiscoveryResponse, EntityInfo
logger = logging.getLogger(__name__)
# Get package version
# Resolve the version from the installed distribution's metadata so the API
# (FastAPI `version=` and the /meta endpoint) reports the real package version.
try:
    __version__ = importlib.metadata.version("agent-framework-devui")
except importlib.metadata.PackageNotFoundError:
    # No installed distribution metadata (e.g. running from a source checkout).
    __version__ = "0.0.0" # Fallback for development mode
# No AuthMiddleware class needed - we'll use the decorator pattern instead
class DevServer:
"""Development Server - OpenAI compatible API server for debugging agents."""
def __init__(
    self,
    entities_dir: str | None = None,
    port: int = 8080,
    host: str = "127.0.0.1",
    cors_origins: list[str] | None = None,
    ui_enabled: bool = True,
    mode: str = "developer",
) -> None:
    """Store server configuration and create empty runtime state.

    The executors are left unset here; they are created on first use by
    ``_ensure_executor`` and ``_ensure_openai_executor``.

    Args:
        entities_dir: Directory to scan for entities
        port: Port to run server on
        host: Host to bind server to
        cors_origins: Allowed CORS origins. When omitted, a default is derived
            from ``host`` (see below).
        ui_enabled: Whether to enable the UI
        mode: Server mode - 'developer' (full access, verbose errors) or
            'user' (restricted APIs, generic errors)
    """
    if cors_origins is None:
        # Default policy: a loopback bind is assumed to be local development,
        # where a separate frontend dev server needs cross-origin access.
        # Any other bind address gets no CORS origins (same-origin only).
        if host in ("127.0.0.1", "localhost"):
            cors_origins = ["*"]
        else:
            cors_origins = []

    # Static configuration
    self.entities_dir = entities_dir
    self.port = port
    self.host = host
    self.cors_origins = cors_origins
    self.ui_enabled = ui_enabled
    self.mode = mode

    # Collaborators (executors are filled in lazily)
    self.executor: AgentFrameworkExecutor | None = None
    self.openai_executor: OpenAIExecutor | None = None
    self.deployment_manager = DeploymentManager()

    # Runtime bookkeeping
    self._app: FastAPI | None = None
    self._pending_entities: list[Any] | None = None
    # response_id -> task, so a running streamed response can be cancelled
    self._running_tasks: dict[str, asyncio.Task[Any]] = {}
def _is_dev_mode(self) -> bool:
"""Check if running in developer mode.
Returns:
True if in developer mode, False if in user mode
"""
return self.mode == "developer"
def _format_error(self, error: Exception, context: str = "Operation") -> str:
"""Format error message based on server mode.
In developer mode: Returns detailed error message for debugging.
In user mode: Returns generic message and logs details internally.
Args:
error: The exception that occurred
context: Description of the operation that failed (e.g., "Request execution")
Returns:
Formatted error message appropriate for the current mode
"""
if self._is_dev_mode():
# Developer mode: Show full error details for debugging
return f"{context} failed: {error!s}"
# User mode: Generic message to user, detailed logging internally
logger.error(f"{context} failed: {error}", exc_info=True)
return f"{context} failed"
def _require_developer_mode(self, feature: str = "operation") -> None:
"""Check if current mode allows developer operations.
Args:
feature: Name of the feature being accessed (for error message)
Raises:
HTTPException: If in user mode
"""
if self.mode == "user":
logger.warning(f"Blocked {feature} access in user mode")
raise HTTPException(
status_code=403,
detail={
"error": {
"message": f"Access denied: {feature} requires developer mode",
"type": "permission_denied",
"code": "developer_mode_required",
"current_mode": self.mode,
}
},
)
async def _ensure_executor(self) -> AgentFrameworkExecutor:
"""Ensure executor is initialized."""
if self.executor is None:
logger.info("Initializing Agent Framework executor...")
# Create components directly
entity_discovery = EntityDiscovery(self.entities_dir)
message_mapper = MessageMapper()
self.executor = AgentFrameworkExecutor(entity_discovery, message_mapper)
# Discover entities from directory
discovered_entities = await self.executor.discover_entities()
logger.info(f"Discovered {len(discovered_entities)} entities from directory")
# Register any pending in-memory entities
if self._pending_entities:
discovery = self.executor.entity_discovery
for entity in self._pending_entities:
try:
entity_info = await discovery.create_entity_info_from_object(entity, source="in_memory")
discovery.register_entity(entity_info.id, entity_info, entity)
logger.info(f"Registered in-memory entity: {entity_info.id}")
except Exception as e:
logger.error(f"Failed to register in-memory entity: {e}")
self._pending_entities = None # Clear after registration
# Get the final entity count after all registration
all_entities = self.executor.entity_discovery.list_entities()
logger.info(f"Total entities available: {len(all_entities)}")
return self.executor
async def _ensure_openai_executor(self) -> OpenAIExecutor:
"""Ensure OpenAI executor is initialized.
Returns:
OpenAI executor instance
Raises:
ValueError: If OpenAI executor cannot be initialized
"""
if self.openai_executor is None:
# Initialize local executor first to get conversation_store
local_executor = await self._ensure_executor()
# Create OpenAI executor with shared conversation store
self.openai_executor = OpenAIExecutor(local_executor.conversation_store)
if self.openai_executor.is_configured:
logger.info("OpenAI proxy mode available (OPENAI_API_KEY configured)")
else:
logger.info("OpenAI proxy mode disabled (OPENAI_API_KEY not set)")
return self.openai_executor
async def _cleanup_entities(self) -> None:
"""Cleanup entity resources (close clients, MCP tools, credentials, etc.)."""
if not self.executor:
return
logger.info("Cleaning up entity resources...")
entities = self.executor.entity_discovery.list_entities()
closed_count = 0
mcp_tools_closed = 0
credentials_closed = 0
hook_count = 0
for entity_info in entities:
entity_id = entity_info.id
try:
# Step 1: Execute registered cleanup hooks (NEW)
cleanup_hooks = self.executor.entity_discovery.get_cleanup_hooks(entity_id)
for hook in cleanup_hooks:
try:
if inspect.iscoroutinefunction(hook):
await hook()
else:
hook()
hook_count += 1
logger.debug(f"✓ Executed cleanup hook for: {entity_id}")
except Exception as e:
logger.warning(f"⚠ Cleanup hook failed for {entity_id}: {e}")
# Step 2: Close chat clients and their credentials (EXISTING)
entity_obj = self.executor.entity_discovery.get_entity_object(entity_id)
if entity_obj and hasattr(entity_obj, "chat_client"):
client = entity_obj.chat_client
# Close the chat client itself
if hasattr(client, "close") and callable(client.close):
if inspect.iscoroutinefunction(client.close):
await client.close()
else:
client.close()
closed_count += 1
logger.debug(f"Closed client for entity: {entity_info.id}")
# Close credentials attached to chat clients (e.g., AzureCliCredential)
credential_attrs = ["credential", "async_credential", "_credential", "_async_credential"]
for attr in credential_attrs:
if hasattr(client, attr):
cred = getattr(client, attr)
if cred and hasattr(cred, "close") and callable(cred.close):
try:
if inspect.iscoroutinefunction(cred.close):
await cred.close()
else:
cred.close()
credentials_closed += 1
logger.debug(f"Closed credential for entity: {entity_info.id}")
except Exception as e:
logger.warning(f"Error closing credential for {entity_info.id}: {e}")
# Close MCP tools (framework tracks them in _local_mcp_tools)
if entity_obj and hasattr(entity_obj, "_local_mcp_tools"):
for mcp_tool in entity_obj._local_mcp_tools:
if hasattr(mcp_tool, "close") and callable(mcp_tool.close):
try:
if inspect.iscoroutinefunction(mcp_tool.close):
await mcp_tool.close()
else:
mcp_tool.close()
mcp_tools_closed += 1
tool_name = getattr(mcp_tool, "name", "unknown")
logger.debug(f"Closed MCP tool '{tool_name}' for entity: {entity_info.id}")
except Exception as e:
logger.warning(f"Error closing MCP tool for {entity_info.id}: {e}")
except Exception as e:
logger.warning(f"Error cleaning up entity {entity_id}: {e}")
if hook_count > 0:
logger.info(f"✓ Executed {hook_count} cleanup hook(s)")
if closed_count > 0:
logger.info(f"✓ Closed {closed_count} entity client(s)")
if credentials_closed > 0:
logger.info(f"✓ Closed {credentials_closed} credential(s)")
if mcp_tools_closed > 0:
logger.info(f"✓ Closed {mcp_tools_closed} MCP tool(s)")
# Close OpenAI executor if it exists
if self.openai_executor:
try:
await self.openai_executor.close()
logger.info("Closed OpenAI executor")
except Exception as e:
logger.warning(f"Error closing OpenAI executor: {e}")
def create_app(self) -> FastAPI:
    """Create and configure the FastAPI application.

    Wires up, in order: the lifespan handler (executor startup / entity
    cleanup), CORS middleware, optional Bearer-token authentication (enabled
    when the ``DEVUI_AUTH_TOKEN`` environment variable is non-empty), the API
    routes, and the UI mount.

    Returns:
        The configured FastAPI application.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        # Startup
        logger.info("Starting Agent Framework Server")
        await self._ensure_executor()
        await self._ensure_openai_executor()  # Initialize OpenAI executor
        try:
            yield
        finally:
            # Shutdown: in `finally` so resources are released even when the
            # context is exited through an exception.
            logger.info("Shutting down Agent Framework Server")
            if self.executor:
                await self._cleanup_entities()

    app = FastAPI(
        title="Agent Framework Server",
        description="OpenAI-compatible API server for Agent Framework and other AI frameworks",
        version=__version__,
        lifespan=lifespan,
    )

    # CORS: credentials must not be combined with a wildcard origin. Test for
    # membership rather than equality with ["*"], so a list such as
    # ["*", "http://localhost:5173"] does not enable credentials for everyone.
    allow_credentials = "*" not in self.cors_origins
    app.add_middleware(
        CORSMiddleware,
        allow_origins=self.cors_origins,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Authentication is enabled by the presence of DEVUI_AUTH_TOKEN.
    auth_token = os.getenv("DEVUI_AUTH_TOKEN", "")
    if auth_token:
        logger.info("Authentication middleware enabled")
        expected_token = auth_token.encode("utf-8")

        def _token_matches(presented: str) -> bool:
            """Compare the presented token with the configured one in constant time.

            The comparison is done on bytes: ``secrets.compare_digest`` raises
            ``TypeError`` for ``str`` arguments containing non-ASCII
            characters, which would turn a malformed header into a 500
            instead of a 401. Header text is encoded as latin-1 to get the
            transmitted bytes back (assumes the framework decodes header
            values as latin-1 — TODO confirm against the Starlette version in use).
            """
            try:
                presented_bytes = presented.encode("latin-1")
            except UnicodeEncodeError:
                return False
            return secrets.compare_digest(presented_bytes, expected_token)

        @app.middleware("http")
        async def auth_middleware(request: Request, call_next: Callable[[Request], Awaitable[Any]]) -> Any:
            """Validate Bearer token authentication.

            Skips authentication for health, meta, static UI endpoints, and OPTIONS requests.
            """
            # Skip auth for OPTIONS (CORS preflight) requests
            if request.method == "OPTIONS":
                return await call_next(request)

            # Skip auth for health checks, meta endpoint, and static files
            if request.url.path in ["/health", "/meta", "/"] or request.url.path.startswith("/assets"):
                return await call_next(request)

            # Check Authorization header
            auth_header = request.headers.get("Authorization")
            if not auth_header or not auth_header.startswith("Bearer "):
                return JSONResponse(
                    status_code=401,
                    content={
                        "error": {
                            "message": (
                                "Missing or invalid Authorization header. Expected: Authorization: Bearer <token>"
                            ),
                            "type": "authentication_error",
                            "code": "missing_token",
                        }
                    },
                )

            # Extract and validate token
            token = auth_header.removeprefix("Bearer ").strip()
            if not _token_matches(token):
                return JSONResponse(
                    status_code=401,
                    content={
                        "error": {
                            "message": "Invalid authentication token",
                            "type": "authentication_error",
                            "code": "invalid_token",
                        }
                    },
                )

            # Token valid, proceed
            return await call_next(request)

    self._register_routes(app)
    self._mount_ui(app)
    return app
def _register_routes(self, app: FastAPI) -> None:
"""Register API routes."""
@app.get("/health")
async def health_check() -> dict[str, Any]:
    """Report liveness together with the number of registered entities."""
    discovery = (await self._ensure_executor()).entity_discovery
    # list_entities() only reads the registry; it does not re-run discovery.
    registered = discovery.list_entities()
    return {
        "status": "healthy",
        "entities_count": len(registered),
        "framework": "agent_framework",
    }
@app.get("/meta", response_model=MetaResponse)
async def get_meta() -> MetaResponse:
    """Return server metadata: mode, version, and available capabilities."""
    # The OpenAI executor must exist before its configuration can be reported.
    proxy = await self._ensure_openai_executor()

    capabilities = {
        "tracing": os.getenv("ENABLE_OTEL") == "true",
        "openai_proxy": proxy.is_configured,
        "deployment": True,  # Deployment feature is available
    }
    token_configured = bool(os.getenv("DEVUI_AUTH_TOKEN"))

    return MetaResponse(
        ui_mode=self.mode,  # type: ignore[arg-type]
        version=__version__,
        framework="agent_framework",
        runtime="python",  # Python DevUI backend
        capabilities=capabilities,
        auth_required=token_configured,
    )
@app.get("/v1/entities", response_model=DiscoveryResponse)
async def discover_entities() -> DiscoveryResponse:
    """List all registered entities.

    Raises:
        HTTPException: 500 when the listing fails. The detail text comes from
            ``_format_error``, so user mode gets a generic message (details
            are logged server-side) and developer mode gets the exception
            text — the same policy the other handlers in this file apply.
    """
    try:
        executor = await self._ensure_executor()
        # list_entities() returns what is already registered; it does not
        # trigger a new discovery pass.
        entities = executor.entity_discovery.list_entities()
        return DiscoveryResponse(entities=entities)
    except Exception as e:
        error_msg = self._format_error(e, "Entity listing")
        raise HTTPException(status_code=500, detail=error_msg) from e
@app.get("/v1/entities/{entity_id}/info", response_model=EntityInfo)
async def get_entity_info(entity_id: str) -> EntityInfo:
    """Get detailed information about a specific entity (triggers lazy loading).

    For entities of type "workflow" the returned info is a copy enriched with
    a structure dump, the input schema / type name, the start executor id and
    (when available) the executor list. Other entities are returned as-is.

    Raises:
        HTTPException: 404 if the entity is unknown, 422 if it exists but
            cannot be loaded, 500 for any other failure.
    """
    try:
        executor = await self._ensure_executor()
        entity_info = executor.get_entity_info(entity_id)
        if not entity_info:
            raise HTTPException(status_code=404, detail=f"Entity {entity_id} not found")
        # Trigger lazy loading if entity not yet loaded
        # This will import the module and enrich metadata
        # Pass checkpoint_manager to ensure workflows get checkpoint storage injected
        entity_obj = await executor.entity_discovery.load_entity(
            entity_id, checkpoint_manager=executor.checkpoint_manager
        )
        # Get updated entity info (may have been enriched during load)
        entity_info = executor.get_entity_info(entity_id) or entity_info
        # For workflows, populate additional detailed information
        if entity_info.type == "workflow" and entity_obj:
            # Entity object already loaded by load_entity() above
            # Get workflow structure. Sources are tried in order of preference:
            # to_dict(), then to_json(), then the object's public attributes.
            workflow_dump = None
            if hasattr(entity_obj, "to_dict") and callable(getattr(entity_obj, "to_dict", None)):
                try:
                    workflow_dump = entity_obj.to_dict() # type: ignore[attr-defined]
                except Exception:
                    # Best effort: a failing dump leaves workflow_dump empty.
                    workflow_dump = None
            elif hasattr(entity_obj, "to_json") and callable(getattr(entity_obj, "to_json", None)):
                try:
                    raw_dump = entity_obj.to_json() # type: ignore[attr-defined]
                except Exception:
                    workflow_dump = None
                else:
                    # Normalize bytes to text; fall back to a lossy decode
                    # rather than failing the whole request.
                    if isinstance(raw_dump, (bytes, bytearray)):
                        try:
                            raw_dump = raw_dump.decode()
                        except Exception:
                            raw_dump = raw_dump.decode(errors="replace")
                    if isinstance(raw_dump, str):
                        try:
                            parsed_dump = json.loads(raw_dump)
                        except Exception:
                            # Not valid JSON: keep the raw text.
                            workflow_dump = raw_dump
                        else:
                            # Only a JSON object is used in parsed form;
                            # anything else falls back to the raw text.
                            workflow_dump = parsed_dump if isinstance(parsed_dump, dict) else raw_dump
                    else:
                        workflow_dump = raw_dump
            elif hasattr(entity_obj, "__dict__"):
                # Last resort: public (non-underscore) instance attributes.
                workflow_dump = {k: v for k, v in entity_obj.__dict__.items() if not k.startswith("_")}
            # Get input schema information
            input_schema = {}
            input_type_name = "Unknown"
            start_executor_id = ""
            try:
                from ._utils import (
                    extract_executor_message_types,
                    generate_input_schema,
                    select_primary_input_type,
                )
                start_executor = entity_obj.get_start_executor()
            except Exception as e:
                logger.debug(f"Could not extract input info for workflow {entity_id}: {e}")
            else:
                if start_executor:
                    start_executor_id = getattr(start_executor, "executor_id", "") or getattr(
                        start_executor, "id", ""
                    )
                    message_types = extract_executor_message_types(start_executor)
                    input_type = select_primary_input_type(message_types)
                    if input_type:
                        input_type_name = getattr(input_type, "__name__", str(input_type))
                        # Generate schema using comprehensive schema generation
                        input_schema = generate_input_schema(input_type)
            # Default to a plain string input when no schema could be derived.
            # NOTE(review): nesting of the two checks below was reconstructed
            # from flattened source — confirm against the repository.
            if not input_schema:
                input_schema = {"type": "string"}
                if input_type_name == "Unknown":
                    input_type_name = "string"
            # Get executor list
            executor_list = []
            if hasattr(entity_obj, "executors") and entity_obj.executors:
                executor_list = [getattr(ex, "executor_id", str(ex)) for ex in entity_obj.executors]
            # Create copy of entity info and populate workflow-specific fields
            # Note: DevUI provides runtime checkpoint storage for ALL workflows via conversations
            update_payload: dict[str, Any] = {
                "workflow_dump": workflow_dump,
                "input_schema": input_schema,
                "input_type_name": input_type_name,
                "start_executor_id": start_executor_id,
            }
            # "executors" is only overridden when a non-empty list was found.
            if executor_list:
                update_payload["executors"] = executor_list
            return entity_info.model_copy(update=update_payload)
        # For non-workflow entities, return as-is
        return entity_info
    except HTTPException:
        # Already a well-formed HTTP error (e.g. the 404 above); pass through.
        raise
    except ValueError as e:
        # ValueError from load_entity - could be "not found" or "failed to load"
        error_str = str(e)
        error_msg = self._format_error(e, "Entity loading")
        # Use 404 for "not found", 422 for load failures (entity exists but can't load)
        if "not found" in error_str.lower() and "failed to load" not in error_str.lower():
            raise HTTPException(status_code=404, detail=error_msg) from e
        # Entity exists but failed to load (e.g., missing env vars, import errors)
        raise HTTPException(status_code=422, detail=error_msg) from e
    except Exception as e:
        error_msg = self._format_error(e, "Entity info retrieval")
        raise HTTPException(status_code=500, detail=error_msg) from e
@app.post("/v1/entities/{entity_id}/reload")
async def reload_entity(entity_id: str) -> dict[str, Any]:
    """Invalidate an entity's cached state so it is re-imported on next access.

    Intended for hot reload during development: edit the entity code, call
    this endpoint, and the next execution picks up the change without a
    server restart. Developer mode only; in-memory entities are rejected.
    """
    self._require_developer_mode("entity hot reload")

    try:
        executor = await self._ensure_executor()

        known = executor.get_entity_info(entity_id)
        if not known:
            raise HTTPException(status_code=404, detail=f"Entity {entity_id} not found")

        # In-memory entities cannot be reloaded.
        if known.source == "in_memory":
            raise HTTPException(
                status_code=400,
                detail="In-memory entities cannot be reloaded. "
                "They only exist in memory and have no source files to reload from.",
            )

        executor.entity_discovery.invalidate_entity(entity_id)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error reloading entity {entity_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to reload entity: {e!s}") from e

    return {
        "success": True,
        "message": f"Entity '{entity_id}' cache cleared. Will reload on next access.",
    }
# ============================================================================
# Deployment Endpoints
# ============================================================================
@app.post("/v1/deployments")
async def create_deployment(config: DeploymentConfig) -> StreamingResponse:
    """Deploy an entity and stream progress as Server-Sent Events.

    Developer mode only. The entity must exist, support deployment, and have
    a ``path`` entry in its metadata; otherwise a 404/400 is raised before
    any streaming starts.
    """
    self._require_developer_mode("deployment")

    try:
        from pathlib import Path

        executor = await self._ensure_executor()

        # Validate the target before starting the stream
        target = executor.get_entity_info(config.entity_id)
        if not target:
            raise HTTPException(status_code=404, detail=f"Entity {config.entity_id} not found")
        if not target.deployment_supported:
            raise HTTPException(
                status_code=400,
                detail=target.deployment_reason or "Deployment not supported for this entity",
            )

        entity_path_str = target.metadata.get("path")
        if not entity_path_str:
            raise HTTPException(
                status_code=400,
                detail="Entity path not found in metadata (in-memory entities cannot be deployed)",
            )
        entity_path = Path(entity_path_str)

        async def event_generator() -> AsyncGenerator[str, None]:
            # One SSE "data:" frame per deployment event
            async for event in self.deployment_manager.deploy(config, entity_path):
                payload = json.dumps(event.model_dump())
                yield f"data: {payload}\n\n"

        return StreamingResponse(event_generator(), media_type="text/event-stream")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=self._format_error(e, "Deployment creation")
        ) from e
@app.get("/v1/deployments")
async def list_deployments(entity_id: str | None = None) -> list[Deployment]:
    """Return deployments, optionally filtered by entity. Developer mode only."""
    self._require_developer_mode("deployment listing")

    try:
        deployments = await self.deployment_manager.list_deployments(entity_id)
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=self._format_error(e, "Deployment listing")
        ) from e
    return deployments
@app.get("/v1/deployments/{deployment_id}")
async def get_deployment(deployment_id: str) -> Deployment:
    """Look up one deployment by ID. Developer mode only; 404 when unknown."""
    self._require_developer_mode("deployment details")

    try:
        found = await self.deployment_manager.get_deployment(deployment_id)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting deployment: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get deployment: {e!s}") from e

    if not found:
        raise HTTPException(status_code=404, detail=f"Deployment {deployment_id} not found")
    return found
@app.delete("/v1/deployments/{deployment_id}")
async def delete_deployment(deployment_id: str) -> dict[str, Any]:
    """Delete a deployment. Developer mode only.

    A ``ValueError`` from the deployment manager is reported as 404; any
    other failure as 500.
    """
    self._require_developer_mode("deployment deletion")

    try:
        await self.deployment_manager.delete_deployment(deployment_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error deleting deployment: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to delete deployment: {e!s}") from e

    return {"success": True, "message": f"Deployment {deployment_id} deleted successfully"}
# Convenience endpoint: deploy specific entity
@app.post("/v1/entities/{entity_id}/deploy")
async def deploy_entity(entity_id: str, config: DeploymentConfig) -> StreamingResponse:
    """Deploy the entity named in the URL (shortcut for POST /v1/deployments)."""
    self._require_developer_mode("deployment")

    # The path parameter wins over any entity_id supplied in the body.
    config.entity_id = entity_id
    stream = await create_deployment(config)
    return stream
# ============================================================================
# Response/Conversation Endpoints
# ============================================================================
@app.post("/v1/responses")
async def create_response(request: AgentFrameworkRequest, raw_request: Request) -> Any:
    """OpenAI Responses API endpoint - routes to local or OpenAI executor.

    Routing: when the ``X-Proxy-Backend: openai`` header is present the
    request goes to the OpenAI executor; otherwise it is executed by the
    local executor against the entity named by the request's entity id.
    Streaming requests return a ``text/event-stream`` response.

    Errors are returned as OpenAI-style JSON bodies: 400 (missing entity id),
    404 (unknown entity), 503 (proxy not configured), 500 (anything else,
    text shaped by ``_format_error``).
    """
    # Headers shared by both streaming paths. No Access-Control-Allow-Origin
    # here: a hard-coded "*" would contradict the origins configured on the
    # CORS middleware installed by create_app.
    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}

    try:
        # Check if frontend requested OpenAI proxy mode
        proxy_mode = raw_request.headers.get("X-Proxy-Backend")
        if proxy_mode == "openai":
            logger.info("🔀 Routing to OpenAI proxy mode")
            openai_executor = await self._ensure_openai_executor()
            if not openai_executor.is_configured:
                error = OpenAIError.create(
                    "OpenAI proxy mode not configured. Set OPENAI_API_KEY environment variable."
                )
                return JSONResponse(status_code=503, content=error.to_dict())

            # Execute via OpenAI with dedicated streaming method
            if request.stream:
                return StreamingResponse(
                    self._stream_openai_execution(openai_executor, request),
                    media_type="text/event-stream",
                    headers=dict(sse_headers),
                )
            return await openai_executor.execute_sync(request)

        # Route to local Agent Framework executor (original behavior).
        # The raw body carries user content, so it is logged at DEBUG only
        # and decoded leniently: a logging aid must never fail the request.
        if logger.isEnabledFor(logging.DEBUG):
            raw_body = await raw_request.body()
            logger.debug(f"Raw request body: {raw_body.decode(errors='replace')}")
        logger.info(f"Parsed request: metadata={request.metadata}")

        # Get entity_id from metadata
        entity_id = request.get_entity_id()
        logger.info(f"Extracted entity_id: {entity_id}")
        if not entity_id:
            error = OpenAIError.create("Missing entity_id in metadata. Provide metadata.entity_id in request.")
            return JSONResponse(status_code=400, content=error.to_dict())

        # Get executor and validate entity exists. Both "lookup raised" and
        # "lookup returned nothing" are treated as not found, explicitly,
        # instead of relying on an AttributeError from a None result.
        executor = await self._ensure_executor()
        try:
            entity_info = executor.get_entity_info(entity_id)
        except Exception:
            entity_info = None
        if not entity_info:
            error = OpenAIError.create(f"Entity not found: {entity_id}")
            return JSONResponse(status_code=404, content=error.to_dict())
        logger.info(f"Found entity: {entity_info.name} ({entity_info.type})")

        # Execute request
        if request.stream:
            # Generate response ID for tracking
            response_id = f"resp_{uuid.uuid4().hex[:8]}"
            logger.info(f"[CANCELLATION] Creating response {response_id} for entity {entity_id}")
            return StreamingResponse(
                self._stream_with_cancellation(executor, request, response_id),
                media_type="text/event-stream",
                headers={
                    **sse_headers,
                    "X-Response-ID": response_id,  # Include ID for debugging/tracking
                },
            )
        return await executor.execute_sync(request)
    except Exception as e:
        error_msg = self._format_error(e, "Request execution")
        error = OpenAIError.create(error_msg)
        return JSONResponse(status_code=500, content=error.to_dict())
@app.post("/v1/responses/{response_id}/cancel")
async def cancel_response(response_id: str) -> dict[str, Any]:
    """Explicitly cancel a running streamed response.

    Looks the task up in ``self._running_tasks`` and reports one of three
    statuses: ``not_found``, ``already_completed`` or ``cancelled``.
    """
    logger.info(f"[CANCELLATION] Cancel request received for {response_id}")

    running = self._running_tasks.get(response_id)
    if not running:
        logger.warning(f"[CANCELLATION] No task found for {response_id}")
        return {"status": "not_found", "response_id": response_id}

    if running.done():
        logger.warning(f"[CANCELLATION] Task already completed for {response_id}")
        return {"status": "already_completed", "response_id": response_id}

    logger.info(f"[CANCELLATION] Cancelling task for {response_id}")
    running.cancel()
    # Give the cancellation a short window to propagate; both a cancelled
    # task and an elapsed timeout are expected outcomes here.
    try:  # noqa: SIM105
        await asyncio.wait_for(running, timeout=0.5)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    return {"status": "cancelled", "response_id": response_id}
# ========================================
# OpenAI Conversations API (Standard)
# ========================================
@app.post("/v1/conversations", response_model=None)
async def create_conversation(raw_request: Request) -> dict[str, Any] | JSONResponse:
    """Create a new conversation, either upstream in OpenAI or in the local store.

    The ``X-Proxy-Backend`` request header selects the backend: the value
    ``"openai"`` creates the conversation through the OpenAI client, anything
    else (including a missing header) uses the executor's conversation store.

    Args:
        raw_request: Incoming request. Its JSON body may contain ``metadata``,
            which is forwarded to whichever backend creates the conversation.

    Returns:
        The created conversation as a dict, or a ``JSONResponse`` carrying an
        OpenAI-style error payload:

        - 503 when proxy mode is requested but not configured,
        - 401 / 403 / the upstream status code for OpenAI API errors,
        - 500 for any other unexpected failure.

    Raises:
        HTTPException: Re-raised unchanged when raised inside the handler.
    """

    def _upstream_error_response(
        exc: Exception, status_code: int, default_type: str, default_code: str
    ) -> JSONResponse:
        """Translate an OpenAI SDK error into an OpenAI-style JSON error response."""
        body = getattr(exc, "body", None)
        details = body.get("error") if isinstance(body, dict) else None
        # The upstream payload is not guaranteed to be a mapping (it can be
        # null or a bare string); fall back to the defaults instead of
        # failing with AttributeError and masking the real status code.
        if not isinstance(details, dict):
            details = {}
        error = OpenAIError.create(
            message=details.get("message", str(exc)),
            type=details.get("type", default_type),
            code=details.get("code", default_code),
        )
        return JSONResponse(status_code=status_code, content=error.to_dict())

    try:
        # Parse request body
        request_data = await raw_request.json()

        # Check if frontend requested OpenAI proxy mode
        if raw_request.headers.get("X-Proxy-Backend") == "openai":
            openai_executor = await self._ensure_openai_executor()
            if not openai_executor.is_configured:
                error = OpenAIError.create(
                    "OpenAI proxy mode not configured. Set OPENAI_API_KEY environment variable.",
                    type="configuration_error",
                    code="openai_not_configured",
                )
                return JSONResponse(status_code=503, content=error.to_dict())

            # Imported lazily so the dependency is only needed in proxy mode.
            from openai import APIStatusError, AsyncOpenAI, AuthenticationError, PermissionDeniedError

            try:
                # The client is created per request; the context manager
                # closes its underlying HTTP connections when we are done.
                async with AsyncOpenAI(
                    api_key=openai_executor.api_key,
                    base_url=openai_executor.base_url,
                ) as client:
                    metadata = request_data.get("metadata")
                    logger.debug(f"Creating OpenAI conversation with metadata: {metadata}")
                    conversation = await client.conversations.create(metadata=metadata)
                logger.info(f"Created OpenAI conversation: {conversation.id}")
                return conversation.model_dump()
            except AuthenticationError as e:
                # 401 - Invalid API key or authentication issue
                logger.error(f"OpenAI authentication error creating conversation: {e}")
                return _upstream_error_response(e, 401, "authentication_error", "invalid_api_key")
            except PermissionDeniedError as e:
                # 403 - Permission denied
                logger.error(f"OpenAI permission denied creating conversation: {e}")
                return _upstream_error_response(e, 403, "permission_denied", "insufficient_permissions")
            except APIStatusError as e:
                # Other OpenAI API errors (rate limit, etc.)
                logger.error(f"OpenAI API error creating conversation: {e}")
                return _upstream_error_response(
                    e, getattr(e, "status_code", 500), "api_error", "unknown_error"
                )

        # Local mode - use DevUI conversation store
        metadata = request_data.get("metadata")
        executor = await self._ensure_executor()
        conversation = executor.conversation_store.create_conversation(metadata=metadata)
        return conversation.model_dump()
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating conversation: {e}", exc_info=True)
        error = OpenAIError.create(f"Failed to create conversation: {e!s}")
        return JSONResponse(status_code=500, content=error.to_dict())
@app.get("/v1/conversations")
async def list_conversations(
    agent_id: str | None = None,
    entity_id: str | None = None,
    type: str | None = None,
) -> dict[str, Any]:
    """Return conversations from the executor's store, narrowed by optional filters.

    Query Parameters:
        - agent_id: Filter by agent_id (for agent conversations)
        - entity_id: Filter by entity_id (for workflow sessions or other entities)
        - type: Filter by conversation type (e.g., "workflow_session")

    Every supplied (truthy) parameter is passed to the store as a filter
    criterion; omitted or empty parameters are not. The response is an
    OpenAI-style list envelope whose ``has_more`` is always ``False``.

    Raises:
        HTTPException: 500 if listing fails for any unexpected reason.
    """
    try:
        store = (await self._ensure_executor()).conversation_store

        # Keep only the query parameters that were actually provided.
        candidates = (("agent_id", agent_id), ("entity_id", entity_id), ("type", type))
        criteria = {key: value for key, value in candidates if value}

        matches = await store.list_conversations_by_metadata(criteria)
        payload = [match.model_dump() for match in matches]
        return {"object": "list", "data": payload, "has_more": False}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing conversations: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to list conversations: {e!s}") from e
@app.get("/v1/conversations/{conversation_id}")
async def retrieve_conversation(conversation_id: str) -> dict[str, Any]:
    """Fetch a single conversation by id (OpenAI-standard endpoint).

    Returns the conversation serialized with ``model_dump()``.

    Raises:
        HTTPException: 404 when the store returns nothing for the id,
            500 when the lookup fails for any unexpected reason.
    """
    try:
        store = (await self._ensure_executor()).conversation_store
        found = store.get_conversation(conversation_id)
        if found:
            return found.model_dump()
        raise HTTPException(status_code=404, detail="Conversation not found")
    except HTTPException:
        # Let the 404 above (and any other HTTP error) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting conversation {conversation_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get conversation: {e!s}") from e
@app.post("/v1/conversations/{conversation_id}")
async def update_conversation(conversation_id: str, request_data: dict[str, Any]) -> dict[str, Any]:
    """Replace a conversation's metadata (OpenAI-standard endpoint).

    The ``metadata`` entry of the request body (an empty dict when absent) is
    handed to the store's ``update_conversation``; the updated conversation is
    returned serialized with ``model_dump()``.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised while updating,
            500 for any other unexpected failure.
    """
    try:
        store = (await self._ensure_executor()).conversation_store
        updated = store.update_conversation(
            conversation_id,
            metadata=request_data.get("metadata", {}),
        )
        return updated.model_dump()
    except ValueError as e:
        # A ValueError from the update is reported to the client as "not found".
        raise HTTPException(status_code=404, detail=str(e)) from e
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating conversation {conversation_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to update conversation: {e!s}") from e
@app.delete("/v1/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str) -> dict[str, Any]:
    """Remove a conversation from the store (OpenAI-standard endpoint).

    Returns whatever the store's ``delete_conversation`` yields, serialized
    with ``model_dump()``.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised while deleting,
            500 for any other unexpected failure.
    """
    try:
        store = (await self._ensure_executor()).conversation_store
        outcome = store.delete_conversation(conversation_id)
        return outcome.model_dump()
    except ValueError as e:
        # A ValueError from the delete is reported to the client as "not found".
        raise HTTPException(status_code=404, detail=str(e)) from e
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting conversation {conversation_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to delete conversation: {e!s}") from e
@app.post("/v1/conversations/{conversation_id}/items")
async def create_conversation_items(conversation_id: str, request_data: dict[str, Any]) -> dict[str, Any]:
    """Append items to an existing conversation (OpenAI-standard endpoint).

    The ``items`` entry of the request body (an empty list when absent) is
    passed to the store's ``add_items``. The items it returns are serialized
    with ``model_dump()`` and wrapped in an OpenAI-style list envelope.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised while adding,
            500 for any other unexpected failure.
    """
    try:
        store = (await self._ensure_executor()).conversation_store
        added = await store.add_items(conversation_id, items=request_data.get("items", []))
        serialized = [entry.model_dump() for entry in added]
        return {"object": "list", "data": serialized}
    except ValueError as e:
        # A ValueError from the store is reported to the client as "not found".
        raise HTTPException(status_code=404, detail=str(e)) from e
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error adding items to conversation {conversation_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to add items: {e!s}") from e
@app.get("/v1/conversations/{conversation_id}/items")
async def list_conversation_items(
conversation_id: str, limit: int = 100, after: str | None = None, order: str = "asc"
) -> dict[str, Any]:
"""List conversation items - OpenAI standard."""
try:
executor = await self._ensure_executor()
items, has_more = await executor.conversation_store.list_items(
conversation_id, limit=limit, after=after, order=order
)
# Handle both Pydantic models and dicts (some stores return raw dicts)
serialized_items = []
for item in items:
if hasattr(item, "model_dump"):
serialized_items.append(item.model_dump())
elif isinstance(item, dict):
serialized_items.append(item)
else:
logger.warning(f"Unexpected item type: {type(item)}, converting to dict")