Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,21 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
*/
private Multi<? extends A2AResponse<?>> processStreamingRequest(
A2ARequest<?> request, ServerCallContext context) {
Flow.Publisher<? extends A2AResponse<?>> publisher;
if (request instanceof SendStreamingMessageRequest req) {
publisher = jsonRpcHandler.onMessageSendStream(req, context);
} else if (request instanceof SubscribeToTaskRequest req) {
publisher = jsonRpcHandler.onSubscribeToTask(req, context);
} else {
return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError()));
try {
Flow.Publisher<? extends A2AResponse<?>> publisher;
if (request instanceof SendStreamingMessageRequest req) {
publisher = jsonRpcHandler.onMessageSendStream(req, context);
} else if (request instanceof SubscribeToTaskRequest req) {
publisher = jsonRpcHandler.onSubscribeToTask(req, context);
} else {
return Multi.createFrom().item(generateErrorResponse(request, new UnsupportedOperationError()));
}
return Multi.createFrom().publisher(publisher);
} catch (A2AError error) {
// For streaming endpoints, wrap immediate errors (like TaskNotFoundError,
// UnsupportedOperationError) in error response and send as first SSE event
return Multi.createFrom().item(generateErrorResponse(request, error));
}
return Multi.createFrom().publisher(publisher);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,16 +821,23 @@ public Flow.Publisher<StreamingEventKind> onSubscribeToTask(TaskIdParams params,
throw new TaskNotFoundError();
}

// Per A2A spec: subscription to tasks in terminal state (completed, failed, canceled,
// rejected) MUST return UnsupportedOperationError.
// Check BEFORE any queue operations to ensure immediate error response.
if (task.status().state().isFinal()) {
throw new UnsupportedOperationError(
null,
String.format("Cannot subscribe to task %s - task is in terminal state: %s",
task.id(), task.status().state()),
null);
}

TaskManager taskManager = new TaskManager(task.id(), task.contextId(), taskStore, null);
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor);
EventQueue queue = queueManager.tap(task.id());
LOGGER.debug("onSubscribeToTask - tapped queue: {}", queue != null ? System.identityHashCode(queue) : "null");

if (queue == null) {
// If task is in final state, queue legitimately doesn't exist anymore
if (task.status().state().isFinal()) {
throw new TaskNotFoundError();
}
// For non-final tasks, recreate the queue so client can receive future events
// (Note: historical events from before queue closed are not available)
LOGGER.debug("Queue not found for active task {}, creating new queue for future events", task.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,66 @@ public void testSubscribeNoExistingTaskError() throws Exception {
}
}

@Test
public void testSubscribeToTerminalTaskError() throws Exception {
// Create a task in terminal state (COMPLETED)
Task completedTask = Task.builder()
.id("terminal-task-test")
.contextId("session-xyz")
.status(new TaskStatus(TaskState.TASK_STATE_COMPLETED))
.build();
saveTaskInTaskStore(completedTask);

try {
CountDownLatch errorLatch = new CountDownLatch(1);
AtomicReference<Throwable> errorRef = new AtomicReference<>();

// Create error handler to capture the UnsupportedOperationError
Consumer<Throwable> errorHandler = error -> {
if (error == null) {
// Stream completed successfully - ignore, we're waiting for an error
return;
}
if (!isStreamClosedError(error)) {
errorRef.set(error);
}
errorLatch.countDown();
};

getClient().subscribeToTask(new TaskIdParams(completedTask.id()), List.of(), errorHandler);

// Wait for error to be captured
boolean errorReceived = errorLatch.await(10, TimeUnit.SECONDS);

if (errorReceived) {
// Error came via error handler
Throwable error = errorRef.get();
assertNotNull(error, "Should receive an error when subscribing to terminal task");

// Per spec STREAM-SUB-003: should return UnsupportedOperationError for terminal tasks
if (error instanceof A2AClientException) {
assertInstanceOf(UnsupportedOperationError.class, ((A2AClientException) error).getCause(),
"Error should be UnsupportedOperationError for terminal task");
} else {
// Check if it's directly an UnsupportedOperationError or walk the cause chain
Throwable cause = error;
boolean foundUnsupportedOperation = false;
while (cause != null && !foundUnsupportedOperation) {
if (cause instanceof UnsupportedOperationError) {
foundUnsupportedOperation = true;
}
cause = cause.getCause();
}
assertTrue(foundUnsupportedOperation, "Expected UnsupportedOperationError in error chain");
}
} else {
fail("Expected UnsupportedOperationError when subscribing to terminal task");
}
} finally {
deleteTaskInTaskStore(completedTask.id());
}
}

/**
* Regression test for race condition where MainQueue closed when first ChildQueue closed,
* preventing resubscription. With reference counting, MainQueue stays alive while any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ public Flow.Publisher<SendStreamingMessageResponse> onSubscribeToTask(
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
return convertToSendStreamingMessageResponse(request.getId(), publisher);
} catch (A2AError e) {
return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), e));
// Let routing layer catch TaskNotFoundError and UnsupportedOperationError to wrap in SSE format
throw e;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is a bit agressive if there is an internal error or something else

} catch (Throwable throwable) {
return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), new InternalError(throwable.getMessage())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,41 +983,14 @@ public void testOnSubscribeNoExistingTaskError() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);

SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id()));
Flow.Publisher<SendStreamingMessageResponse> response = handler.onSubscribeToTask(request, callContext);

List<SendStreamingMessageResponse> results = new ArrayList<>();
AtomicReference<Throwable> error = new AtomicReference<>();

response.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(SendStreamingMessageResponse item) {
results.add(item);
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
error.set(throwable);
subscription.cancel();
}

@Override
public void onComplete() {
subscription.cancel();
}
});
// Per spec: TaskNotFoundError should be thrown immediately for non-existent tasks
// The routing layer will catch this and convert to JSON-RPC error response
TaskNotFoundError thrown = Assertions.assertThrows(
TaskNotFoundError.class,
() -> handler.onSubscribeToTask(request, callContext));

assertEquals(1, results.size());
assertNull(results.get(0).getResult());
assertInstanceOf(TaskNotFoundError.class, results.get(0).getError());
Assertions.assertNotNull(thrown);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ public HTTPRestResponse subscribeToTask(ServerCallContext context, String tenant
Flow.Publisher<StreamingEventKind> publisher = requestHandler.onSubscribeToTask(params, context);
return createStreamingResponse(publisher);
} catch (A2AError e) {
// For streaming endpoints: wrap ALL errors (including TaskNotFoundError, UnsupportedOperationError)
// in SSE stream format so client's SSE parser can process them and deliver to error handler
return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(e).toJson()));
} catch (Throwable throwable) {
return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(new InternalError(throwable.getMessage())).toJson()));
Expand Down
Loading