fix: resolve event stream race conditions in EventConsumer and SSE tr…#782
Open
kabir wants to merge 2 commits intoa2aproject:mainfrom
Open
fix: resolve event stream race conditions in EventConsumer and SSE tr…#782kabir wants to merge 2 commits intoa2aproject:mainfrom
kabir wants to merge 2 commits intoa2aproject:mainfrom
Conversation
…ansport Fixed multiple race conditions causing intermittent test failures in streaming and subscription scenarios. These fixes reduced the failure rate from ~33% to 0% across 1,500+ test iterations in CI. Removed executor.execute() wrapper in REST/JSONRPC routes that delayed subscription by 100-600ms, causing events to be lost when EventConsumer started emitting before subscriber was ready. Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of consumeAll(). Removed unused ResultAggregator.consumeAll() method since cancel was its only caller. Moved EventConsumer polling loop to executor thread to prevent blocking caller, ensuring subscription happens immediately without delay. Fixed race in onSubscribeToTask where initial task snapshot was enqueued but EventConsumer polling hadn't started yet. Added insertingProcessor() utility to AsyncUtils that prepends initial items synchronously to reactive streams using mutiny-zero ZeroPublisher, ensuring subscriber receives initial task snapshot immediately on subscription. Updated transport layer unit tests (JSONRPCHandlerTest, GrpcHandlerTest) to expect initial task snapshot per A2A Protocol Specification 3.1.6. Fixed insertingProcessor() to respect reactive streams semantics by sending inserted items in the source's onSubscribe() callback after subscription is established, rather than immediately in the ZeroPublisher creation lambda. Fixed two test race conditions where tests checked for received events immediately after subscription was established (server-side metric), without waiting for consumer callbacks to actually process events: - testSubscribeToTaskWithInterruptedStateKeepsStreamOpen: Added initialTaskLatch to wait for initial TaskEvent reception - testNonBlockingWithMultipleMessages: Added streamConsumerReadyLatch to wait for streaming consumer to start receiving events Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to prevent infinite waiting if final event never arrives due to distribution delays in replicated scenarios. Increased sleep delay from 50ms to 150ms to account for CI environment latency and ensure buffered events flush before stream ends. Improved pollTimeoutsWhileAwaitingFinal reset logic to only reset when not awaiting final event. Calculated timeout constant from base timeout value for better maintainability. Tests fixed: - testNonBlockingWithMultipleMessages - testCancelTaskSuccess - testSubscribeToTaskWithInterruptedStateKeepsStreamOpen
Contributor
There was a problem hiding this comment.
Code Review
This pull request resolves SSE streaming race conditions and buffering issues by implementing synchronous subscriptions in route handlers and offloading EventConsumer polling to a dedicated executor. It also introduces an insertingProcessor to prepend initial task snapshots to streams for protocol compliance. Feedback suggests using a more idiomatic polling loop condition in EventConsumer.
server-common/src/main/java/io/a2a/server/events/EventConsumer.java
Outdated
Show resolved
Hide resolved
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
…ansport
Fixed multiple race conditions causing intermittent test failures in streaming and subscription scenarios. These fixes reduced the failure rate from ~33% to 0% across 1,500+ test iterations in CI.
Removed executor.execute() wrapper in REST/JSONRPC routes that delayed subscription by 100-600ms, causing events to be lost when EventConsumer started emitting before subscriber was ready.
Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of consumeAll(). Removed unused ResultAggregator.consumeAll() method since cancel was its only caller.
Moved EventConsumer polling loop to executor thread to prevent blocking caller, ensuring subscription happens immediately without delay.
Fixed race in onSubscribeToTask where initial task snapshot was enqueued but EventConsumer polling hadn't started yet. Added insertingProcessor() utility to AsyncUtils that prepends initial items synchronously to reactive streams using mutiny-zero ZeroPublisher, ensuring subscriber receives initial task snapshot immediately on subscription.
Updated transport layer unit tests (JSONRPCHandlerTest, GrpcHandlerTest) to expect initial task snapshot per A2A Protocol Specification 3.1.6.
Fixed insertingProcessor() to respect reactive streams semantics by sending inserted items in the source's onSubscribe() callback after subscription is established, rather than immediately in the ZeroPublisher creation lambda.
Fixed two test race conditions where tests checked for received events immediately after subscription was established (server-side metric), without waiting for consumer callbacks to actually process events:
Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to prevent infinite waiting if final event never arrives due to distribution delays in replicated scenarios.
Increased sleep delay from 50ms to 150ms to account for CI environment latency and ensure buffered events flush before stream ends.
Improved pollTimeoutsWhileAwaitingFinal reset logic to only reset when not awaiting final event. Calculated timeout constant from base timeout value for better maintainability.
Tests fixed: