diff --git a/core/src/main/java/io/grpc/internal/CallExecutors.java b/core/src/main/java/io/grpc/internal/CallExecutors.java new file mode 100644 index 00000000000..9a5493e4b01 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/CallExecutors.java @@ -0,0 +1,46 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import java.util.concurrent.Executor; + +/** + * Common utilities for GRPC call executors. + */ +final class CallExecutors { + + private CallExecutors() {} + + /** + * Wraps an executor with safeguarding (serialization) if not already safeguarded. + */ + static Executor safeguard(Executor executor) { + // If we know that the executor is a direct executor, we don't need to wrap it with a + // SerializingExecutor. This is purely for performance reasons. + // See https://github.com/grpc/grpc-java/issues/368 + if (executor instanceof SerializingExecutor + || executor instanceof SerializeReentrantCallsDirectExecutor) { + return executor; + } + if (executor == directExecutor()) { + return new SerializeReentrantCallsDirectExecutor(); + } + return new SerializingExecutor(executor); + } +} diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 4b24b1eae3d..3debcae6403 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -104,16 +104,8 @@ final class ClientCallImpl extends ClientCall { this.method = method; // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); - // If we know that the executor is a direct executor, we don't need to wrap it with a - // SerializingExecutor. This is purely for performance reasons. - // See https://github.com/grpc/grpc-java/issues/368 - if (executor == directExecutor()) { - this.callExecutor = new SerializeReentrantCallsDirectExecutor(); - callExecutorIsDirect = true; - } else { - this.callExecutor = new SerializingExecutor(executor); - callExecutorIsDirect = false; - } + this.callExecutor = CallExecutors.safeguard(executor); + callExecutorIsDirect = (this.callExecutor instanceof SerializeReentrantCallsDirectExecutor); this.channelCallsTracer = channelCallsTracer; // Propagate the context from the thread which initiated the call to all callbacks. this.context = Context.current(); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index e423220e3ad..0cb1e01cc65 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -808,6 +808,20 @@ public boolean isTerminated() { @Override public ClientCall newCall(MethodDescriptor method, CallOptions callOptions) { + // If we have no interceptors, we don't need to populate the executor in CallOptions + // yet. This avoids mutating CallOptions unnecessarily and breaking tests that + // expect exact instance equality. The executor will still be safeguarded when + // creating the actual ClientCallImpl. + if (interceptorChannel == realChannel) { + return realChannel.newCall(method, callOptions); + } + Executor executor = callOptions.getExecutor(); + if (executor == null) { + executor = this.executor; + } + // All calls on the channel should have a safeguarded executor in CallOptions before + // calling interceptors. + callOptions = callOptions.withExecutor(CallExecutors.safeguard(executor)); return interceptorChannel.newCall(method, callOptions); } @@ -821,7 +835,7 @@ private Executor getCallExecutor(CallOptions callOptions) { if (executor == null) { executor = this.executor; } - return executor; + return CallExecutors.safeguard(executor); } private class RealChannel extends Channel { @@ -1084,9 +1098,12 @@ static final class ConfigSelectingClientCall this.configSelector = configSelector; this.channel = channel; this.method = method; - this.callExecutor = - callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor(); - this.callOptions = callOptions.withExecutor(callExecutor); + Executor executor = callOptions.getExecutor(); + if (executor == null) { + executor = channelExecutor; + } + this.callExecutor = CallExecutors.safeguard(executor); + this.callOptions = callOptions.withExecutor(this.callExecutor); this.context = Context.current(); } diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index ced4272afe3..777e903d8d7 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -85,8 +85,11 @@ public ClientStream newStream(MethodDescriptor method, @Override public ClientCall newCall( MethodDescriptor methodDescriptor, CallOptions callOptions) { - final Executor effectiveExecutor = - callOptions.getExecutor() == null ? executor : callOptions.getExecutor(); + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { + callExecutor = this.executor; + } + final Executor effectiveExecutor = CallExecutors.safeguard(callExecutor); if (callOptions.isWaitForReady()) { return new ClientCall() { @Override diff --git a/core/src/test/java/io/grpc/internal/CallExecutorsTest.java b/core/src/test/java/io/grpc/internal/CallExecutorsTest.java new file mode 100644 index 00000000000..ed26577c2e2 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/CallExecutorsTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Executor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CallExecutorsTest { + + @Test + public void safeguard_alreadySerializing_returnsSameInstance() { + Executor raw = command -> command.run(); + SerializingExecutor serializing = new SerializingExecutor(raw); + assertSame(serializing, CallExecutors.safeguard(serializing)); + } + + @Test + public void safeguard_alreadySerializeReentrantCallsDirect_returnsSameInstance() { + SerializeReentrantCallsDirectExecutor direct = new SerializeReentrantCallsDirectExecutor(); + assertSame(direct, CallExecutors.safeguard(direct)); + } + + @Test + public void safeguard_directExecutor_returnsSerializeReentrantCallsDirect() { + Executor safeguarded = CallExecutors.safeguard(directExecutor()); + assertTrue(safeguarded instanceof SerializeReentrantCallsDirectExecutor); + } + + @Test + public void safeguard_otherExecutor_returnsSerializing() { + Executor raw = command -> command.run(); + Executor safeguarded = CallExecutors.safeguard(raw); + assertTrue(safeguarded instanceof SerializingExecutor); + } + + @Test + public void safeguard_idempotent() { + Executor raw = command -> command.run(); + Executor first = CallExecutors.safeguard(raw); + Executor second = CallExecutors.safeguard(first); + assertSame(first, second); + + Executor firstDirect = CallExecutors.safeguard(directExecutor()); + Executor secondDirect = CallExecutors.safeguard(firstDirect); + assertSame(firstDirect, secondDirect); + } +}