Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.H2ConnectionException;
Expand Down Expand Up @@ -114,7 +113,7 @@ void acceptHeaderFrame() throws H2ConnectionException {
}

@Override
void acceptPushFrame() throws H2ConnectionException {
void acceptPushFrame() {
}

@Override
Expand All @@ -123,37 +122,38 @@ void acceptPushRequest() throws H2ConnectionException {
}

@Override
H2StreamHandler createLocallyInitiatedStream(
final ExecutableCommand command,
H2Stream outgoingRequest(
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics) throws IOException {
if (command instanceof RequestExecutionCommand) {
final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command;
final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory = executionCommand.getPushHandlerFactory();
final HttpCoreContext context = HttpCoreContext.castOrCreate(executionCommand.getContext());
context.setSSLSession(getSSLSession());
context.setEndpointDetails(getEndpointDetails());
return new ClientH2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler,
pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
context);
}
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Unexpected executable command");
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
final HttpCoreContext coreContext = HttpCoreContext.castOrCreate(context);
coreContext.setSSLSession(getSSLSession());
coreContext.setEndpointDetails(getEndpointDetails());
return new H2Stream(channel, new ClientH2StreamHandler(channel, getHttpProcessor(), getConnMetrics(), exchangeHandler,
pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
coreContext));
}

@Override
H2StreamHandler createRemotelyInitiatedStream(
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
H2Stream incomingRequest(final H2StreamChannel channel) throws IOException {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal incoming request");
}

@Override
H2Stream outgoingPushPromise(final H2StreamChannel channel, final AsyncPushProducer pushProducer) throws IOException {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal attempt to send push promise");
}

@Override
H2Stream incomingPushPromise(final H2StreamChannel channel,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
final HttpCoreContext context = HttpCoreContext.create();
context.setSSLSession(getSSLSession());
context.setEndpointDetails(getEndpointDetails());
return new ClientPushH2StreamHandler(channel, httpProcessor, connMetrics,
return new H2Stream(channel, new ClientPushH2StreamHandler(channel, getHttpProcessor(), getConnMetrics(),
pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory,
context);
context));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc
}

boolean isOutputReady() {
return handler.isOutputReady();
return !channel.isLocalClosed() && handler.isOutputReady();
}

void produceOutput() throws HttpException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -47,6 +48,9 @@ class H2Streams {
private final Queue<H2Stream> streams;
private final AtomicInteger lastLocalId;
private final AtomicInteger lastRemoteId;
private final AtomicInteger inboundActive = new AtomicInteger(0);
private final Set<Integer> inboundActiveSet = ConcurrentHashMap.newKeySet();
private final Set<Integer> reservedRemoteSet = ConcurrentHashMap.newKeySet();

public H2Streams(final StreamIdGenerator idGenerator) {
this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
Expand Down Expand Up @@ -100,6 +104,10 @@ public void addRemotelyInitiated(final H2Stream stream) throws H2ConnectionExcep

public void release(final H2Stream stream) {
streamMap.remove(stream.getId());
if (inboundActiveSet.remove(stream.getId())) {
inboundActive.decrementAndGet();
}
reservedRemoteSet.remove(stream.getId());
stream.releaseResources();
}

Expand All @@ -114,6 +122,9 @@ public void shutdownAndReleaseAll() {
}
streams.clear();
streamMap.clear();
inboundActiveSet.clear();
reservedRemoteSet.clear();
inboundActive.set(0);
}

public H2Stream lookup(final int streamId) {
Expand Down Expand Up @@ -166,5 +177,25 @@ public int generateStreamId() {
}
}

public void markReservedRemote(final int streamId) {
reservedRemoteSet.add(streamId);
}

public boolean tryActivateRemote(final int streamId, final int maxInboundConcurrent) {
if (inboundActiveSet.contains(streamId)) {
return true;
}
for (;;) {
final int cur = inboundActive.get();
if (cur >= maxInboundConcurrent) {
return false;
}
if (inboundActive.compareAndSet(cur, cur + 1)) {
inboundActiveSet.add(streamId);
reservedRemoteSet.remove(streamId);
return true;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.RequestHeaderFieldsTooLargeException;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.H2ConnectionException;
Expand Down Expand Up @@ -103,11 +104,11 @@ H2Setting[] generateSettings(final H2Config localConfig) {
}

@Override
void acceptHeaderFrame() throws H2ConnectionException {
void acceptHeaderFrame() {
}

@Override
void acceptPushRequest() throws H2ConnectionException {
void acceptPushRequest() {
}

@Override
Expand All @@ -116,24 +117,37 @@ void acceptPushFrame() throws H2ConnectionException {
}

@Override
H2StreamHandler createRemotelyInitiatedStream(
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
H2Stream incomingRequest(final H2StreamChannel channel) {
final HttpCoreContext context = HttpCoreContext.create();
context.setSSLSession(getSSLSession());
context.setEndpointDetails(getEndpointDetails());
return new ServerH2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandlerFactory, context);
return new H2Stream(channel, new ServerH2StreamHandler(
channel, getHttpProcessor(), getConnMetrics(), exchangeHandlerFactory, context));
}

@Override
H2StreamHandler createLocallyInitiatedStream(
final ExecutableCommand command,
H2Stream outgoingRequest(
final H2StreamChannel channel,
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics) throws IOException {
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Illegal attempt to execute a request");
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) throws IOException {
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Illegal attempt to send a request");
}

@Override
H2Stream incomingPushPromise(final H2StreamChannel channel,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal incoming push promise");
}

@Override
H2Stream outgoingPushPromise(final H2StreamChannel channel,
final AsyncPushProducer pushProducer) throws IOException {
final HttpCoreContext context = HttpCoreContext.create();
context.setSSLSession(getSSLSession());
context.setEndpointDetails(getEndpointDetails());
return new H2Stream(channel, new ServerPushH2StreamHandler(
channel, getHttpProcessor(), getConnMetrics(), pushProducer, context));
}

@Override
Expand Down
Loading