http: switch worker delivery from pool.Exec to ConnectRPC proxy#251
Conversation
Workers no longer talk to RoadRunner via goridge stdio. They connect IN over h2c and pull requests from a new in-process queue. The plugin still owns worker process lifecycle (start/restart/scale/Reset) via pool/v2 — only the request data path moves. proxy package (new): Queue brokers HTTP producers and worker consumers via Submit / Next / TryNext / Deliver / Cancel / Close. Server hosts the HttpProxyService over h2c with FetchRequest (single, blocking) and FetchRequests (batched: one blocking Next, the rest TryNext so a concurrent consumer cannot strand the loop). StreamRequest / StreamResponse remain reserved. handler: ServeHTTP submits an HttpHandlerRequest (UUIDv7 Id) to the queue and blocks on a per-request response channel with a context-aware timeout. Envelopes come from sync.Pool. For multipart, uploads are extracted to tmpdir and req.Uploads is marshaled AFTER ups.Open so per-file Error/Size/TempFilename reflect final state; for everything else the body is passed through raw. plugin / config: plugin.go wires proxy.Queue + proxy.Server alongside the pool. New Proxy config block (address, request_timeout, inbox_size, debug); the no-op raw_body field is gone. servers/http11: replaced the deprecated golang.org/x/net/http2/h2c package with stdlib http.Server.Protocols. tests: all plugin-level deps bumped from /v5 to /v6 (config, server, rpc, informer, resetter, memory, send, static, headers). RPC helpers switched from goridge/v4/pkg/rpc to the ConnectRPC clients exposed by the latest plugins. Handler-level tests migrated off staticPool to proxy.Queue + a fake Go-side worker. goccy/go-json replaced by stdlib encoding/json. golangci-lint clean across both modules. PHP SDK is out of scope: client.php still speaks goridge stdio, so the plugin-level integration tests that spawn real PHP workers (TestHTTPPost, TestHTTPOTLP*, TestHTTPBigResp, TestSSLNoHTTP) need the SDK side to land first.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (11)
📝 WalkthroughWalkthroughThe PR replaces the HTTP plugin's PSR-7/pool-based request streaming with a ConnectRPC proxy queue architecture: an in-memory request/response broker coordinates HTTP handlers and external worker clients, eliminating direct pool execution and enabling pure queue-based work distribution with timeout and inbox capacity constraints. ChangesQueue-Based Handler Architecture Migration
Test Infrastructure Refactoring
Supporting Changes
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
✨ Finishing Touches🧪 Generate unit tests (beta)
|
# Conflicts: # handler/pool.go # handler/response.go
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugin.go (1)
138-157:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftFail startup if the proxy server can't come up, and clean it up on later init errors.
p.proxyServer.Serve()is fire-and-forget here, so a bind/start failure is only reported onerrChafterServe()has already continued starting HTTP listeners. On top of that, ifinitServers()fails afterward, the proxy goroutine, queue, and worker pool are left running. Please gate readiness and add rollback for the already-started components before treating the plugin as started.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugin.go` around lines 138 - 157, The proxy server is started fire-and-forget via p.proxyServer.Serve(), so startup bind failures can be missed and resources (p.proxyServer, p.queue, p.handler/worker pool) are leaked if p.initServers() later fails; change the startup to wait for Serve() to report initial readiness/error (e.g., start Serve in a goroutine but block until it signals success or returns an error, or call a Start/Listen method that returns error), and if p.initServers() returns an error perform rollback by stopping p.proxyServer (call its Close/Stop), draining/closing p.queue (proxy.NewQueue) and shutting down the handler/worker pool (handler.NewHandler shutdown method) before sending errCh and returning; reference p.proxyServer.Serve(), proxy.NewServer, p.queue, p.handler, and p.initServers() to locate and implement the readiness gating and cleanup.
🧹 Nitpick comments (1)
tests/handler_test.go (1)
63-70: ⚡ Quick winMake server startup deterministic instead of sleeping.
ListenAndServeruns in a goroutine, then the helper just sleeps for 10ms before returning. That makes the whole suite timing-sensitive on slower CI and still doesn't guarantee the socket is ready. Bind the listener first and callServe, so readiness is deterministic.Possible fix
+ln, err := net.Listen("tcp", addr) +require.NoError(t, err) -hs := &http.Server{Addr: addr, Handler: h, ReadHeaderTimeout: time.Minute} +hs := &http.Server{Handler: h, ReadHeaderTimeout: time.Minute} go func() { - if err := hs.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := hs.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { t.Errorf("error listening the interface: error %v", err) } }() -// give the listener a moment to bind -time.Sleep(10 * time.Millisecond)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/handler_test.go` around lines 63 - 70, The test currently starts the server with hs.ListenAndServe in a goroutine then sleeps 10ms; instead bind the socket first with net.Listen("tcp", addr) to ensure readiness, then start hs.Serve(listener) in the goroutine (using the same hs := &http.Server{Addr: addr, Handler: h, ReadHeaderTimeout: time.Minute}), and remove the time.Sleep; also ensure the listener is closed in test teardown (or defer listener.Close()) so the test cleans up deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@config/config.go`:
- Around line 50-52: InitDefaults currently leaves negative RequestTimeout
values in Proxy config which later cause time.NewTimer(cfg.Proxy.RequestTimeout)
to panic; validate and reject negative durations at config initialization by
adding a check in InitDefaults (or a config validation function) that inspects
Proxy.RequestTimeout and returns an error if RequestTimeout < 0, updating any
callers to handle the error on startup (reference symbols: RequestTimeout field,
InitDefaults function, cfg.Proxy.RequestTimeout,
handler.ServeHTTP/time.NewTimer).
In `@handler/handler.go`:
- Around line 48-49: The HttpHandlerRequest objects are being recycled via
reqPool while ownership escapes ServeHTTP through queue.Submit, causing Reset to
corrupt requests still in q.inbox or being serialized by FetchRequest; stop
pooling these envelopes: remove use of reqPool and always allocate new
*HttpHandlerRequest in ServeHTTP (or only return objects to the pool after an
explicit queue-side acknowledgement), and delete or guard calls to Reset() that
run when the request may still be queued (inspect functions ServeHTTP,
queue.Submit, FetchRequest, Reset and places that call reqPool.Put).
- Around line 180-198: The Http2-Push control header in the headers map is being
used for server push but is still copied to the client; update the handler to
remove or skip the HTTP2Push entry before the final headers copy: after handling
push (the block using headers[HTTP2Push], HTTP2Push constant, and pusher.Push)
delete headers[HTTP2Push] or ensure the for k,v range that adds to w.Header()
skips k == HTTP2Push so the internal control metadata is not forwarded to the
response; preserve existing behavior for Trailer handling (handleProtoTrailers)
and the rest of the header loop.
In `@handler/parse.go`:
- Around line 13-15: handleRequestErr currently doesn't map the new sentinel
ErrMaxLevelExceeded to a client error; update the error switch in
handleRequestErr to add a case for ErrMaxLevelExceeded and return
http.StatusBadRequest (instead of defaulting to 500). Locate the
handleRequestErr function and add a case matching ErrMaxLevelExceeded that sets
the response status to http.StatusBadRequest so malformed form depth returns
4xx.
In `@plugin.go`:
- Around line 191-198: The shutdown sequence currently calls
p.proxyServer.Stop(ctx) before closing the internal queue, which causes
http.Shutdown (via p.proxyServer.Stop) to hang waiting for handlers blocked on
queue.Next(ctx); change the order so you call p.queue.Close() before invoking
p.proxyServer.Stop(ctx) (i.e., ensure p.queue != nil -> p.queue.Close() runs
prior to p.proxyServer.Stop(ctx)), so blocked RPC handlers are unblocked and
proxyServer.Stop can complete promptly.
In `@proxy/queue.go`:
- Around line 91-117: Queued requests keep occupying the bounded channel after
cancelation; update Cancel to record canceled request IDs in a protected set
(e.g., q.canceled map guarded by q.mu) when removing the entry from q.pending,
and modify Queue.Next and Queue.TryNext to repeatedly read from q.inbox and
skip/discard any request whose ID is present in q.canceled (clearing that ID
from the set once skipped) until a non-canceled request is returned (or inbox
closed / ctx done); ensure all access to q.canceled and q.pending is
synchronized with the existing mutex and that skipping canceled items reclaims
capacity immediately.
In `@proxy/server.go`:
- Around line 113-143: Clamp the batch capacity before using it as the slice
capacity in FetchRequests: compute a safeCap (e.g., min(batch, s.queue.Len()+1,
someHardMax)) after reading req.Msg.GetBatchSize() and use that for make(..., 0,
safeCap); keep the existing fallback to batch=1 for non-positive inputs, then
append the first item (from s.queue.Next) and use up to safeCap-1 iterations
with s.queue.TryNext to avoid oversized allocations. Ensure you reference
FetchRequests, the make(..., 0, batch) allocation, s.queue.Next and
s.queue.TryNext when making the change.
In `@tests/handler_test.go`:
- Around line 529-532: The test currently asserts assert.GreaterOrEqual(t,
r.StatusCode, 500) which is too loose for the dropped-worker timeout contract;
update the assertion to explicitly expect the gateway-timeout status (504). In
the test that calls helpers.Get("http://127.0.0.1:8178/?hello=world") and
captures r (response), replace the loose GreaterOrEqual assertion on
r.StatusCode with an explicit equality/assertion that r.StatusCode == 504 (e.g.
assert.Equal(t, 504, r.StatusCode) or require.Equal), so the test fails if the
behavior regresses to a generic 500.
In `@tests/helpers/rpc.go`:
- Around line 17-25: The rpcH2CClient helper creates an http.Client with no
timeout; update rpcH2CClient to set a sensible client timeout (e.g., Timeout:
10*time.Second) on the returned *http.Client to prevent tests from hanging, and
add the time import if missing; keep the existing http2.Transport and
DialTLSContext unchanged so only the http.Client struct gains the Timeout field.
In `@tests/http_plugin_test.go`:
- Around line 2617-2621: The test currently assumes order in the plugin list by
checking listResp.Msg.GetPlugins()[0] == httpPlugin.PluginName; change this to
assert membership instead: call rc.ListPlugins(...) as before, then verify that
httpPlugin.PluginName is present in the slice returned by
listResp.Msg.GetPlugins() (e.g., iterate or use a helper to check equality),
keeping the same request construction
(connect.NewRequest(&resetterV1.ListPluginsRequest{})) and error checks; ensure
the failure message explains the missing plugin rather than relying on index 0.
In `@tests/http_plugin3_test.go`:
- Around line 1442-1460: The RPC helper functions workers(), addWorker(), and
removeWorker() use context.Background() which can hang tests; replace each call
to context.Background() with a context that has a deadline (e.g. ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second); defer cancel()) and
pass ctx into client.GetWorkers/AddWorker/RemoveWorker calls so stalled RPCs
time out; update imports to include time where needed and ensure defer cancel()
is present in each function.
---
Outside diff comments:
In `@plugin.go`:
- Around line 138-157: The proxy server is started fire-and-forget via
p.proxyServer.Serve(), so startup bind failures can be missed and resources
(p.proxyServer, p.queue, p.handler/worker pool) are leaked if p.initServers()
later fails; change the startup to wait for Serve() to report initial
readiness/error (e.g., start Serve in a goroutine but block until it signals
success or returns an error, or call a Start/Listen method that returns error),
and if p.initServers() returns an error perform rollback by stopping
p.proxyServer (call its Close/Stop), draining/closing p.queue (proxy.NewQueue)
and shutting down the handler/worker pool (handler.NewHandler shutdown method)
before sending errCh and returning; reference p.proxyServer.Serve(),
proxy.NewServer, p.queue, p.handler, and p.initServers() to locate and implement
the readiness gating and cleanup.
---
Nitpick comments:
In `@tests/handler_test.go`:
- Around line 63-70: The test currently starts the server with hs.ListenAndServe
in a goroutine then sleeps 10ms; instead bind the socket first with
net.Listen("tcp", addr) to ensure readiness, then start hs.Serve(listener) in
the goroutine (using the same hs := &http.Server{Addr: addr, Handler: h,
ReadHeaderTimeout: time.Minute}), and remove the time.Sleep; also ensure the
listener is closed in test teardown (or defer listener.Close()) so the test
cleans up deterministically.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 76fbf2a7-a737-47cf-b4d4-92d7957e39b2
⛔ Files ignored due to path filters (3)
go.sumis excluded by!**/*.sumgo.work.sumis excluded by!**/*.sumtests/go.sumis excluded by!**/*.sum
📒 Files selected for processing (38)
.gitignore.golangci.ymlconfig/config.gogo.modhandler/convert.gohandler/errors.gohandler/errors_windows.gohandler/handler.gohandler/handler_test.gohandler/parse.gohandler/parse_test.gohandler/pool.gohandler/request.gohandler/response.gohandler/uploads.goplugin.goproxy/doc.goproxy/queue.goproxy/queue_test.goproxy/server.goproxy/server_test.goservers/http11/http.goservers/interface.gotests/configs/.rr-connectrpc-worker.yamltests/connectrpc_worker_test.gotests/go.modtests/handler_test.gotests/helpers/rpc.gotests/helpers/worker.gotests/http_otlp_test.gotests/http_plugin2_test.gotests/http_plugin3_test.gotests/http_plugin4_test.gotests/http_plugin_test.gotests/php_test_files/big-resptests/test_plugins/plugin1.gotests/test_plugins/plugin_middleware.gotests/uploads_test.go
💤 Files with no reviewable changes (4)
- handler/handler_test.go
- handler/pool.go
- handler/response.go
- .golangci.yml
- handler: stop recycling envelopes on the cancel/timeout paths — the request pointer may still live in the inbox or be mid-marshal in FetchRequest. Put back only on the success branch. - handler: skip Http2-Push when copying worker headers to the client; it's plugin-internal control metadata. - handler: map ErrMaxLevelExceeded to 400 in handleRequestErr. - plugin: close the queue before shutting down the proxy server so handlers parked in queue.Next unblock and http.Shutdown completes. - config: reject negative proxy.request_timeout / inbox_size in Valid (time.NewTimer panics on negative durations). - proxy: drop the up-front cap on the batch result slice; TryNext short-circuits when the queue is empty, append handles growth. - tests: tighten TestHandler_Error2 to require exactly 504; add a 10s Timeout to the RPC h2c client; switch the resetter ListPlugins assert to membership instead of index-0; bound the informer RPC helpers (workers/addWorker/removeWorker) with a 5s WithTimeout.
Summary
Inverts how requests reach PHP workers. Previously the http plugin pushed a goridge-encoded payload into
pool.Exec. Now workers connect IN over h2c and pull from an in-process queue; the plugin still owns worker process lifecycle (spawn / Reset / Workers / scale) viapool/v2— only the request data path moves.This is a breaking change. The PHP worker SDK needs a matching ConnectRPC client; until that lands, the plugin-level integration tests that spawn real PHP workers fail at the read side.
Architecture
What changed
proxy/(new):Queue+Server.FetchRequestsdeliberately uses one blockingNextthenTryNextfor the rest — a concurrentFetchRequestcannot strand the batch loop on an empty channel.handler/handler.go: newServeHTTP,*HttpHandlerRequestviasync.Pool, context + 60s timeout select on the per-request response channel. Multipart uploads are opened before marshaling so the per-fileError/Size/TempFilenamemake it to the worker.plugin.go: wiresproxy.Queue+proxy.Serveralongside the pool;Reset/Workers/Stopkeep their pool semantics.config/config.go: newProxy { address, request_timeout, inbox_size, debug }. The deadraw_bodyfield is removed (breaking).servers/http11: dropped deprecatedx/net/http2/h2cin favor of stdlibhttp.Server.Protocols.tests/: every RR plugin dep bumped/v5 → /v6; the goridge-RPC test helpers switched to ConnectRPC clients (informerV1connect,resetterV1connect);handler_test.goanduploads_test.gorewritten againstproxy.Queue+ a Go-side fake worker (tests/helpers.StartFakeWorker). Newtests/connectrpc_worker_test.goemulates a worker end-to-end via Endure.goccy/go-jsonreplaced by stdlibencoding/json.Summary by CodeRabbit
Release Notes
New Features
Http2-Pushheader.Bug Fixes
Chores