diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 9b39d4b080..05af4e4c3f 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -46,7 +46,6 @@ import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLSession; -import org.apache.hc.core5.concurrent.CancellableDependency; import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.EndpointDetails; import org.apache.hc.core5.http.Header; @@ -650,14 +649,10 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman final int initOutputWindow = stream.getOutputWindow().get(); streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow); } - + requestExecutionCommand.initiated(stream); if (stream.isOutputReady()) { stream.produceOutput(); } - final CancellableDependency cancellableDependency = requestExecutionCommand.getCancellableDependency(); - if (cancellableDependency != null) { - cancellableDependency.setDependency(stream::abort); - } } private void executePush(final PushResponseCommand pushResponseCommand) throws IOException, HttpException { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 0f72c9ba0a..881e5d27ed 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -39,14 +39,13 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; -class H2Stream { - - enum State { RESERVED, OPEN, CLOSED } +class H2Stream implements StreamControl { private static final long LINGER_TIME = 1000; // 1 second @@ -70,10 +69,16 @@ enum State { RESERVED, OPEN, CLOSED } this.cancelled = new AtomicBoolean(); } - int getId() { + @Override + public int getId() { return channel.getId(); } + @Override + public State getState() { + return transitionRef.get(); + } + boolean isReserved() { return reserved; } @@ -276,6 +281,11 @@ void releaseResources() { } } + @Override + public boolean cancel() { + return abort(); + } + @Override public String toString() { final StringBuilder buf = new StringBuilder(); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index 88e2fb6d34..083f03a816 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -186,7 +186,7 @@ private void execute( @Override public void completed(final IOSession ioSession) { - ioSession.enqueue(new RequestExecutionCommand(new AsyncClientExchangeHandler() { + final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { @Override public void releaseResources() { @@ -244,7 +244,13 @@ public void failed(final Exception cause) { exchangeHandler.failed(cause); } - }, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL); + }; + ioSession.enqueue(new RequestExecutionCommand( + handlerProxy, + pushHandlerFactory, + context, + cancellableDependency::setDependency), + Command.Priority.NORMAL); if (!ioSession.isOpen()) { exchangeHandler.failed(new ConnectionClosedException()); } diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java index f81e5354d7..5072e1bb1a 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ClientSessionEndpoint.java @@ -81,7 +81,7 @@ public void execute( final HandlerFactory pushHandlerFactory, final HttpContext context) { Asserts.check(!closed.get(), "Connection is already closed"); - final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context); + final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context); ioSession.enqueue(executionCommand, Command.Priority.NORMAL); if (!ioSession.isOpen()) { exchangeHandler.failed(new ConnectionClosedException()); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/StreamControl.java b/httpcore5/src/main/java/org/apache/hc/core5/http/StreamControl.java new file mode 100644 index 0000000000..f4e1e49f5c --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/StreamControl.java @@ -0,0 +1,47 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.Cancellable; + +/** + * Represents a message stream control interface. + * + * @since 5.5 + */ +@Internal +public interface StreamControl extends Cancellable { + + enum State { RESERVED, OPEN, CLOSED } + + int getId(); + + State getState(); + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java index b94f051a6d..3da2664267 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java @@ -502,7 +502,9 @@ public void execute( final HandlerFactory pushHandlerFactory, final HttpContext context) { final IOSession ioSession = getIOSession(); - ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL); + ioSession.enqueue( + new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context), + Command.Priority.NORMAL); if (!ioSession.isOpen()) { try { exchangeHandler.failed(new ConnectionClosedException()); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutableCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutableCommand.java index 47cb1aa4a9..634ba82fec 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutableCommand.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutableCommand.java @@ -33,14 +33,17 @@ /** * Abstract executable command that may need to do some cleaning up - * in case of an failure and also optionally may want to cancel - * the associated HTTP message exchange through {@link CancellableDependency}. + * in case of an failure. * * @since 5.0 */ @Internal public abstract class ExecutableCommand implements Command { + /** + * @deprecated Not used. + */ + @Deprecated public abstract CancellableDependency getCancellableDependency(); public abstract void failed(Exception ex); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java index 6da8e304eb..efd48c7d76 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/RequestExecutionCommand.java @@ -31,7 +31,9 @@ import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.http.RequestNotExecutedException; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; @@ -50,8 +52,29 @@ public final class RequestExecutionCommand extends ExecutableCommand { private final HandlerFactory pushHandlerFactory; private final CancellableDependency cancellableDependency; private final HttpContext context; + private final Callback initiationCallback; private final AtomicBoolean failed; + /** + * @since 5.5 + */ + public RequestExecutionCommand( + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context, + final Callback initiationCallback) { + this.exchangeHandler = Args.notNull(exchangeHandler, "Handler"); + this.pushHandlerFactory = pushHandlerFactory; + this.initiationCallback = initiationCallback; + this.cancellableDependency = null; + this.context = context; + this.failed = new AtomicBoolean(); + } + + /** + * @deprecated Not used. + */ + @Deprecated public RequestExecutionCommand( final AsyncClientExchangeHandler exchangeHandler, final HandlerFactory pushHandlerFactory, @@ -59,6 +82,7 @@ public RequestExecutionCommand( final HttpContext context) { this.exchangeHandler = Args.notNull(exchangeHandler, "Handler"); this.pushHandlerFactory = pushHandlerFactory; + this.initiationCallback = null; this.cancellableDependency = cancellableDependency; this.context = context; this.failed = new AtomicBoolean(); @@ -68,13 +92,13 @@ public RequestExecutionCommand( final AsyncClientExchangeHandler exchangeHandler, final HandlerFactory pushHandlerFactory, final HttpContext context) { - this(exchangeHandler, pushHandlerFactory, null, context); + this(exchangeHandler, pushHandlerFactory, context, null); } public RequestExecutionCommand( final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) { - this(exchangeHandler, null, null, context); + this(exchangeHandler, null, context); } public AsyncClientExchangeHandler getExchangeHandler() { @@ -85,6 +109,10 @@ public HandlerFactory getPushHandlerFactory() { return pushHandlerFactory; } + /** + * @deprecated no used. + */ + @Deprecated @Override public CancellableDependency getCancellableDependency() { return cancellableDependency; @@ -94,6 +122,19 @@ public HttpContext getContext() { return context; } + /** + * @since 5.5 + */ + @SuppressWarnings("deprecated") + public void initiated(final StreamControl streamControl) { + if (initiationCallback != null) { + initiationCallback.execute(streamControl); + } + if (cancellableDependency != null) { + cancellableDependency.setDependency(streamControl); + } + } + @Override public void failed(final Exception ex) { if (failed.compareAndSet(false, true)) {