Skip to content

Commit 87e11f4

Browse files
SWE Destroyerclaude
andcommitted
Use port constants in core SDK files and fix formatting
- Import DEFAULT_ACP_PORT in deploy_handlers.py and base_acp_server.py - Fix import sorting and formatting (ruff auto-fix) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cd5bb16 commit 87e11f4

File tree

4 files changed

+59
-71
lines changed

4 files changed

+59
-71
lines changed

src/agentex/lib/cli/handlers/deploy_handlers.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
from rich.console import Console
1212

1313
from agentex.lib.utils.logging import make_logger
14+
from agentex.lib.constants.ports import DEFAULT_ACP_PORT
1415
from agentex.lib.cli.utils.exceptions import HelmError, DeploymentError
1516
from agentex.lib.cli.utils.path_utils import PathResolutionError, calculate_docker_acp_module
16-
from agentex.lib.constants.ports import DEFAULT_ACP_PORT
1717
from agentex.lib.environment_variables import EnvVarKeys
1818
from agentex.lib.cli.utils.kubectl_utils import check_and_switch_cluster_context
1919
from agentex.lib.sdk.config.agent_config import AgentConfig
@@ -240,7 +240,14 @@ def add_acp_command_to_helm_values(helm_values: dict[str, Any], manifest: AgentM
240240
try:
241241
docker_acp_module = calculate_docker_acp_module(manifest, manifest_path)
242242
# Create the uvicorn command with the correct module path
243-
helm_values["command"] = ["uvicorn", f"{docker_acp_module}:acp", "--host", "0.0.0.0", "--port", str(DEFAULT_ACP_PORT)]
243+
helm_values["command"] = [
244+
"uvicorn",
245+
f"{docker_acp_module}:acp",
246+
"--host",
247+
"0.0.0.0",
248+
"--port",
249+
str(DEFAULT_ACP_PORT),
250+
]
244251
logger.info(f"Using dynamic ACP command: uvicorn {docker_acp_module}:acp")
245252
except (PathResolutionError, Exception) as e:
246253
# Fallback to default command structure

src/agentex/lib/cli/handlers/run_handlers.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
get_file_paths,
1616
calculate_uvicorn_target_for_local,
1717
)
18-
from agentex.lib.constants.ports import DEFAULT_AGENTEX_BASE_URL
1918
from agentex.lib.environment_variables import EnvVarKeys
2019
from agentex.lib.sdk.config.agent_manifest import AgentManifest
2120
from agentex.lib.cli.handlers.cleanup_handlers import cleanup_agent_workflows, should_cleanup_on_restart
@@ -95,16 +94,16 @@ async def start_temporal_worker_with_reload(
9594
worker_process = await start_temporal_worker(worker_path, env, manifest_dir)
9695
process_manager.add_process(worker_process)
9796
return asyncio.create_task(stream_process_output(worker_process, "WORKER"))
98-
97+
9998
async def worker_runner() -> None:
10099
current_process: asyncio.subprocess.Process | None = None
101100
output_task: asyncio.Task[None] | None = None
102-
101+
103102
console.print(f"[blue]Starting Temporal worker with auto-reload from {worker_path}...[/blue]")
104-
103+
105104
async def start_worker() -> asyncio.subprocess.Process:
106105
nonlocal current_process, output_task
107-
106+
108107
# PRE-RESTART CLEANUP - NEW!
109108
if current_process is not None:
110109
# Extract agent name from worker path for cleanup
@@ -113,7 +112,7 @@ async def start_worker() -> asyncio.subprocess.Process:
113112
console.print(f"FOUND AGENT_NAME FROM ENV VARS: {agent_name} {agent_name is None}")
114113
if agent_name is None:
115114
agent_name = worker_path.parent.parent.name
116-
115+
117116
# Perform cleanup if configured
118117
if should_cleanup_on_restart():
119118
console.print("[yellow]Cleaning up workflows before worker restart...[/yellow]")
@@ -122,7 +121,7 @@ async def start_worker() -> asyncio.subprocess.Process:
122121
except Exception as e:
123122
logger.warning(f"Cleanup failed: {e}")
124123
console.print(f"[yellow]⚠ Cleanup failed: {str(e)}[/yellow]")
125-
124+
126125
# Clean up previous process
127126
if current_process and current_process.returncode is None:
128127
current_process.terminate()
@@ -131,41 +130,41 @@ async def start_worker() -> asyncio.subprocess.Process:
131130
except asyncio.TimeoutError:
132131
current_process.kill()
133132
await current_process.wait()
134-
133+
135134
# Cancel previous output task
136135
if output_task:
137136
output_task.cancel()
138137
try:
139138
await output_task
140139
except asyncio.CancelledError:
141140
pass
142-
141+
143142
current_process = await start_temporal_worker(worker_path, env, manifest_dir)
144143
process_manager.add_process(current_process)
145144
console.print("[green]Temporal worker started[/green]")
146145
return current_process
147-
146+
148147
try:
149148
# Start initial worker
150149
current_process = await start_worker()
151150
if current_process:
152151
output_task = asyncio.create_task(stream_process_output(current_process, "WORKER"))
153-
152+
154153
# Watch for file changes
155154
async for changes in awatch(manifest_dir, recursive=True):
156155
# Filter for Python files
157-
py_changes = [(change, path) for change, path in changes if str(path).endswith('.py')]
158-
156+
py_changes = [(change, path) for change, path in changes if str(path).endswith(".py")]
157+
159158
if py_changes:
160159
changed_files = [str(Path(path).relative_to(worker_path.parent)) for _, path in py_changes]
161160
console.print(f"[yellow]File changes detected: {changed_files}[/yellow]")
162161
console.print("[yellow]Restarting Temporal worker...[/yellow]")
163-
162+
164163
# Restart worker (with cleanup handled in start_worker)
165164
await start_worker()
166165
if current_process:
167166
output_task = asyncio.create_task(stream_process_output(current_process, "WORKER"))
168-
167+
169168
except asyncio.CancelledError:
170169
# Clean shutdown
171170
if output_task:
@@ -174,7 +173,7 @@ async def start_worker() -> asyncio.subprocess.Process:
174173
await output_task
175174
except asyncio.CancelledError:
176175
pass
177-
176+
178177
if current_process and current_process.returncode is None:
179178
current_process.terminate()
180179
try:
@@ -183,7 +182,7 @@ async def start_worker() -> asyncio.subprocess.Process:
183182
current_process.kill()
184183
await current_process.wait()
185184
raise
186-
185+
187186
return asyncio.create_task(worker_runner())
188187

189188

@@ -193,7 +192,7 @@ async def start_acp_server(
193192
"""Start the ACP server process"""
194193
# Use file path relative to manifest directory if possible
195194
uvicorn_target = calculate_uvicorn_target_for_local(acp_path, manifest_dir)
196-
195+
197196
cmd = [
198197
sys.executable,
199198
"-m",
@@ -297,11 +296,17 @@ async def run_agent(manifest_path: str, debug_config: "DebugConfig | None" = Non
297296
manifest_dir = Path(manifest_path).parent
298297
if debug_config and debug_config.should_debug_acp():
299298
acp_process = await start_acp_server_debug(
300-
file_paths["acp"], manifest.local_development.agent.port, agent_env, debug_config # type: ignore[union-attr]
299+
file_paths["acp"],
300+
manifest.local_development.agent.port,
301+
agent_env,
302+
debug_config, # type: ignore[union-attr]
301303
)
302304
else:
303305
acp_process = await start_acp_server(
304-
file_paths["acp"], manifest.local_development.agent.port, agent_env, manifest_dir # type: ignore[union-attr]
306+
file_paths["acp"],
307+
manifest.local_development.agent.port,
308+
agent_env,
309+
manifest_dir, # type: ignore[union-attr]
305310
)
306311
process_manager.add_process(acp_process)
307312

@@ -314,14 +319,14 @@ async def run_agent(manifest_path: str, debug_config: "DebugConfig | None" = Non
314319
if is_temporal_agent(manifest) and file_paths["worker"]:
315320
if debug_config and debug_config.should_debug_worker():
316321
# In debug mode, start worker without auto-reload to prevent conflicts
317-
worker_process = await start_temporal_worker_debug(
318-
file_paths["worker"], agent_env, debug_config
319-
)
322+
worker_process = await start_temporal_worker_debug(file_paths["worker"], agent_env, debug_config)
320323
process_manager.add_process(worker_process)
321324
worker_task = asyncio.create_task(stream_process_output(worker_process, "WORKER"))
322325
else:
323326
# Normal mode with auto-reload
324-
worker_task = await start_temporal_worker_with_reload(file_paths["worker"], agent_env, process_manager, manifest_dir)
327+
worker_task = await start_temporal_worker_with_reload(
328+
file_paths["worker"], agent_env, process_manager, manifest_dir
329+
)
325330
tasks.append(worker_task)
326331

327332
console.print(
@@ -334,7 +339,7 @@ async def run_agent(manifest_path: str, debug_config: "DebugConfig | None" = Non
334339
await process_manager.wait_for_shutdown()
335340
except KeyboardInterrupt:
336341
console.print("\n[yellow]Received shutdown signal...[/yellow]")
337-
342+
338343
# Cancel output streaming tasks
339344
for task in tasks:
340345
task.cancel()
@@ -352,9 +357,6 @@ async def run_agent(manifest_path: str, debug_config: "DebugConfig | None" = Non
352357
await process_manager.cleanup_processes()
353358

354359

355-
356-
357-
358360
def create_agent_environment(manifest: AgentManifest) -> dict[str, str]:
359361
"""Create environment variables for agent processes without modifying os.environ"""
360362
# Start with current environment
@@ -378,6 +380,7 @@ def create_agent_environment(manifest: AgentManifest) -> dict[str, str]:
378380

379381
# Add authorization principal if set - for local development, auth is optional
380382
from agentex.lib.cli.utils.auth_utils import _encode_principal_context
383+
381384
encoded_principal = _encode_principal_context(manifest)
382385
if encoded_principal:
383386
env_vars[EnvVarKeys.AUTH_PRINCIPAL_B64] = encoded_principal

src/agentex/lib/environment_variables.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
from __future__ import annotations
32

43
import os
@@ -7,8 +6,8 @@
76

87
from dotenv import load_dotenv
98

10-
from agentex.lib.constants.ports import DEFAULT_ACP_PORT, DEFAULT_AGENTEX_BASE_URL, DEFAULT_HEALTH_CHECK_PORT
119
from agentex.lib.utils.logging import make_logger
10+
from agentex.lib.constants.ports import DEFAULT_ACP_PORT, DEFAULT_AGENTEX_BASE_URL, DEFAULT_HEALTH_CHECK_PORT
1211
from agentex.lib.utils.model_utils import BaseModel
1312

1413
PROJECT_ROOT = Path(__file__).resolve().parents[2]

src/agentex/lib/sdk/fastacp/base/base_acp_server.py

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
)
2626
from agentex.lib.utils.logging import make_logger, ctx_var_request_id
2727
from agentex.lib.types.json_rpc import JSONRPCError, JSONRPCRequest, JSONRPCResponse
28-
from agentex.lib.utils.model_utils import BaseModel
29-
from agentex.lib.utils.registration import register_agent
3028

3129
# from agentex.lib.sdk.fastacp.types import BaseACPConfig
3230
from agentex.lib.constants.ports import DEFAULT_ACP_PORT
31+
from agentex.lib.utils.model_utils import BaseModel
32+
from agentex.lib.utils.registration import register_agent
3333
from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables
3434
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
3535
from agentex.types.task_message_content import TaskMessageContent
@@ -140,26 +140,21 @@ async def _handle_jsonrpc(self, request: Request):
140140
error=JSONRPCError(code=-32601, message="Unauthorized"),
141141
)
142142

143-
144143
# Check if method is valid first
145144
try:
146145
method = RPCMethod(rpc_request.method)
147146
except ValueError:
148147
logger.error(f"Method {rpc_request.method} was invalid")
149148
return JSONRPCResponse(
150149
id=rpc_request.id,
151-
error=JSONRPCError(
152-
code=-32601, message=f"Method {rpc_request.method} not found"
153-
),
150+
error=JSONRPCError(code=-32601, message=f"Method {rpc_request.method} not found"),
154151
)
155152

156153
if method not in self._handlers or self._handlers[method] is None:
157154
logger.error(f"Method {method} not found on existing ACP server")
158155
return JSONRPCResponse(
159156
id=rpc_request.id,
160-
error=JSONRPCError(
161-
code=-32601, message=f"Method {method} not found"
162-
),
157+
error=JSONRPCError(code=-32601, message=f"Method {method} not found"),
163158
)
164159

165160
# Extract application headers using allowlist approach (only x-* headers)
@@ -172,11 +167,11 @@ async def _handle_jsonrpc(self, request: Request):
172167
and key.lower() not in FASTACP_HEADER_SKIP_EXACT
173168
and not any(key.lower().startswith(p) for p in FASTACP_HEADER_SKIP_PREFIXES)
174169
}
175-
170+
176171
# Parse params into appropriate model based on method and include headers
177172
params_model = PARAMS_MODEL_BY_METHOD[method]
178173
params_data = dict(rpc_request.params) if rpc_request.params else {}
179-
174+
180175
# Add custom headers to the request structure if any headers were provided
181176
# Gateway sends filtered headers via HTTP, SDK extracts and populates params.request
182177
if custom_headers:
@@ -192,12 +187,8 @@ async def _handle_jsonrpc(self, request: Request):
192187
return None
193188
else:
194189
# Handle streaming vs non-streaming for MESSAGE_SEND
195-
if method == RPCMethod.MESSAGE_SEND and isinstance(
196-
result, AsyncGenerator
197-
):
198-
return await self._handle_streaming_response(
199-
rpc_request.id, result
200-
)
190+
if method == RPCMethod.MESSAGE_SEND and isinstance(result, AsyncGenerator):
191+
return await self._handle_streaming_response(rpc_request.id, result)
201192
else:
202193
if isinstance(result, BaseModel):
203194
result = result.model_dump()
@@ -209,14 +200,10 @@ async def _handle_jsonrpc(self, request: Request):
209200
return JSONRPCResponse(id=None)
210201

211202
# For regular requests, start processing in background but return immediately
212-
asyncio.create_task(
213-
self._process_request(rpc_request.id, method, params)
214-
)
203+
asyncio.create_task(self._process_request(rpc_request.id, method, params))
215204

216205
# Return immediate acknowledgment
217-
return JSONRPCResponse(
218-
id=rpc_request.id, result={"status": "processing"}
219-
)
206+
return JSONRPCResponse(id=rpc_request.id, result={"status": "processing"})
220207

221208
except Exception as e:
222209
logger.error(f"Error handling JSON-RPC request: {e}", exc_info=True)
@@ -228,9 +215,7 @@ async def _handle_jsonrpc(self, request: Request):
228215
error=JSONRPCError(code=-32603, message=str(e)).model_dump(),
229216
)
230217

231-
async def _handle_streaming_response(
232-
self, request_id: int | str, async_gen: AsyncGenerator
233-
):
218+
async def _handle_streaming_response(self, request_id: int | str, async_gen: AsyncGenerator):
234219
"""Handle streaming response by formatting TaskMessageUpdate objects as JSON-RPC stream"""
235220

236221
async def generate_json_rpc_stream():
@@ -240,9 +225,7 @@ async def generate_json_rpc_stream():
240225
# Validate using Pydantic's TypeAdapter to ensure it's a proper TaskMessageUpdate
241226
try:
242227
# This will validate that chunk conforms to the TaskMessageUpdate union type
243-
validated_chunk = task_message_update_adapter.validate_python(
244-
chunk
245-
)
228+
validated_chunk = task_message_update_adapter.validate_python(chunk)
246229
# Use mode="json" to properly serialize datetime objects
247230
chunk_data = validated_chunk.model_dump(mode="json")
248231
except ValidationError as e:
@@ -285,18 +268,14 @@ async def _process_notification(self, method: RPCMethod, params: Any):
285268
except Exception as e:
286269
logger.error(f"Error processing notification {method}: {e}", exc_info=True)
287270

288-
async def _process_request(
289-
self, request_id: int | str, method: RPCMethod, params: Any
290-
):
271+
async def _process_request(self, request_id: int | str, method: RPCMethod, params: Any):
291272
"""Process a request in the background"""
292273
try:
293274
handler = self._handlers[method]
294275
await handler(params)
295276
# Note: In a real implementation, you might want to store the result somewhere
296277
# or notify the client through a different mechanism
297-
logger.info(
298-
f"Successfully processed request {request_id} for method {method}"
299-
)
278+
logger.info(f"Successfully processed request {request_id} for method {method}")
300279
except Exception as e:
301280
logger.error(
302281
f"Error processing request {request_id} for method {method}: {e}",
@@ -369,7 +348,7 @@ async def message_send_wrapper(params: SendMessageParams):
369348
# Check if the function is an async generator function
370349

371350
# Regardless of whether the Agent developer implemented an Async generator or not, we will always turn the function into an async generator and yield SSE events back tot he Agentex server so there is only one way for it to process the response. Then, based on the client's desire to stream or not, the Agentex server will either yield back the async generator objects directly (if streaming) or aggregate the content into a list of TaskMessageContents and to dispatch to the client. This basically gives the Agentex server the flexibility to handle both cases itself.
372-
351+
373352
if inspect.isasyncgenfunction(fn):
374353
# The client wants streaming, an async generator already streams the content, so just return it
375354
return fn(params)
@@ -381,7 +360,9 @@ async def message_send_wrapper(params: SendMessageParams):
381360
task_message_content_list = []
382361
elif isinstance(task_message_content_response, list):
383362
# Filter out None values from lists
384-
task_message_content_list = [content for content in task_message_content_response if content is not None]
363+
task_message_content_list = [
364+
content for content in task_message_content_response if content is not None
365+
]
385366
else:
386367
task_message_content_list = [task_message_content_response]
387368

@@ -405,5 +386,3 @@ async def async_generator(task_message_content_list: list[TaskMessageContent]):
405386
def run(self, host: str = "0.0.0.0", port: int = DEFAULT_ACP_PORT, **kwargs):
406387
"""Start the Uvicorn server for async handlers."""
407388
uvicorn.run(self, host=host, port=port, **kwargs)
408-
409-

0 commit comments

Comments
 (0)