From 5f6021f65d8b4613cbffed94416eb2197ad45950 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 8 Apr 2026 16:57:39 +0200 Subject: [PATCH 1/2] fix: return UnsupportedOperationError when subscribing to terminal tasks Per A2A spec, attempting to subscribe to a task in a terminal state (completed, failed, canceled, rejected) must return UnsupportedOperationError. The issue had two parts: 1. DefaultRequestHandler checked terminal state AFTER queue operations began, causing errors to be delivered via SSE stream instead of immediately. 2. For streaming endpoints (subscribeToTask), immediate errors must be wrapped in SSE format so the client's SSE parser can process them and deliver to the error handler callback. Previously they were sent as plain JSON/HTTP responses which the SSE parser couldn't handle. Changes: - Move terminal state validation BEFORE queue operations in DefaultRequestHandler - Wrap all immediate errors in SSE format for JSON-RPC routing layer - Wrap all immediate errors in SSE format for REST transport - Add integration test to verify fix across all three transports Applied to HTTP+JSON, JSON-RPC, and gRPC transports. Fixes #767 Co-Authored-By: Claude Sonnet 4.5 --- .../server/apps/quarkus/A2AServerRoutes.java | 22 ++++--- .../DefaultRequestHandler.java | 15 +++-- .../apps/common/AbstractA2AServerTest.java | 60 +++++++++++++++++++ .../jsonrpc/handler/JSONRPCHandler.java | 3 +- .../jsonrpc/handler/JSONRPCHandlerTest.java | 39 ++---------- .../transport/rest/handler/RestHandler.java | 2 + 6 files changed, 95 insertions(+), 46 deletions(-) diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 7ca65cb35..917cf8af3 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -439,15 +439,21 @@ private A2AResponse processNonStreamingRequest(NonStreamingJSONRPCRequest */ private Multi> processStreamingRequest( A2ARequest request, ServerCallContext context) { - Flow.Publisher> publisher; - if (request instanceof SendStreamingMessageRequest req) { - publisher = jsonRpcHandler.onMessageSendStream(req, context); - } else if (request instanceof SubscribeToTaskRequest req) { - publisher = jsonRpcHandler.onSubscribeToTask(req, context); - } else { - return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError())); + try { + Flow.Publisher> publisher; + if (request instanceof SendStreamingMessageRequest req) { + publisher = jsonRpcHandler.onMessageSendStream(req, context); + } else if (request instanceof SubscribeToTaskRequest req) { + publisher = jsonRpcHandler.onSubscribeToTask(req, context); + } else { + return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError())); + } + return Multi.createFrom().publisher(publisher); + } catch (A2AError error) { + // For streaming endpoints, wrap immediate errors (like TaskNotFoundError, + // UnsupportedOperationError) in error response and send as first SSE event + return Multi.createFrom().item(generateErrorResponse(request, error)); } - return Multi.createFrom().publisher(publisher); } /** diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 9a480c44f..c702e0666 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -821,16 +821,23 @@ public Flow.Publisher onSubscribeToTask(TaskIdParams params, throw new TaskNotFoundError(); } + // Per A2A spec: subscription to tasks in terminal state (completed, failed, canceled, + // rejected) MUST return UnsupportedOperationError. + // Check BEFORE any queue operations to ensure immediate error response. + if (task.status().state().isFinal()) { + throw new UnsupportedOperationError( + null, + "Cannot subscribe to task " + task.id() + + " - task is in terminal state: " + task.status().state(), + null); + } + TaskManager taskManager = new TaskManager(task.id(), task.contextId(), taskStore, null); ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor); EventQueue queue = queueManager.tap(task.id()); LOGGER.debug("onSubscribeToTask - tapped queue: {}", queue != null ? System.identityHashCode(queue) : "null"); if (queue == null) { - // If task is in final state, queue legitimately doesn't exist anymore - if (task.status().state().isFinal()) { - throw new TaskNotFoundError(); - } // For non-final tasks, recreate the queue so client can receive future events // (Note: historical events from before queue closed are not available) LOGGER.debug("Queue not found for active task {}, creating new queue for future events", task.id()); diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index fc621bdb6..11001f36c 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -1125,6 +1125,66 @@ public void testSubscribeNoExistingTaskError() throws Exception { } } + @Test + public void testSubscribeToTerminalTaskError() throws Exception { + // Create a task in terminal state (COMPLETED) + Task completedTask = Task.builder() + .id("terminal-task-test") + .contextId("session-xyz") + .status(new TaskStatus(TaskState.TASK_STATE_COMPLETED)) + .build(); + saveTaskInTaskStore(completedTask); + + try { + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); + + // Create error handler to capture the UnsupportedOperationError + Consumer errorHandler = error -> { + if (error == null) { + // Stream completed successfully - ignore, we're waiting for an error + return; + } + if (!isStreamClosedError(error)) { + errorRef.set(error); + } + errorLatch.countDown(); + }; + + getClient().subscribeToTask(new TaskIdParams(completedTask.id()), List.of(), errorHandler); + + // Wait for error to be captured + boolean errorReceived = errorLatch.await(10, TimeUnit.SECONDS); + + if (errorReceived) { + // Error came via error handler + Throwable error = errorRef.get(); + assertNotNull(error, "Should receive an error when subscribing to terminal task"); + + // Per spec STREAM-SUB-003: should return UnsupportedOperationError for terminal tasks + if (error instanceof A2AClientException) { + assertInstanceOf(UnsupportedOperationError.class, ((A2AClientException) error).getCause(), + "Error should be UnsupportedOperationError for terminal task"); + } else { + // Check if it's directly an UnsupportedOperationError or walk the cause chain + Throwable cause = error; + boolean foundUnsupportedOperation = false; + while (cause != null && !foundUnsupportedOperation) { + if (cause instanceof UnsupportedOperationError) { + foundUnsupportedOperation = true; + } + cause = cause.getCause(); + } + assertTrue(foundUnsupportedOperation, "Expected UnsupportedOperationError in error chain"); + } + } else { + fail("Expected UnsupportedOperationError when subscribing to terminal task"); + } + } finally { + deleteTaskInTaskStore(completedTask.id()); + } + } + /** * Regression test for race condition where MainQueue closed when first ChildQueue closed, * preventing resubscription. With reference counting, MainQueue stays alive while any diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index ec419f465..d62ef94c9 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -389,7 +389,8 @@ public Flow.Publisher onSubscribeToTask( // via Subscriber.onError() rather than as part of the SendStreamingResponse payload return convertToSendStreamingMessageResponse(request.getId(), publisher); } catch (A2AError e) { - return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), e)); + // Let routing layer catch TaskNotFoundError and UnsupportedOperationError to wrap in SSE format + throw e; } catch (Throwable throwable) { return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), new InternalError(throwable.getMessage()))); } diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 72f94404b..a353e1e2a 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -983,41 +983,14 @@ public void testOnSubscribeNoExistingTaskError() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id())); - Flow.Publisher response = handler.onSubscribeToTask(request, callContext); - - List results = new ArrayList<>(); - AtomicReference error = new AtomicReference<>(); - - response.subscribe(new Flow.Subscriber<>() { - private Flow.Subscription subscription; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(SendStreamingMessageResponse item) { - results.add(item); - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - error.set(throwable); - subscription.cancel(); - } - @Override - public void onComplete() { - subscription.cancel(); - } - }); + // Per spec: TaskNotFoundError should be thrown immediately for non-existent tasks + // The routing layer will catch this and convert to JSON-RPC error response + TaskNotFoundError thrown = Assertions.assertThrows( + TaskNotFoundError.class, + () -> handler.onSubscribeToTask(request, callContext)); - assertEquals(1, results.size()); - assertNull(results.get(0).getResult()); - assertInstanceOf(TaskNotFoundError.class, results.get(0).getError()); + Assertions.assertNotNull(thrown); } @Test diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index ea16f3aa4..dfac008f5 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -430,6 +430,8 @@ public HTTPRestResponse subscribeToTask(ServerCallContext context, String tenant Flow.Publisher publisher = requestHandler.onSubscribeToTask(params, context); return createStreamingResponse(publisher); } catch (A2AError e) { + // For streaming endpoints: wrap ALL errors (including TaskNotFoundError, UnsupportedOperationError) + // in SSE stream format so client's SSE parser can process them and deliver to error handler return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(e).toJson())); } catch (Throwable throwable) { return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(new InternalError(throwable.getMessage())).toJson())); From fc35d6707ec2c32a6ff015c677bfcfb6589e899b Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 8 Apr 2026 17:29:55 +0200 Subject: [PATCH 2/2] Update server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../io/a2a/server/requesthandlers/DefaultRequestHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index c702e0666..db63c2fc1 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -827,8 +827,8 @@ public Flow.Publisher onSubscribeToTask(TaskIdParams params, if (task.status().state().isFinal()) { throw new UnsupportedOperationError( null, - "Cannot subscribe to task " + task.id() + - " - task is in terminal state: " + task.status().state(), + String.format("Cannot subscribe to task %s - task is in terminal state: %s", + task.id(), task.status().state()), null); }