diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 7ca65cb35..917cf8af3 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -439,15 +439,21 @@ private A2AResponse processNonStreamingRequest(NonStreamingJSONRPCRequest */ private Multi> processStreamingRequest( A2ARequest request, ServerCallContext context) { - Flow.Publisher> publisher; - if (request instanceof SendStreamingMessageRequest req) { - publisher = jsonRpcHandler.onMessageSendStream(req, context); - } else if (request instanceof SubscribeToTaskRequest req) { - publisher = jsonRpcHandler.onSubscribeToTask(req, context); - } else { - return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError())); + try { + Flow.Publisher> publisher; + if (request instanceof SendStreamingMessageRequest req) { + publisher = jsonRpcHandler.onMessageSendStream(req, context); + } else if (request instanceof SubscribeToTaskRequest req) { + publisher = jsonRpcHandler.onSubscribeToTask(req, context); + } else { + return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError())); + } + return Multi.createFrom().publisher(publisher); + } catch (A2AError error) { + // For streaming endpoints, wrap immediate errors (like TaskNotFoundError, + // UnsupportedOperationError) in error response and send as first SSE event + return Multi.createFrom().item(generateErrorResponse(request, error)); } - return Multi.createFrom().publisher(publisher); } /** diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 9a480c44f..db63c2fc1 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -821,16 +821,23 @@ public Flow.Publisher onSubscribeToTask(TaskIdParams params, throw new TaskNotFoundError(); } + // Per A2A spec: subscription to tasks in terminal state (completed, failed, canceled, + // rejected) MUST return UnsupportedOperationError. + // Check BEFORE any queue operations to ensure immediate error response. + if (task.status().state().isFinal()) { + throw new UnsupportedOperationError( + null, + String.format("Cannot subscribe to task %s - task is in terminal state: %s", + task.id(), task.status().state()), + null); + } + TaskManager taskManager = new TaskManager(task.id(), task.contextId(), taskStore, null); ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor); EventQueue queue = queueManager.tap(task.id()); LOGGER.debug("onSubscribeToTask - tapped queue: {}", queue != null ? System.identityHashCode(queue) : "null"); if (queue == null) { - // If task is in final state, queue legitimately doesn't exist anymore - if (task.status().state().isFinal()) { - throw new TaskNotFoundError(); - } // For non-final tasks, recreate the queue so client can receive future events // (Note: historical events from before queue closed are not available) LOGGER.debug("Queue not found for active task {}, creating new queue for future events", task.id()); diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index fc621bdb6..11001f36c 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -1125,6 +1125,66 @@ public void testSubscribeNoExistingTaskError() throws Exception { } } + @Test + public void testSubscribeToTerminalTaskError() throws Exception { + // Create a task in terminal state (COMPLETED) + Task completedTask = Task.builder() + .id("terminal-task-test") + .contextId("session-xyz") + .status(new TaskStatus(TaskState.TASK_STATE_COMPLETED)) + .build(); + saveTaskInTaskStore(completedTask); + + try { + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); + + // Create error handler to capture the UnsupportedOperationError + Consumer errorHandler = error -> { + if (error == null) { + // Stream completed successfully - ignore, we're waiting for an error + return; + } + if (!isStreamClosedError(error)) { + errorRef.set(error); + } + errorLatch.countDown(); + }; + + getClient().subscribeToTask(new TaskIdParams(completedTask.id()), List.of(), errorHandler); + + // Wait for error to be captured + boolean errorReceived = errorLatch.await(10, TimeUnit.SECONDS); + + if (errorReceived) { + // Error came via error handler + Throwable error = errorRef.get(); + assertNotNull(error, "Should receive an error when subscribing to terminal task"); + + // Per spec STREAM-SUB-003: should return UnsupportedOperationError for terminal tasks + if (error instanceof A2AClientException) { + assertInstanceOf(UnsupportedOperationError.class, ((A2AClientException) error).getCause(), + "Error should be UnsupportedOperationError for terminal task"); + } else { + // Check if it's directly an UnsupportedOperationError or walk the cause chain + Throwable cause = error; + boolean foundUnsupportedOperation = false; + while (cause != null && !foundUnsupportedOperation) { + if (cause instanceof UnsupportedOperationError) { + foundUnsupportedOperation = true; + } + cause = cause.getCause(); + } + assertTrue(foundUnsupportedOperation, "Expected UnsupportedOperationError in error chain"); + } + } else { + fail("Expected UnsupportedOperationError when subscribing to terminal task"); + } + } finally { + deleteTaskInTaskStore(completedTask.id()); + } + } + /** * Regression test for race condition where MainQueue closed when first ChildQueue closed, * preventing resubscription. With reference counting, MainQueue stays alive while any diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index ec419f465..d62ef94c9 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -389,7 +389,8 @@ public Flow.Publisher onSubscribeToTask( // via Subscriber.onError() rather than as part of the SendStreamingResponse payload return convertToSendStreamingMessageResponse(request.getId(), publisher); } catch (A2AError e) { - return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), e)); + // Let routing layer catch TaskNotFoundError and UnsupportedOperationError to wrap in SSE format + throw e; } catch (Throwable throwable) { return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), new InternalError(throwable.getMessage()))); } diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 72f94404b..a353e1e2a 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -983,41 +983,14 @@ public void testOnSubscribeNoExistingTaskError() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id())); - Flow.Publisher response = handler.onSubscribeToTask(request, callContext); - - List results = new ArrayList<>(); - AtomicReference error = new AtomicReference<>(); - - response.subscribe(new Flow.Subscriber<>() { - private Flow.Subscription subscription; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(SendStreamingMessageResponse item) { - results.add(item); - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - error.set(throwable); - subscription.cancel(); - } - @Override - public void onComplete() { - subscription.cancel(); - } - }); + // Per spec: TaskNotFoundError should be thrown immediately for non-existent tasks + // The routing layer will catch this and convert to JSON-RPC error response + TaskNotFoundError thrown = Assertions.assertThrows( + TaskNotFoundError.class, + () -> handler.onSubscribeToTask(request, callContext)); - assertEquals(1, results.size()); - assertNull(results.get(0).getResult()); - assertInstanceOf(TaskNotFoundError.class, results.get(0).getError()); + Assertions.assertNotNull(thrown); } @Test diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index ea16f3aa4..dfac008f5 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -430,6 +430,8 @@ public HTTPRestResponse subscribeToTask(ServerCallContext context, String tenant Flow.Publisher publisher = requestHandler.onSubscribeToTask(params, context); return createStreamingResponse(publisher); } catch (A2AError e) { + // For streaming endpoints: wrap ALL errors (including TaskNotFoundError, UnsupportedOperationError) + // in SSE stream format so client's SSE parser can process them and deliver to error handler return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(e).toJson())); } catch (Throwable throwable) { return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(new InternalError(throwable.getMessage())).toJson()));