Skip to content

fix: resolve event stream race conditions in EventConsumer and SSE tr…#782

Open
kabir wants to merge 2 commits intoa2aproject:mainfrom
kabir:fix-local-handling-race-condition
Open

fix: resolve event stream race conditions in EventConsumer and SSE tr…#782
kabir wants to merge 2 commits intoa2aproject:mainfrom
kabir:fix-local-handling-race-condition

Conversation

@kabir
Copy link
Copy Markdown
Collaborator

@kabir kabir commented Apr 7, 2026

…ansport

Fixed multiple race conditions causing intermittent test failures in streaming and subscription scenarios. These fixes reduced the failure rate from ~33% to 0% across 1,500+ test iterations in CI.

Removed executor.execute() wrapper in REST/JSONRPC routes that delayed subscription by 100-600ms, causing events to be lost when EventConsumer started emitting before subscriber was ready.

Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of consumeAll(). Removed unused ResultAggregator.consumeAll() method since cancel was its only caller.

Moved EventConsumer polling loop to executor thread to prevent blocking caller, ensuring subscription happens immediately without delay.

Fixed race in onSubscribeToTask where initial task snapshot was enqueued but EventConsumer polling hadn't started yet. Added insertingProcessor() utility to AsyncUtils that prepends initial items synchronously to reactive streams using mutiny-zero ZeroPublisher, ensuring subscriber receives initial task snapshot immediately on subscription.

Updated transport layer unit tests (JSONRPCHandlerTest, GrpcHandlerTest) to expect initial task snapshot per A2A Protocol Specification 3.1.6.

Fixed insertingProcessor() to respect reactive streams semantics by sending inserted items in the source's onSubscribe() callback after subscription is established, rather than immediately in the ZeroPublisher creation lambda.

Fixed two test race conditions where tests checked for received events immediately after subscription was established (server-side metric), without waiting for consumer callbacks to actually process events:

  • testSubscribeToTaskWithInterruptedStateKeepsStreamOpen: Added initialTaskLatch to wait for initial TaskEvent reception
  • testNonBlockingWithMultipleMessages: Added streamConsumerReadyLatch to wait for streaming consumer to start receiving events

Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to prevent infinite waiting if final event never arrives due to distribution delays in replicated scenarios.

Increased sleep delay from 50ms to 150ms to account for CI environment latency and ensure buffered events flush before stream ends.

Improved pollTimeoutsWhileAwaitingFinal reset logic to only reset when not awaiting final event. Calculated timeout constant from base timeout value for better maintainability.

Tests fixed:

  • testNonBlockingWithMultipleMessages
  • testCancelTaskSuccess
  • testSubscribeToTaskWithInterruptedStateKeepsStreamOpen

…ansport

Fixed multiple race conditions causing intermittent test failures in streaming
and subscription scenarios. These fixes reduced the failure rate from ~33% to
0% across 1,500+ test iterations in CI.

Removed executor.execute() wrapper in REST/JSONRPC routes that delayed
subscription by 100-600ms, causing events to be lost when EventConsumer
started emitting before subscriber was ready.

Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of
consumeAll(). Removed unused ResultAggregator.consumeAll() method since
cancel was its only caller.

Moved EventConsumer polling loop to executor thread to prevent blocking
caller, ensuring subscription happens immediately without delay.

Fixed race in onSubscribeToTask where initial task snapshot was enqueued
but EventConsumer polling hadn't started yet. Added insertingProcessor()
utility to AsyncUtils that prepends initial items synchronously to reactive
streams using mutiny-zero ZeroPublisher, ensuring subscriber receives
initial task snapshot immediately on subscription.

Updated transport layer unit tests (JSONRPCHandlerTest, GrpcHandlerTest)
to expect initial task snapshot per A2A Protocol Specification 3.1.6.

Fixed insertingProcessor() to respect reactive streams semantics by sending
inserted items in the source's onSubscribe() callback after subscription is
established, rather than immediately in the ZeroPublisher creation lambda.

Fixed two test race conditions where tests checked for received events
immediately after subscription was established (server-side metric), without
waiting for consumer callbacks to actually process events:
- testSubscribeToTaskWithInterruptedStateKeepsStreamOpen: Added
  initialTaskLatch to wait for initial TaskEvent reception
- testNonBlockingWithMultipleMessages: Added streamConsumerReadyLatch to
  wait for streaming consumer to start receiving events

Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to
prevent infinite waiting if final event never arrives due to distribution
delays in replicated scenarios.

Increased sleep delay from 50ms to 150ms to account for CI environment
latency and ensure buffered events flush before stream ends.

Improved pollTimeoutsWhileAwaitingFinal reset logic to only reset when not
awaiting final event. Calculated timeout constant from base timeout value
for better maintainability.

Tests fixed:
- testNonBlockingWithMultipleMessages
- testCancelTaskSuccess
- testSubscribeToTaskWithInterruptedStateKeepsStreamOpen
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request resolves SSE streaming race conditions and buffering issues by implementing synchronous subscriptions in route handlers and offloading EventConsumer polling to a dedicated executor. It also introduces an insertingProcessor to prepend initial task snapshots to streams for protocol compliance. Feedback suggests using a more idiomatic polling loop condition in EventConsumer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant