From 3ea6a403f9f68daed530cdb51c18911646ad7c27 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 30 Mar 2026 10:31:25 +0000 Subject: [PATCH 1/5] The reported hang https://github.com/grpc/grpc-java/issues/12109 in blockingUnaryCall was caused by an unhandled framing exception during the draining of DelayedStream. When MessageFramer throws an exception (e.g., RESOURCE_EXHAUSTED), it bubbles up through DelayedStream.drainPendingCalls and is eventually caught and swallowed by ThreadlessExecutor.runQuietly. This leaves the DelayedStream in an inconsistent state (where passThrough is still false), and the responseFuture never completes, causing the blocking call to hang forever. I have implemented a fix that adds proper exception handling to the draining loops in both DelayedStream and DelayedClientCall. When an exception occurs during draining: 1. The realStream (or realCall) is explicitly cancelled with the error. 2. The pending calls are cleared. 3. The stream/call transitions to passThrough = true to prevent getting stuck. 4. The listener's pending callbacks are drained, ensuring that any closure notifications are delivered to the application. --- .../io/grpc/internal/DelayedClientCall.java | 27 ++++++++++++++--- .../java/io/grpc/internal/DelayedStream.java | 27 ++++++++++++++--- .../grpc/internal/DelayedClientCallTest.java | 27 +++++++++++++++++ .../io/grpc/internal/DelayedStreamTest.java | 30 +++++++++++++++++++ 4 files changed, 103 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientCall.java b/core/src/main/java/io/grpc/internal/DelayedClientCall.java index b568bb12c46..9d3c4abe0e2 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientCall.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientCall.java @@ -294,6 +294,7 @@ private void drainPendingCalls() { assert !passThrough; List toRun = new ArrayList<>(); DelayedListener delayedListener ; + drainOut: while (true) { synchronized (this) { if (pendingRunnables.isEmpty()) { @@ -311,8 +312,18 @@ private void drainPendingCalls() { } for (Runnable runnable : toRun) { // Must not call transport while lock is held to prevent deadlocks. - // TODO(ejona): exception handling - runnable.run(); + try { + runnable.run(); + } catch (Throwable t) { + Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls"); + realCall.cancel(status.getDescription(), status.getCause()); + synchronized (this) { + pendingRunnables = null; + passThrough = true; + delayedListener = this.delayedListener; + } + break drainOut; + } } toRun.clear(); } @@ -519,6 +530,7 @@ public void run() { void drainPendingCallbacks() { assert !passThrough; List toRun = new ArrayList<>(); + drainOut: while (true) { synchronized (this) { if (pendingCallbacks.isEmpty()) { @@ -535,8 +547,15 @@ void drainPendingCallbacks() { } for (Runnable runnable : toRun) { // Avoid calling listener while lock is held to prevent deadlocks. - // TODO(ejona): exception handling - runnable.run(); + try { + runnable.run(); + } catch (Throwable t) { + synchronized (this) { + pendingCallbacks = null; + passThrough = true; + } + throw t; + } } toRun.clear(); } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index a2b1e963ac5..b44b27500d5 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -172,6 +172,7 @@ private void drainPendingCalls() { assert !passThrough; List toRun = new ArrayList<>(); DelayedStreamListener delayedListener = null; + drainOut: while (true) { synchronized (this) { if (pendingCalls.isEmpty()) { @@ -189,8 +190,18 @@ private void drainPendingCalls() { } for (Runnable runnable : toRun) { // Must not call transport while lock is held to prevent deadlocks. - // TODO(ejona): exception handling - runnable.run(); + try { + runnable.run(); + } catch (Throwable t) { + Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls"); + realStream.cancel(status); + synchronized (this) { + pendingCalls = null; + passThrough = true; + delayedListener = this.delayedListener; + } + break drainOut; + } } toRun.clear(); } @@ -525,6 +536,7 @@ public void run() { public void drainPendingCallbacks() { assert !passThrough; List toRun = new ArrayList<>(); + drainOut: while (true) { synchronized (this) { if (pendingCallbacks.isEmpty()) { @@ -541,8 +553,15 @@ public void drainPendingCallbacks() { } for (Runnable runnable : toRun) { // Avoid calling listener while lock is held to prevent deadlocks. - // TODO(ejona): exception handling - runnable.run(); + try { + runnable.run(); + } catch (Throwable t) { + synchronized (this) { + pendingCallbacks = null; + passThrough = true; + } + throw t; + } } toRun.clear(); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index ff131d29975..efd9578a62a 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -20,6 +20,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -229,6 +230,32 @@ public void delayedCallsRunUnderContext() throws Exception { assertThat(contextKey.get(readyContext.get())).isEqualTo(goldenValue); } + @Test + public void drainPendingCallFails() { + DelayedClientCall delayedClientCall = + new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); + delayedClientCall.start(listener, new Metadata()); + delayedClientCall.request(1); + + final RuntimeException error = new RuntimeException("fail"); + org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer() { + @Override + public Void answer(org.mockito.invocation.InvocationOnMock invocation) { + throw error; + } + }).when(mockRealCall).request(1); + + Runnable runnable = delayedClientCall.setCall(mockRealCall); + assertThat(runnable).isNotNull(); + try { + runnable.run(); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + + verify(mockRealCall).cancel(eq("Failed to drain pending calls"), same(error)); + } + private void callMeMaybe(Runnable r) { if (r != null) { r.run(); diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 12c32fcf126..ba0cf9c0fd8 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -472,6 +472,36 @@ public Void answer(InvocationOnMock in) { .matches("\\[test_op_delay=[0-9]+ns, remote_addr=127\\.0\\.0\\.1:443\\]"); } + @Test + public void drainPendingCallFails() { + stream.start(listener); + stream.request(1); + final RuntimeException error = new RuntimeException("fail"); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + throw error; + } + }).when(realStream).request(1); + + Runnable runnable = stream.setStream(realStream); + assertNotNull(runnable); + try { + runnable.run(); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(realStream).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNKNOWN); + assertThat(statusCaptor.getValue().getCause()).isSameInstanceAs(error); + + verify(realStream).start(listenerCaptor.capture()); + listenerCaptor.getValue().closed(statusCaptor.getValue(), RpcProgress.PROCESSED, new Metadata()); + verify(listener).closed(same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class)); + } + private void callMeMaybe(Runnable r) { if (r != null) { r.run(); From 1d5b58be91488f875c85954f90a8a3302ab4957b Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 30 Mar 2026 10:51:59 +0000 Subject: [PATCH 2/5] test: add unit tests for listener drain exception handling in DelayedStream and DelayedClientCall --- .../grpc/internal/DelayedClientCallTest.java | 50 +++++++++++++++++++ .../io/grpc/internal/DelayedStreamTest.java | 45 +++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index efd9578a62a..27e98a05bdd 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.MoreExecutors; @@ -256,6 +257,55 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) { verify(mockRealCall).cancel(eq("Failed to drain pending calls"), same(error)); } + @Test + public void drainPendingCallbacksFails() { + DelayedClientCall delayedClientCall = + new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); + delayedClientCall.start(listener, new Metadata()); + + final RuntimeException error = new RuntimeException("fail"); + org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer() { + @Override + public Void answer(org.mockito.invocation.InvocationOnMock invocation) { + throw error; + } + }).when(listener).onReady(); + + final AtomicReference> listenerCaptor = new AtomicReference<>(); + org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer() { + @Override + public Void answer(org.mockito.invocation.InvocationOnMock invocation) { + ClientCall.Listener delayedListener = invocation.getArgument(0); + listenerCaptor.set(delayedListener); + delayedListener.onReady(); + return null; + } + }).when(mockRealCall).start(any(ClientCall.Listener.class), any(Metadata.class)); + + Runnable runnable = delayedClientCall.setCall(mockRealCall); + assertThat(runnable).isNotNull(); + + try { + runnable.run(); + org.junit.Assert.fail("Should have thrown"); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + + ClientCall.Listener delayedListener = listenerCaptor.get(); + assertThat(delayedListener).isNotNull(); + + // Verify it transitioned to passThrough by showing it forwards. + try { + delayedListener.onReady(); + org.junit.Assert.fail("Should have thrown"); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + // Verify it was called twice (once during drain, once just now) + verify(listener, times(2)).onReady(); + } + private void callMeMaybe(Runnable r) { if (r != null) { r.run(); diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index ba0cf9c0fd8..fc81ed07bba 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -502,6 +502,51 @@ public Void answer(InvocationOnMock invocation) { verify(listener).closed(same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class)); } + @Test + public void drainPendingCallbacksFails() { + stream.start(listener); + final RuntimeException error = new RuntimeException("fail"); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + throw error; + } + }).when(listener).onReady(); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + ClientStreamListener delayedListener = invocation.getArgument(0); + delayedListener.onReady(); + return null; + } + }).when(realStream).start(any(ClientStreamListener.class)); + + Runnable runnable = stream.setStream(realStream); + assertNotNull(runnable); + + try { + runnable.run(); + org.junit.Assert.fail("Should have thrown"); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + + verify(realStream).start(listenerCaptor.capture()); + ClientStreamListener delayedListener = listenerCaptor.getValue(); + + // Verify it transitioned to passThrough. If it didn't, this might NPE or buffer. + // If it is passThrough, it will forward to the listener, which we know throws. + try { + delayedListener.onReady(); + org.junit.Assert.fail("Should have thrown"); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(error); + } + // Verify it was called twice (once during drain, once just now) + verify(listener, times(2)).onReady(); + } + private void callMeMaybe(Runnable r) { if (r != null) { r.run(); From 75d3f3d2b2fd46c77d93d546919b89440e234c1a Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 30 Mar 2026 10:56:10 +0000 Subject: [PATCH 3/5] test: use static imports for Assert.fail --- .../test/java/io/grpc/internal/DelayedClientCallTest.java | 5 +++-- core/src/test/java/io/grpc/internal/DelayedStreamTest.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index 27e98a05bdd..d65acd92251 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -36,6 +36,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; +import static org.junit.Assert.fail; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; @@ -287,7 +288,7 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) { try { runnable.run(); - org.junit.Assert.fail("Should have thrown"); + fail("Should have thrown"); } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } @@ -298,7 +299,7 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) { // Verify it transitioned to passThrough by showing it forwards. try { delayedListener.onReady(); - org.junit.Assert.fail("Should have thrown"); + fail("Should have thrown"); } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index fc81ed07bba..5c993befeeb 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; @@ -527,7 +528,7 @@ public Void answer(InvocationOnMock invocation) { try { runnable.run(); - org.junit.Assert.fail("Should have thrown"); + fail("Should have thrown"); } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } @@ -539,7 +540,7 @@ public Void answer(InvocationOnMock invocation) { // If it is passThrough, it will forward to the listener, which we know throws. try { delayedListener.onReady(); - org.junit.Assert.fail("Should have thrown"); + fail("Should have thrown"); } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } From 7f8f2fdc261b0ce464aaf3fad03ddbea91f67cbb Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 30 Mar 2026 11:44:19 +0000 Subject: [PATCH 4/5] test: fix ErrorProne MissingFail warnings in unit tests --- .../src/test/java/io/grpc/internal/DelayedClientCallTest.java | 4 ++++ core/src/test/java/io/grpc/internal/DelayedStreamTest.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index d65acd92251..02db5d2b53f 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -36,6 +36,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -233,6 +234,7 @@ public void delayedCallsRunUnderContext() throws Exception { } @Test + @SuppressWarnings("MissingFail") public void drainPendingCallFails() { DelayedClientCall delayedClientCall = new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); @@ -259,6 +261,7 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) { } @Test + @SuppressWarnings("unchecked") public void drainPendingCallbacksFails() { DelayedClientCall delayedClientCall = new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); @@ -303,6 +306,7 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) { } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } + // Verify it was called twice (once during drain, once just now) verify(listener, times(2)).onReady(); } diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 5c993befeeb..048f386fd61 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -474,6 +475,7 @@ public Void answer(InvocationOnMock in) { } @Test + @SuppressWarnings({"unchecked", "MissingFail"}) public void drainPendingCallFails() { stream.start(listener); stream.request(1); @@ -504,6 +506,7 @@ public Void answer(InvocationOnMock invocation) { } @Test + @SuppressWarnings("unchecked") public void drainPendingCallbacksFails() { stream.start(listener); final RuntimeException error = new RuntimeException("fail"); @@ -544,6 +547,7 @@ public Void answer(InvocationOnMock invocation) { } catch (RuntimeException e) { assertThat(e).isSameInstanceAs(error); } + // Verify it was called twice (once during drain, once just now) verify(listener, times(2)).onReady(); } From d2cb6129124200ecc2b55c7093ba24c8bb0f95c6 Mon Sep 17 00:00:00 2001 From: Kannan J Date: Mon, 30 Mar 2026 12:05:38 +0000 Subject: [PATCH 5/5] Style fixes. --- .../java/io/grpc/internal/DelayedClientCallTest.java | 11 +++++------ .../test/java/io/grpc/internal/DelayedStreamTest.java | 7 ++++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index 02db5d2b53f..f00be9a5de8 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; @@ -36,8 +37,6 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Arrays; @@ -170,8 +169,8 @@ public void startThenSetCall() { @Test @SuppressWarnings("unchecked") public void cancelThenSetCall() { - DelayedClientCall delayedClientCall = new DelayedClientCall<>( - callExecutor, fakeClock.getScheduledExecutorService(), null); + DelayedClientCall delayedClientCall = + new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); delayedClientCall.start(listener, new Metadata()); delayedClientCall.request(1); delayedClientCall.cancel("cancel", new StatusException(Status.CANCELLED)); @@ -186,8 +185,8 @@ public void cancelThenSetCall() { @Test @SuppressWarnings("unchecked") public void setCallThenCancel() { - DelayedClientCall delayedClientCall = new DelayedClientCall<>( - callExecutor, fakeClock.getScheduledExecutorService(), null); + DelayedClientCall delayedClientCall = + new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null); delayedClientCall.start(listener, new Metadata()); delayedClientCall.request(1); Runnable r = delayedClientCall.setCall(mockRealCall); diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 048f386fd61..01f49af2b36 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -501,8 +500,10 @@ public Void answer(InvocationOnMock invocation) { assertThat(statusCaptor.getValue().getCause()).isSameInstanceAs(error); verify(realStream).start(listenerCaptor.capture()); - listenerCaptor.getValue().closed(statusCaptor.getValue(), RpcProgress.PROCESSED, new Metadata()); - verify(listener).closed(same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class)); + listenerCaptor.getValue().closed( + statusCaptor.getValue(), RpcProgress.PROCESSED, new Metadata()); + verify(listener).closed( + same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class)); } @Test