diff --git a/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java b/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java index d81a5aa6904..6fbfae37b6f 100644 --- a/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java +++ b/inprocess/src/test/java/io/grpc/inprocess/MirroringInterceptorTest.java @@ -16,7 +16,9 @@ package io.grpc.inprocess; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import io.grpc.CallOptions; import io.grpc.Channel; @@ -32,8 +34,10 @@ import io.grpc.util.MirroringInterceptor; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Rule; import org.junit.Test; @@ -61,6 +65,33 @@ public String parse(java.io.InputStream stream) { .setResponseMarshaller(MARSHALLER) .build(); + // ─── Helper to build a simple auto-closing server ─────────────────────────── + + private String buildAutoCloseServer(CountDownLatch latch, AtomicBoolean headerVerified, + Metadata.Key key, String expectedValue) throws Exception { + String name = InProcessServerBuilder.generateName(); + grpcCleanup.register( + InProcessServerBuilder.forName(name) + .directExecutor() + .addService( + ServerServiceDefinition.builder("test") + .addMethod(method, (call, headers) -> { + if (key != null && expectedValue.equals(headers.get(key))) { + headerVerified.set(true); + } + if (latch != null) latch.countDown(); + call.sendHeaders(new Metadata()); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .build() + .start()); + return name; + } + + // ─── Test 1: Unary call is mirrored with headers ──────────────────────────── + @Test public void unaryCallIsMirroredWithHeaders() throws Exception { CountDownLatch mirrorLatch = new CountDownLatch(1); @@ -68,73 +99,204 @@ public void unaryCallIsMirroredWithHeaders() throws Exception { Metadata.Key.of("test-header", Metadata.ASCII_STRING_MARSHALLER); AtomicBoolean mirrorHeaderVerified = new AtomicBoolean(false); - // 1. Setup Mirror Server - IMPORTANT: It must CLOSE the call + String mirrorName = buildAutoCloseServer(mirrorLatch, mirrorHeaderVerified, + testKey, "shadow-value"); + String primaryName = buildAutoCloseServer(null, null, null, ""); + + ManagedChannel mirrorChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build()); + ManagedChannel primaryChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build()); + + Channel interceptedChannel = ClientInterceptors.intercept( + primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run)); + + Metadata headers = new Metadata(); + headers.put(testKey, "shadow-value"); + + ClientCall call = interceptedChannel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, headers); + call.sendMessage("hello"); + call.halfClose(); + + assertTrue("Mirror server was not reached", mirrorLatch.await(1, TimeUnit.SECONDS)); + assertTrue("Headers not mirrored", mirrorHeaderVerified.get()); + } + + // ─── Test 2: Cancel is propagated to mirror ────────────────────────────────── + + @Test + public void cancelIsPropagatedToMirror() throws Exception { + CountDownLatch mirrorStartLatch = new CountDownLatch(1); + AtomicBoolean mirrorCancelSeen = new AtomicBoolean(false); + String mirrorName = InProcessServerBuilder.generateName(); grpcCleanup.register( InProcessServerBuilder.forName(mirrorName) .directExecutor() .addService( ServerServiceDefinition.builder("test") - .addMethod( - method, - (call, headers) -> { - if ("shadow-value".equals(headers.get(testKey))) { - mirrorHeaderVerified.set(true); - } - mirrorLatch.countDown(); - - // CRITICAL: Close the call so the channel can shut down - call.sendHeaders(new Metadata()); - call.close(Status.OK, new Metadata()); - return new ServerCall.Listener() {}; - }) + .addMethod(method, (call, headers) -> { + mirrorStartLatch.countDown(); + call.sendHeaders(new Metadata()); + return new ServerCall.Listener() { + @Override + public void onCancel() { + mirrorCancelSeen.set(true); + } + }; + }) .build()) .build() .start()); - // 2. Setup Primary Server - Also must CLOSE the call - String primaryName = InProcessServerBuilder.generateName(); + String primaryName = buildAutoCloseServer(null, null, null, ""); + + ManagedChannel mirrorChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build()); + ManagedChannel primaryChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build()); + + Channel interceptedChannel = ClientInterceptors.intercept( + primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run)); + + ClientCall call = + interceptedChannel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, new Metadata()); + + assertTrue("Mirror call never started", mirrorStartLatch.await(1, TimeUnit.SECONDS)); + + // Now cancel — should propagate to mirror + call.cancel("test cancel", null); + + // Give mirror time to process cancel + Thread.sleep(200); + assertTrue("Cancel was not propagated to mirror", mirrorCancelSeen.get()); + } + + // ─── Test 3: Multiple messages are all mirrored ────────────────────────────── + + @Test + public void multipleMessagesAreMirrored() throws Exception { + AtomicInteger mirrorMessageCount = new AtomicInteger(0); + CountDownLatch halfCloseLatch = new CountDownLatch(1); + + String mirrorName = InProcessServerBuilder.generateName(); grpcCleanup.register( - InProcessServerBuilder.forName(primaryName) + InProcessServerBuilder.forName(mirrorName) .directExecutor() .addService( ServerServiceDefinition.builder("test") - .addMethod( - method, - (call, headers) -> { - call.sendHeaders(new Metadata()); + .addMethod(method, (call, headers) -> { + call.sendHeaders(new Metadata()); + call.request(10); + return new ServerCall.Listener() { + @Override + public void onMessage(String message) { + mirrorMessageCount.incrementAndGet(); + } + + @Override + public void onHalfClose() { + halfCloseLatch.countDown(); call.close(Status.OK, new Metadata()); - return new ServerCall.Listener() {}; - }) + } + }; + }) .build()) .build() .start()); + String primaryName = buildAutoCloseServer(null, null, null, ""); + ManagedChannel mirrorChannel = grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build()); ManagedChannel primaryChannel = grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build()); - // Use direct executor to keep the mirror call on the same thread - java.util.concurrent.Executor directExecutor = Runnable::run; + Channel interceptedChannel = ClientInterceptors.intercept( + primaryChannel, new MirroringInterceptor(mirrorChannel, Runnable::run)); + + ClientCall call = + interceptedChannel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, new Metadata()); + call.request(1); + call.sendMessage("msg1"); + call.sendMessage("msg2"); + call.sendMessage("msg3"); + call.halfClose(); - Channel interceptedChannel = - ClientInterceptors.intercept( - primaryChannel, new MirroringInterceptor(mirrorChannel, directExecutor)); + assertTrue("Mirror halfClose never received", halfCloseLatch.await(1, TimeUnit.SECONDS)); + assertTrue("Expected 3 mirrored messages, got: " + mirrorMessageCount.get(), + mirrorMessageCount.get() >= 3); + } - // 3. Trigger call with Metadata - Metadata headers = new Metadata(); - headers.put(testKey, "shadow-value"); + // ─── Test 4: Null mirrorChannel throws NullPointerException ───────────────── - ClientCall call = interceptedChannel.newCall(method, CallOptions.DEFAULT); - call.start(new ClientCall.Listener() {}, headers); + @Test + public void nullMirrorChannelThrowsException() { + try { + new MirroringInterceptor(null, Runnable::run); + fail("Expected NullPointerException for null mirrorChannel"); + } catch (NullPointerException e) { + assertNotNull(e); + } + } + + // ─── Test 5: Null executor throws NullPointerException ────────────────────── + + @Test + public void nullExecutorThrowsException() throws Exception { + String mirrorName = buildAutoCloseServer(null, null, null, ""); + ManagedChannel mirrorChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(mirrorName).build()); + try { + new MirroringInterceptor(mirrorChannel, null); + fail("Expected NullPointerException for null executor"); + } catch (NullPointerException e) { + assertNotNull(e); + } + } + + // ─── Test 6: Mirror call failure is handled silently ──────────────────────── + + @Test + public void mirrorCallFailureDoesNotAffectPrimary() throws Exception { + CountDownLatch primaryLatch = new CountDownLatch(1); + + String primaryName = InProcessServerBuilder.generateName(); + grpcCleanup.register( + InProcessServerBuilder.forName(primaryName) + .directExecutor() + .addService( + ServerServiceDefinition.builder("test") + .addMethod(method, (call, headers) -> { + primaryLatch.countDown(); + call.sendHeaders(new Metadata()); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() {}; + }) + .build()) + .build() + .start()); + + // Mirror channel points to non-existent server — will fail silently + ManagedChannel brokenMirrorChannel = grpcCleanup.register( + InProcessChannelBuilder.forName("non-existent-server-xyz").build()); + ManagedChannel primaryChannel = + grpcCleanup.register(InProcessChannelBuilder.forName(primaryName).build()); + + Channel interceptedChannel = ClientInterceptors.intercept( + primaryChannel, new MirroringInterceptor(brokenMirrorChannel, Runnable::run)); + + ClientCall call = + interceptedChannel.newCall(method, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() {}, new Metadata()); call.sendMessage("hello"); call.halfClose(); - // 4. Assertions - assertTrue("Mirror server was not reached", mirrorLatch.await(1, TimeUnit.SECONDS)); - assertTrue( - "Headers were not correctly mirrored to shadow service", mirrorHeaderVerified.get()); - System.out.println("FULL MIRRORING SUCCESSFUL!"); + // Primary should still succeed even if mirror fails + assertTrue("Primary call was affected by mirror failure", + primaryLatch.await(1, TimeUnit.SECONDS)); } } \ No newline at end of file