feat(py): implement Reflection API v2 with WebSocket and JSON-RPC 2.0 #4401
base: main
Summary of Changes
Hello @yesudeep, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant architectural change to the Genkit Python Reflection API by implementing version 2. The new version shifts from a traditional HTTP server model (v1), where the runtime hosts a server, to a WebSocket client model, where the runtime connects outbound to a centralized manager. This change enables bidirectional communication, improves environment flexibility, and simplifies discovery. The existing v1 API is preserved in a separate module, allowing a graceful transition and configurable activation based on an environment variable.
Code Review
This pull request introduces a significant and well-structured feature: the Reflection API v2, which uses WebSockets and JSON-RPC 2.0. The refactoring to separate v1 and v2 logic is clean, and the new implementation is comprehensive. I've identified a few key areas for improvement: the reconnection logic is missing the advertised exponential backoff, the action cancellation mechanism is not functional, and there's a minor case of redundant logging. Addressing these points will greatly improve the robustness and reliability of the new API.
a9e37c3 to 0074ab0
/gemini review
Code Review
This pull request introduces a major feature: the Reflection API v2, which uses WebSockets and JSON-RPC for communication. The implementation is robust, featuring automatic reconnection, cancellation support, and a clean separation from the existing V1 API. The code is well-documented and includes a comprehensive set of new tests. My review focuses on a significant area of code duplication that should be refactored for better maintainability, and a couple of minor points to improve the consistency and correctness of the new V2 API.
# Reflection API v1: Start HTTP server
reflection_server = _make_reflection_server(self.registry, server_spec)

# Use lazy_write=True to prevent race condition where file exists before server is up
async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager:
    # We use anyio.TaskGroup because it is compatible with
    # asyncio's event loop and works with Python 3.10
    # (asyncio.TaskGroup was added in 3.11, and we can switch to
    # that when we drop support for 3.10).
    async with anyio.create_task_group() as tg:
        # Start reflection server in the background.
        tg.start_soon(reflection_server.serve, name='genkit-reflection-server')
        await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')

        # Start SIGTERM handler
        tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler')

        # Wait for server to be responsive
        # We need to loop and poll the health endpoint or wait for uvicorn to be ready
        # Since uvicorn run is blocking (but we are in a task), we can't
        # easily hook into its startup
        # unless we use uvicorn's server object directly which we do.
        # reflection_server.started is set when uvicorn starts.

        # Simple polling loop

        max_retries = 20  # 2 seconds total roughly
        for _i in range(max_retries):
            try:
                # TODO(#4334): Use async http client if available to avoid blocking loop?
                # But we are in dev mode, so maybe okay.
                # Actually we should use anyio.to_thread to avoid blocking event loop
                # or assume standard lib urllib is fast enough for localhost.

                # Using sync urllib in async loop blocks the loop!
                # We must use anyio.to_thread or a non-blocking check.
                # But let's check if reflection_server object has a 'started' flag we can trust.
                # uvicorn.Server has 'started' attribute but it might be internal state.

                # Let's stick to simple polling with to_thread for safety
                def check_health() -> bool:
                    health_url = f'{server_spec.url}/api/__health'
                    with urllib.request.urlopen(health_url, timeout=0.5) as response:
                        return response.status == 200

                is_healthy = await anyio.to_thread.run_sync(check_health)  # type: ignore[attr-defined]
                if is_healthy:
                    break
            except Exception:
                await anyio.sleep(0.1)
        else:
            logger.warning(
                f'Reflection server at {server_spec.url} did not become healthy in time.'
            )

        # Now write the file (or verify it persisted)
        _ = runtime_manager.write_runtime_file()

        # Start the (potentially short-lived) user coroutine wrapper
        tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
        await logger.ainfo('Started Genkit user coroutine')

        # Block here until the task group is canceled (e.g. Ctrl+C)
        # or a task raises an unhandled exception. It should not
        # exit just because the user coroutine finishes.
This block introduces a significant amount of duplicated code. The logic for starting the V1 reflection server is nearly identical to the code that existed before it was moved into this else block. This makes the dev_runner function harder to read and maintain.
To improve this, I recommend extracting the V1 server startup logic into a separate private async function. This function could then be called from this else block, which would greatly simplify the main control flow and eliminate the code duplication.
+ value_type = params.get('type', '')
+
- async def handle_run_action(
-     request: Request,
- ) -> JSONResponse | StreamingResponse:
-     """Handle the runAction endpoint for executing registered actions.
+ if value_type == 'defaultModel':
+     return self._registry.list_values('defaultModel')
+
-     Flow:
-         1. Reads and validates the request payload
-         2. Looks up the requested action
-         3. Executes the action with the provided input
-         4. Returns the action result as JSON with trace ID
+ # Return all values if type not recognized
+ return self._registry.list_values(value_type)
The behavior for listValues appears to have changed from V1. In the V1 API, providing a type other than 'defaultModel' resulted in an error. This V2 implementation passes any unrecognized type directly to self._registry.list_values(), and the comment Return all values if type not recognized might be misleading depending on the registry's behavior.
If only 'defaultModel' is currently supported, it would be better to enforce this for API consistency and to provide clearer error feedback to the client. This can be done by raising an error for unsupported types, which will be converted into a proper JSON-RPC error response.
Suggested change:
value_type = params.get('type')
if not value_type:
    raise ValueError("The 'type' parameter is required for listValues.")
if value_type != 'defaultModel':
    raise ValueError(f"Value type '{value_type}' is not supported. Only 'defaultModel' is currently supported.")
return self._registry.list_values(value_type)
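The suggestion relies on a raised `ValueError` being turned into a JSON-RPC error response. How the PR's client does this is not shown in this conversation, so the following is only a minimal sketch of the general pattern. The `list_values` and `dispatch` functions are hypothetical stand-ins, the returned list is placeholder data, and mapping the error to `-32602` (the JSON-RPC 2.0 "Invalid params" code) is an assumption about what a reasonable implementation might choose.

```python
from typing import Any

INVALID_PARAMS = -32602  # JSON-RPC 2.0 pre-defined code: invalid method parameter(s)


def list_values(params: dict[str, Any]) -> list[str]:
    """Hypothetical stand-in for the V2 listValues handler."""
    value_type = params.get('type')
    if not value_type:
        raise ValueError("The 'type' parameter is required for listValues.")
    if value_type != 'defaultModel':
        raise ValueError(f"Value type '{value_type}' is not supported.")
    # Placeholder data; the real handler would query the registry instead.
    return ['example-model']


def dispatch(request: dict[str, Any]) -> dict[str, Any]:
    """Run the handler and wrap its outcome in a JSON-RPC 2.0 response."""
    response: dict[str, Any] = {'jsonrpc': '2.0', 'id': request.get('id')}
    try:
        response['result'] = list_values(request.get('params', {}))
    except ValueError as exc:
        # A response carries either 'result' or 'error', never both.
        response['error'] = {'code': INVALID_PARAMS, 'message': str(exc)}
    return response
```

A request with `{'type': 'defaultModel'}` yields a `result` member, while any other type yields an `error` object that the client can inspect.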
return None, JsonRpcError(
    code=404,
    message='Action not found or already completed',
)
The error code 404 is an HTTP status code. For consistency within the JSON-RPC protocol, it's preferable to use an error code from the ranges defined in the JSON-RPC 2.0 specification.
For an error such as an action not being found, one option is a code from -32000 to -32099, which the specification reserves for implementation-defined server errors. Using a code from this range would make the API more consistent for JSON-RPC clients.
Suggested change:
return None, JsonRpcError(
    code=-32004,  # Or another code in the implementation-defined server error range.
    message='Action not found or already completed',
)
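To make the code ranges concrete, the sketch below lists the pre-defined JSON-RPC 2.0 error codes next to one code from the implementation-defined range. The `ACTION_NOT_FOUND` constant, the `JsonRpcErrorSketch` class, and the helper function are hypothetical names for illustration; the PR's actual `JsonRpcError` type is not shown here and may look different.

```python
from dataclasses import asdict, dataclass

# Pre-defined error codes from the JSON-RPC 2.0 specification.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603

# Hypothetical constant: one code picked from -32000..-32099, the range the
# specification reserves for implementation-defined server errors.
ACTION_NOT_FOUND = -32004


@dataclass
class JsonRpcErrorSketch:
    """Illustrative stand-in for an error object with 'code' and 'message'."""

    code: int
    message: str


def is_implementation_defined(code: int) -> bool:
    """Return True if the code lies in the implementation-defined server range."""
    return -32099 <= code <= -32000


error = JsonRpcErrorSketch(
    code=ACTION_NOT_FOUND,
    message='Action not found or already completed',
)
payload = asdict(error)  # plain dict, ready to embed in a response's 'error' member
```

Keeping the codes as named constants in one place makes it harder for an HTTP status such as 404 to slip into a JSON-RPC response.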
0074ab0 to 6b8771d
This implements the Reflection API v2 for Python following RFC #4211.

Architecture Change:
- V1 (HTTP server): Runtime hosts an HTTP server, CLI/DevUI connect to it
- V2 (WebSocket client): CLI hosts a WebSocket server, Runtimes connect outbound

The v2 API reverses the connection direction, allowing better support for bidirectional actions and environments where binding a port is impractical.

Module Reorganization:
- reflection.py: New v2 WebSocket client (primary when enabled)
- reflection_v1.py: Existing HTTP server implementation (default)

V2 Implementation:
- ReflectionClientV2 connects to a runtime manager via WebSocket
- JSON-RPC 2.0 protocol for all communication
- Supports: listActions, runAction, cancelAction, listValues
- Streaming: runActionState and streamChunk notifications
- Auto-reconnection with exponential backoff (1s to 60s max)
- Proper task-based cancellation via asyncio.current_task().cancel()

Activation:
- V2 is activated when GENKIT_REFLECTION_V2_SERVER env var is set
- V1 remains the default when env var is not set

Dependencies:
- websockets>=15.0 as core dependency

Tests:
- 25 tests for v2 WebSocket client
- 7 tests for v1 HTTP server

See: RFC #4211
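The commit message describes reconnection with exponential backoff from 1s up to a 60s cap. As a rough illustration of that schedule (not the PR's implementation), the generator below doubles the delay after each attempt. The doubling factor and the name `backoff_delays` are assumptions; the message states only the 1s start and the 60s maximum.

```python
from collections.abc import Iterator
from itertools import islice


def backoff_delays(
    initial: float = 1.0,
    maximum: float = 60.0,
    factor: float = 2.0,  # assumed growth factor; not stated in the commit message
) -> Iterator[float]:
    """Yield reconnect delays that grow geometrically and saturate at `maximum`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, maximum)


# First eight delays of the schedule, in seconds.
schedule = list(islice(backoff_delays(), 8))
```

A reconnect loop would typically sleep for the next delay after each failed connection attempt and start a fresh generator once a connection succeeds.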
6b8771d to b5c6fe3
Summary
Implements the Reflection API v2 for Python following RFC #4211.
Architecture Comparison
V1 (HTTP Server) - Current Default
V2 (WebSocket Client) - When GENKIT_REFLECTION_V2_SERVER is set
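The activation rule can be sketched as a small selection function. This is a hedged illustration: `select_reflection_version` is a hypothetical name, and the real runner may read the variable differently. Only the variable name `GENKIT_REFLECTION_V2_SERVER` and the "v1 by default" behaviour come from this PR.

```python
import os
from collections.abc import Mapping
from typing import Optional


def select_reflection_version(
    env: Optional[Mapping[str, str]] = None,
) -> tuple[str, Optional[str]]:
    """Return ('v2', url) when the V2 server variable is set, else ('v1', None).

    Hypothetical helper for illustration; not part of the Genkit API.
    """
    env = os.environ if env is None else env
    url = env.get('GENKIT_REFLECTION_V2_SERVER')
    # An unset or empty variable keeps the default V1 HTTP server path.
    return ('v2', url) if url else ('v1', None)
```

Passing the environment as an argument keeps the decision easy to test without mutating `os.environ`.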
Data Flow (V1 HTTP)
Data Flow (V2 WebSocket)
Protocol Methods (V2)
- register
- configure
- listActions
- listValues
- runAction
- runActionState
- streamChunk
- cancelAction
Module Organization
- reflection.py
- reflection_v1.py
Activation
GENKIT_REFLECTION_V2_SERVER=ws://localhost:4100
Dependencies
- websockets>=15.0 as core dependency
Tests
Related