Skip to content

Commit 2a8230a

Browse files
committed
Call executor safeguard for interceptor
1 parent 7c45aac commit 2a8230a

4 files changed

Lines changed: 73 additions & 13 deletions

File tree

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2026 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.internal;
18+
19+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20+
21+
import java.util.concurrent.Executor;
22+
23+
/**
24+
* Common utilities for GRPC call executors.
25+
*/
26+
final class CallExecutors {
27+
28+
private CallExecutors() {}
29+
30+
/**
31+
* Wraps an executor with safeguarding (serialization) if not already safeguarded.
32+
*/
33+
static Executor safeguard(Executor executor) {
34+
if (executor instanceof SerializingExecutor
35+
|| executor instanceof SerializeReentrantCallsDirectExecutor) {
36+
return executor;
37+
}
38+
if (executor == directExecutor()) {
39+
return new SerializeReentrantCallsDirectExecutor();
40+
}
41+
return new SerializingExecutor(executor);
42+
}
43+
44+
/**
45+
* Returns true if the executor is safeguarded (e.g. a {@link SerializingExecutor} or
46+
* {@link SerializeReentrantCallsDirectExecutor}).
47+
*/
48+
static boolean isSafeguarded(Executor executor) {
49+
return executor instanceof SerializingExecutor
50+
|| executor instanceof SerializeReentrantCallsDirectExecutor;
51+
}
52+
}

core/src/main/java/io/grpc/internal/ClientCallImpl.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
107107
// If we know that the executor is a direct executor, we don't need to wrap it with a
108108
// SerializingExecutor. This is purely for performance reasons.
109109
// See https://github.com/grpc/grpc-java/issues/368
110-
if (executor == directExecutor()) {
111-
this.callExecutor = new SerializeReentrantCallsDirectExecutor();
112-
callExecutorIsDirect = true;
113-
} else {
114-
this.callExecutor = new SerializingExecutor(executor);
115-
callExecutorIsDirect = false;
116-
}
110+
this.callExecutor = CallExecutors.safeguard(executor);
111+
callExecutorIsDirect = (this.callExecutor instanceof SerializeReentrantCallsDirectExecutor);
117112
this.channelCallsTracer = channelCallsTracer;
118113
// Propagate the context from the thread which initiated the call to all callbacks.
119114
this.context = Context.current();

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,13 @@ public boolean isTerminated() {
808808
@Override
809809
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
810810
CallOptions callOptions) {
811+
Executor executor = callOptions.getExecutor();
812+
if (executor == null) {
813+
executor = this.executor;
814+
}
815+
// All calls on the channel should have a safeguarded executor in CallOptions before
816+
// calling interceptors.
817+
callOptions = callOptions.withExecutor(CallExecutors.safeguard(executor));
811818
return interceptorChannel.newCall(method, callOptions);
812819
}
813820

@@ -821,7 +828,7 @@ private Executor getCallExecutor(CallOptions callOptions) {
821828
if (executor == null) {
822829
executor = this.executor;
823830
}
824-
return executor;
831+
return CallExecutors.safeguard(executor);
825832
}
826833

827834
private class RealChannel extends Channel {
@@ -1084,9 +1091,12 @@ static final class ConfigSelectingClientCall<ReqT, RespT>
10841091
this.configSelector = configSelector;
10851092
this.channel = channel;
10861093
this.method = method;
1087-
this.callExecutor =
1088-
callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1089-
this.callOptions = callOptions.withExecutor(callExecutor);
1094+
Executor executor = callOptions.getExecutor();
1095+
if (executor == null) {
1096+
executor = channelExecutor;
1097+
}
1098+
this.callExecutor = CallExecutors.safeguard(executor);
1099+
this.callOptions = callOptions.withExecutor(this.callExecutor);
10901100
this.context = Context.current();
10911101
}
10921102

core/src/main/java/io/grpc/internal/SubchannelChannel.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,11 @@ public ClientStream newStream(MethodDescriptor<?, ?> method,
8585
@Override
8686
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
8787
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
88-
final Executor effectiveExecutor =
89-
callOptions.getExecutor() == null ? executor : callOptions.getExecutor();
88+
Executor callExecutor = callOptions.getExecutor();
89+
if (callExecutor == null) {
90+
callExecutor = this.executor;
91+
}
92+
final Executor effectiveExecutor = CallExecutors.safeguard(callExecutor);
9093
if (callOptions.isWaitForReady()) {
9194
return new ClientCall<RequestT, ResponseT>() {
9295
@Override

0 commit comments

Comments
 (0)