Skip to content

Conversation

@yesudeep
Copy link
Contributor

@yesudeep yesudeep commented Feb 1, 2026

Summary

Implements the Reflection API v2 for Python following RFC #4211.

Architecture Comparison

V1 (HTTP Server) - Current Default

     ┌─────────────────────┐          ┌─────────────────────┐
     │    Genkit CLI       │          │      Dev UI         │
     │   (HTTP Client)     │          │    (HTTP Client)    │
     └──────────┬──────────┘          └──────────┬──────────┘
                │                                │
                │       HTTP Requests            │
                │    (GET /api/actions, etc)     │
                │                                │
                ▼                                ▼
     ┌───────────────────────────────────────────────────────┐
     │                  Genkit Runtime                       │
     │               ┌────────────────────┐                  │
     │               │   HTTP Server      │                  │
     │               │   (port 3100)      │                  │
     │               └────────────────────┘                  │
     │               ┌────────────────────┐                  │
     │               │     Registry       │                  │
     │               │  (Actions, Flows)  │                  │
     │               └────────────────────┘                  │
     └───────────────────────────────────────────────────────┘

     Discovery: Runtime writes file to ~/.genkit/{runtimeId}.runtime.json
     Connection: CLI reads file, finds port, connects via HTTP

V2 (WebSocket Client) - When GENKIT_REFLECTION_V2_SERVER is set

     ┌───────────────────────────────────────────────────────┐
     │                 Runtime Manager                       │
     │               (CLI WebSocket Server)                  │
     │               ┌────────────────────┐                  │
     │               │  WebSocket Server  │                  │
     │               │   (port 4100)      │                  │
     │               └────────────────────┘                  │
     │               ┌────────────────────┐                  │
     │               │     Dev UI         │                  │
     │               └────────────────────┘                  │
     └───────────────────────────────────────────────────────┘
                     ▲           ▲           ▲
                     │           │           │
        WebSocket    │           │           │   WebSocket
        Connect      │           │           │   Connect
                     │           │           │
     ┌───────────────┴───┐ ┌─────┴─────┐ ┌───┴───────────────┐
     │   Genkit Runtime  │ │  Runtime  │ │   Genkit Runtime  │
     │   (Python app)    │ │  (JS app) │ │   (Go app)        │
     └───────────────────┘ └───────────┘ └───────────────────┘

     Discovery: Runtime reads GENKIT_REFLECTION_V2_SERVER env var
     Connection: Runtime connects outbound to Manager via WebSocket

Data Flow (V1 HTTP)

     CLI / Dev UI                         Reflection Server
          │                                    │
          │  ──── GET /api/__health ────►      │
          │  ◄──── {"status": "OK"} ────       │
          │                                    │
          │  ──── GET /api/actions ────►       │
          │  ◄──── {actions dict} ────         │
          │                                    │
          │  ──── POST /api/runAction ────►    │  (key, input, context)
          │  ◄──── X-Genkit-Trace-Id header ── │  (early flush)
          │  ◄──── stream chunks ────          │  (if streaming)
          │  ◄──── final result ────           │
          │                                    │
          │  ──── POST /api/cancelAction ──►   │  (traceId)
          │  ◄──── {"message": "..."} ────     │
          │                                    │

Data Flow (V2 WebSocket)

     Genkit Runtime                    Runtime Manager Server
          │                                    │
          │  ──── WebSocket Connect ────►      │
          │                                    │
          │  ──── register (JSON-RPC) ────►    │  (id, pid, version, envs)
          │                                    │
          │  ◄──── configure notification ──   │  (telemetryServerUrl)
          │                                    │
          │  ◄──── listActions request ────    │
          │  ──── response with actions ────►  │
          │                                    │
          │  ◄──── runAction request ────      │  (key, input, stream)
          │  ──── runActionState notif ────►   │  (sends traceId early)
          │  ──── streamChunk notification ──► │  (if streaming)
          │  ──── response with result ────►   │
          │                                    │
          │  ◄──── cancelAction request ────   │  (traceId)
          │  ──── response (cancelled) ────►   │
          │                                    │

Protocol Methods (V2)

Method Direction Type Description
register Runtime→Manager Notification Register runtime info (id, pid, version)
configure Manager→Runtime Notification Push config (telemetry URL)
listActions Manager→Runtime Request List available actions
listValues Manager→Runtime Request List values by type
runAction Manager→Runtime Request Execute an action
runActionState Runtime→Manager Notification Send traceId early
streamChunk Runtime→Manager Notification Stream output chunk
cancelAction Manager→Runtime Request Cancel running action

Module Organization

Module Description
reflection.py V2 WebSocket client (primary when enabled)
reflection_v1.py V1 HTTP server (default)

Activation

Mode Configuration
V2 Set GENKIT_REFLECTION_V2_SERVER=ws://localhost:4100
V1 Default when env var is not set (backward compatible)

Dependencies

  • Added websockets>=15.0 as core dependency

Tests

  • 25 tests for v2 WebSocket client
  • 7 tests for v1 HTTP server

Related

@github-actions github-actions bot added feature New feature or request python Python config labels Feb 1, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @yesudeep, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural change to the Genkit Python Reflection API by implementing version 2. This new version shifts from a traditional HTTP server model (v1) where the runtime hosts a server, to a WebSocket client model where the runtime connects outbound to a centralized manager. This change enables bidirectional communication, improves environment flexibility, and simplifies discovery. The existing v1 API is preserved in a separate module, allowing for a graceful transition and configurable activation based on an environment variable.

Highlights

  • Reflection API v2 Implementation: Implemented the Reflection API v2 for Python, aligning with the Go and JavaScript implementations and RFC RFC: Reflection API V2 (WebSocket) #4211. This new version utilizes WebSockets and JSON-RPC 2.0 for communication.
  • Architectural Shift: The connection direction is reversed in v2: the Genkit runtime now acts as a WebSocket client, connecting outbound to a centralized runtime manager, rather than hosting an HTTP server (v1).
  • New ReflectionClientV2: A new ReflectionClientV2 class has been introduced to manage WebSocket connections, handle JSON-RPC 2.0 requests (e.g., listActions, runAction, cancelAction, listValues), and process notifications (register, configure). It includes auto-reconnection with exponential backoff and active action tracking for cancellation.
  • Preservation of Reflection API v1: The existing Reflection API v1 (HTTP server) implementation has been moved to a new reflection_v1.py module, ensuring backward compatibility. The system dynamically chooses between v1 and v2 based on the GENKIT_REFLECTION_V2_SERVER environment variable.
  • Dependency Update: The websockets>=15.0 library has been added as a core dependency to support the new WebSocket-based communication.
  • Testing: Comprehensive tests have been added for the new v2 WebSocket client (25 tests), and existing tests for the v1 HTTP server (7 tests) have been preserved and moved to a dedicated test file.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant and well-structured feature: the Reflection API v2, which uses WebSockets and JSON-RPC 2.0. The refactoring to separate v1 and v2 logic is clean, and the new implementation is comprehensive. I've identified a few key areas for improvement: the reconnection logic is missing the advertised exponential backoff, the action cancellation mechanism is not functional, and there's a minor case of redundant logging. Addressing these points will greatly improve the robustness and reliability of the new API.

@yesudeep yesudeep force-pushed the yesudeep/feat/reflection-api-v2 branch 3 times, most recently from a9e37c3 to 0074ab0 Compare February 1, 2026 01:59
@yesudeep
Copy link
Contributor Author

yesudeep commented Feb 1, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major feature: the Reflection API v2, which uses WebSockets and JSON-RPC for communication. The implementation is robust, featuring automatic reconnection, cancellation support, and a clean separation from the existing V1 API. The code is well-documented and includes a comprehensive set of new tests. My review focuses on a significant area of code duplication that should be refactored for better maintainability, and a couple of minor points to improve the consistency and correctness of the new V2 API.

Comment on lines 190 to 197
# Reflection API v1: Start HTTP server
reflection_server = _make_reflection_server(self.registry, server_spec)

# Use lazy_write=True to prevent race condition where file exists before server is up
async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager:
# We use anyio.TaskGroup because it is compatible with
# asyncio's event loop and works with Python 3.10
# (asyncio.TaskGroup was added in 3.11, and we can switch to
# that when we drop support for 3.10).
async with anyio.create_task_group() as tg:
# Start reflection server in the background.
tg.start_soon(reflection_server.serve, name='genkit-reflection-server')
await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}')

# Start SIGTERM handler
tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler')

# Wait for server to be responsive
# We need to loop and poll the health endpoint or wait for uvicorn to be ready
# Since uvicorn run is blocking (but we are in a task), we can't
# easily hook into its startup
# unless we use uvicorn's server object directly which we do.
# reflection_server.started is set when uvicorn starts.

# Simple polling loop

max_retries = 20 # 2 seconds total roughly
for _i in range(max_retries):
try:
# TODO(#4334): Use async http client if available to avoid blocking loop?
# But we are in dev mode, so maybe okay.
# Actually we should use anyio.to_thread to avoid blocking event loop
# or assume standard lib urllib is fast enough for localhost.

# Using sync urllib in async loop blocks the loop!
# We must use anyio.to_thread or a non-blocking check.
# But let's check if reflection_server object has a 'started' flag we can trust.
# uvicorn.Server has 'started' attribute but it might be internal state.

# Let's stick to simple polling with to_thread for safety
def check_health() -> bool:
health_url = f'{server_spec.url}/api/__health'
with urllib.request.urlopen(health_url, timeout=0.5) as response:
return response.status == 200

is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined]
if is_healthy:
break
except Exception:
await anyio.sleep(0.1)
else:
logger.warning(
f'Reflection server at {server_spec.url} did not become healthy in time.'
)

# Now write the file (or verify it persisted)
_ = runtime_manager.write_runtime_file()

# Start the (potentially short-lived) user coroutine wrapper
tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine')
await logger.ainfo('Started Genkit user coroutine')

# Block here until the task group is canceled (e.g. Ctrl+C)
# or a task raises an unhandled exception. It should not
# exit just because the user coroutine finishes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block introduces a significant amount of duplicated code. The logic for starting the V1 reflection server is nearly identical to the code that existed before it was moved into this else block. This makes the dev_runner function harder to read and maintain.

To improve this, I recommend extracting the V1 server startup logic into a separate private async function. This function could then be called from this else block, which would greatly simplify the main control flow and eliminate the code duplication.

Comment on lines 517 to 523
value_type = params.get('type', '')

async def handle_run_action(
request: Request,
) -> JSONResponse | StreamingResponse:
"""Handle the runAction endpoint for executing registered actions.
if value_type == 'defaultModel':
return self._registry.list_values('defaultModel')

Flow:
1. Reads and validates the request payload
2. Looks up the requested action
3. Executes the action with the provided input
4. Returns the action result as JSON with trace ID
# Return all values if type not recognized
return self._registry.list_values(value_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The behavior for listValues appears to have changed from V1. In the V1 API, providing a type other than 'defaultModel' resulted in an error. This V2 implementation passes any unrecognized type directly to self._registry.list_values(), and the comment Return all values if type not recognized might be misleading depending on the registry's behavior.

If only 'defaultModel' is currently supported, it would be better to enforce this for API consistency and to provide clearer error feedback to the client. This can be done by raising an error for unsupported types, which will be converted into a proper JSON-RPC error response.

Suggested change
value_type = params.get('type', '')
async def handle_run_action(
request: Request,
) -> JSONResponse | StreamingResponse:
"""Handle the runAction endpoint for executing registered actions.
if value_type == 'defaultModel':
return self._registry.list_values('defaultModel')
Flow:
1. Reads and validates the request payload
2. Looks up the requested action
3. Executes the action with the provided input
4. Returns the action result as JSON with trace ID
# Return all values if type not recognized
return self._registry.list_values(value_type)
value_type = params.get('type')
if not value_type:
raise ValueError("The 'type' parameter is required for listValues.")
if value_type != 'defaultModel':
raise ValueError(f"Value type '{value_type}' is not supported. Only 'defaultModel' is currently supported.")
return self._registry.list_values(value_type)

Comment on lines 676 to 687
return None, JsonRpcError(
code=404,
message='Action not found or already completed',
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error code 404 is an HTTP status code. For consistency within the JSON-RPC protocol, it's preferable to use an error code from the ranges defined in the JSON-RPC 2.0 specification.

For application-specific errors like an action not being found, the recommended range is -32000 to -32099. Using a code from this range would make the API more consistent for JSON-RPC clients.

Suggested change
return None, JsonRpcError(
code=404,
message='Action not found or already completed',
)
return None, JsonRpcError(
code=-32004, # Or another code in the implementation-defined server error range.
message='Action not found or already completed',
)

@yesudeep yesudeep force-pushed the yesudeep/feat/reflection-api-v2 branch from 0074ab0 to 6b8771d Compare February 1, 2026 02:06
This implements the Reflection API v2 for Python following RFC #4211.

Architecture Change:
- V1 (HTTP server): Runtime hosts an HTTP server, CLI/DevUI connect to it
- V2 (WebSocket client): CLI hosts a WebSocket server, Runtimes connect outbound

The v2 API reverses the connection direction, allowing better support for
bidirectional actions and environments where binding a port is impractical.

Module Reorganization:
- reflection.py: New v2 WebSocket client (primary when enabled)
- reflection_v1.py: Existing HTTP server implementation (default)

V2 Implementation:
- ReflectionClientV2 connects to a runtime manager via WebSocket
- JSON-RPC 2.0 protocol for all communication
- Supports: listActions, runAction, cancelAction, listValues
- Streaming: runActionState and streamChunk notifications
- Auto-reconnection with exponential backoff (1s to 60s max)
- Proper task-based cancellation via asyncio.current_task().cancel()

Activation:
- V2 is activated when GENKIT_REFLECTION_V2_SERVER env var is set
- V1 remains the default when env var is not set

Dependencies:
- websockets>=15.0 as core dependency

Tests:
- 25 tests for v2 WebSocket client
- 7 tests for v1 HTTP server

See: RFC #4211
@yesudeep yesudeep force-pushed the yesudeep/feat/reflection-api-v2 branch from 6b8771d to b5c6fe3 Compare February 2, 2026 07:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

config feature New feature or request python Python

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant