Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public synchronized ClientStream newStream(
}

@Override
protected void unregisterInbound(Inbound<?> inbound) {
protected void unregisterInbound(Inbound<?, ?> inbound) {
if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) {
clientTransportListener.transportInUse(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public synchronized void shutdownNow(Status reason) {
@Override
@Nullable
@GuardedBy("this")
protected Inbound<?> createInbound(int callId) {
protected Inbound<?, ?> createInbound(int callId) {
return new Inbound.ServerInbound(this, attributes, callId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected enum TransportState {
@GuardedBy("this")
private final LeakSafeOneWayBinder incomingBinder;

protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls;
protected final ConcurrentHashMap<Integer, Inbound<?, ?>> ongoingCalls;
protected final OneWayBinderProxy.Decorator binderDecorator;

@GuardedBy("this")
Expand Down Expand Up @@ -318,13 +318,13 @@ final void shutdownInternal(Status shutdownStatus, boolean forceTerminate) {
incomingBinder.detach();
setState(TransportState.SHUTDOWN_TERMINATED);
sendShutdownTransaction();
ArrayList<Inbound<?>> calls = new ArrayList<>(ongoingCalls.values());
ArrayList<Inbound<?, ?>> calls = new ArrayList<>(ongoingCalls.values());
ongoingCalls.clear();
ArrayList<Future<?>> futuresToCancel = new ArrayList<>(ownedFutures);
ownedFutures.clear();
scheduledExecutorService.execute(
() -> {
for (Inbound<?> inbound : calls) {
for (Inbound<?, ?> inbound : calls) {
synchronized (inbound) {
inbound.closeAbnormal(shutdownStatus);
}
Expand Down Expand Up @@ -392,7 +392,7 @@ protected synchronized void sendPing(int id) throws StatusException {
}
}

protected void unregisterInbound(Inbound<?> inbound) {
protected void unregisterInbound(Inbound<?, ?> inbound) {
unregisterCall(inbound.callId);
}

Expand Down Expand Up @@ -481,13 +481,13 @@ private boolean handleTransactionInternal(int code, Parcel parcel) {
}
} else {
int size = parcel.dataSize();
Inbound<?> inbound = ongoingCalls.get(code);
Inbound<?, ?> inbound = ongoingCalls.get(code);
if (inbound == null) {
synchronized (this) {
if (!isShutdown()) {
inbound = createInbound(code);
if (inbound != null) {
Inbound<?> existing = ongoingCalls.put(code, inbound);
Inbound<?, ?> existing = ongoingCalls.put(code, inbound);
// Can't happen as only one invocation of handleTransaction() is running at a time.
Verify.verify(existing == null, "impossible appearance of %s", existing);
}
Expand Down Expand Up @@ -519,7 +519,7 @@ protected void restrictIncomingBinderToCallsFrom(int allowedCallingUid) {

@Nullable
@GuardedBy("this")
protected Inbound<?> createInbound(int callId) {
protected Inbound<?, ?> createInbound(int callId) {
return null;
}

Expand Down Expand Up @@ -566,7 +566,7 @@ final void handleAcknowledgedBytes(long numBytes) {

Iterator<Integer> i = callIdsToNotifyWhenReady.iterator();
while (isReady() && i.hasNext()) {
Inbound<?> inbound = ongoingCalls.get(i.next());
Inbound<?, ?> inbound = ongoingCalls.get(i.next());
i.remove();
if (inbound != null) { // Calls can be removed out from under us.
inbound.onTransportReady();
Expand Down Expand Up @@ -598,7 +598,7 @@ private static void checkTransition(TransportState current, TransportState next)
}

@VisibleForTesting
Map<Integer, Inbound<?>> getOngoingCalls() {
Map<Integer, Inbound<?, ?>> getOngoingCalls() {
return ongoingCalls;
}

Expand Down
27 changes: 13 additions & 14 deletions binder/src/main/java/io/grpc/binder/internal/Inbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@
*
* <p>Out-of-order messages are reassembled into their correct order.
*/
abstract class Inbound<L extends StreamListener> implements StreamListener.MessageProducer {
abstract class Inbound<L extends StreamListener, T extends BinderTransport>
implements StreamListener.MessageProducer {

protected final BinderTransport transport;
protected final T transport;
protected final Attributes attributes;
final int callId;

Expand Down Expand Up @@ -145,7 +146,7 @@ enum State {
@GuardedBy("this")
private boolean producingMessages;

private Inbound(BinderTransport transport, Attributes attributes, int callId) {
private Inbound(T transport, Attributes attributes, int callId) {
this.transport = transport;
this.attributes = attributes;
this.callId = callId;
Expand Down Expand Up @@ -551,7 +552,7 @@ public synchronized String toString() {

// ======================================
// Client-side inbound transactions.
static final class ClientInbound extends Inbound<ClientStreamListener> {
static final class ClientInbound extends Inbound<ClientStreamListener, BinderClientTransport> {

private final boolean countsForInUse;

Expand All @@ -564,7 +565,10 @@ static final class ClientInbound extends Inbound<ClientStreamListener> {
private Metadata trailers;

ClientInbound(
BinderTransport transport, Attributes attributes, int callId, boolean countsForInUse) {
BinderClientTransport transport,
Attributes attributes,
int callId,
boolean countsForInUse) {
super(transport, attributes, callId);
this.countsForInUse = countsForInUse;
}
Expand Down Expand Up @@ -608,13 +612,9 @@ protected void deliverCloseAbnormal(Status status) {

// ======================================
// Server-side inbound transactions.
static final class ServerInbound extends Inbound<ServerStreamListener> {

private final BinderServerTransport serverTransport;

static final class ServerInbound extends Inbound<ServerStreamListener, BinderServerTransport> {
ServerInbound(BinderServerTransport transport, Attributes attributes, int callId) {
super(transport, attributes, callId);
this.serverTransport = transport;
}

@GuardedBy("this")
Expand All @@ -623,17 +623,16 @@ protected void handlePrefix(int flags, Parcel parcel) throws StatusException {
String methodName = parcel.readString();
Metadata headers = MetadataHelper.readMetadata(parcel, attributes);

StatsTraceContext statsTraceContext =
serverTransport.createStatsTraceContext(methodName, headers);
StatsTraceContext statsTraceContext = transport.createStatsTraceContext(methodName, headers);
Outbound.ServerOutbound outbound =
new Outbound.ServerOutbound(serverTransport, callId, statsTraceContext);
new Outbound.ServerOutbound(transport, callId, statsTraceContext);
ServerStream stream;
if ((flags & TransactionUtils.FLAG_EXPECT_SINGLE_MESSAGE) != 0) {
stream = new SingleMessageServerStream(this, outbound, attributes);
} else {
stream = new MultiMessageServerStream(this, outbound, attributes);
}
Status status = serverTransport.startStream(stream, methodName, headers);
Status status = transport.startStream(stream, methodName, headers);
if (status.isOk()) {
checkNotNull(listener); // Is it ok to assume this will happen synchronously?
if (transport.isReady()) {
Expand Down
Loading