Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions core/src/main/java/io/grpc/internal/CallExecutors.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2026 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import java.util.concurrent.Executor;

/**
* Common utilities for GRPC call executors.
*/
final class CallExecutors {

private CallExecutors() {}

/**
* Wraps an executor with safeguarding (serialization) if not already safeguarded.
*/
static Executor safeguard(Executor executor) {
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
// See https://github.com/grpc/grpc-java/issues/368
if (executor instanceof SerializingExecutor
|| executor instanceof SerializeReentrantCallsDirectExecutor) {
return executor;
}
if (executor == directExecutor()) {
return new SerializeReentrantCallsDirectExecutor();
}
return new SerializingExecutor(executor);
}
}
12 changes: 2 additions & 10 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
this.method = method;
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
// See https://github.com/grpc/grpc-java/issues/368
if (executor == directExecutor()) {
this.callExecutor = new SerializeReentrantCallsDirectExecutor();
callExecutorIsDirect = true;
} else {
this.callExecutor = new SerializingExecutor(executor);
callExecutorIsDirect = false;
}
this.callExecutor = CallExecutors.safeguard(executor);
callExecutorIsDirect = (this.callExecutor instanceof SerializeReentrantCallsDirectExecutor);
this.channelCallsTracer = channelCallsTracer;
// Propagate the context from the thread which initiated the call to all callbacks.
this.context = Context.current();
Expand Down
25 changes: 21 additions & 4 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,20 @@ public boolean isTerminated() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
// If we have no interceptors, we don't need to populate the executor in CallOptions
// yet. This avoids mutating CallOptions unnecessarily and breaking tests that
// expect exact instance equality. The executor will still be safeguarded when
// creating the actual ClientCallImpl.
if (interceptorChannel == realChannel) {
return realChannel.newCall(method, callOptions);
}
Executor executor = callOptions.getExecutor();
if (executor == null) {
executor = this.executor;
}
// All calls on the channel should have a safeguarded executor in CallOptions before
// calling interceptors.
callOptions = callOptions.withExecutor(CallExecutors.safeguard(executor));
return interceptorChannel.newCall(method, callOptions);
}

Expand All @@ -821,7 +835,7 @@ private Executor getCallExecutor(CallOptions callOptions) {
if (executor == null) {
executor = this.executor;
}
return executor;
return CallExecutors.safeguard(executor);
}

private class RealChannel extends Channel {
Expand Down Expand Up @@ -1084,9 +1098,12 @@ static final class ConfigSelectingClientCall<ReqT, RespT>
this.configSelector = configSelector;
this.channel = channel;
this.method = method;
this.callExecutor =
callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
this.callOptions = callOptions.withExecutor(callExecutor);
Executor executor = callOptions.getExecutor();
if (executor == null) {
executor = channelExecutor;
}
this.callExecutor = CallExecutors.safeguard(executor);
this.callOptions = callOptions.withExecutor(this.callExecutor);
this.context = Context.current();
}

Expand Down
7 changes: 5 additions & 2 deletions core/src/main/java/io/grpc/internal/SubchannelChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ public ClientStream newStream(MethodDescriptor<?, ?> method,
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
final Executor effectiveExecutor =
callOptions.getExecutor() == null ? executor : callOptions.getExecutor();
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) {
callExecutor = this.executor;
}
final Executor effectiveExecutor = CallExecutors.safeguard(callExecutor);
if (callOptions.isWaitForReady()) {
return new ClientCall<RequestT, ResponseT>() {
@Override
Expand Down
68 changes: 68 additions & 0 deletions core/src/test/java/io/grpc/internal/CallExecutorsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2026 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.Executor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class CallExecutorsTest {

@Test
public void safeguard_alreadySerializing_returnsSameInstance() {
Executor raw = command -> command.run();
SerializingExecutor serializing = new SerializingExecutor(raw);
assertSame(serializing, CallExecutors.safeguard(serializing));
}

@Test
public void safeguard_alreadySerializeReentrantCallsDirect_returnsSameInstance() {
SerializeReentrantCallsDirectExecutor direct = new SerializeReentrantCallsDirectExecutor();
assertSame(direct, CallExecutors.safeguard(direct));
}

@Test
public void safeguard_directExecutor_returnsSerializeReentrantCallsDirect() {
Executor safeguarded = CallExecutors.safeguard(directExecutor());
assertTrue(safeguarded instanceof SerializeReentrantCallsDirectExecutor);
}

@Test
public void safeguard_otherExecutor_returnsSerializing() {
Executor raw = command -> command.run();
Executor safeguarded = CallExecutors.safeguard(raw);
assertTrue(safeguarded instanceof SerializingExecutor);
}

@Test
public void safeguard_idempotent() {
Executor raw = command -> command.run();
Executor first = CallExecutors.safeguard(raw);
Executor second = CallExecutors.safeguard(first);
assertSame(first, second);

Executor firstDirect = CallExecutors.safeguard(directExecutor());
Executor secondDirect = CallExecutors.safeguard(firstDirect);
assertSame(firstDirect, secondDirect);
}
}
Loading