From aac46c22b90b9a6271ecff42faee7f019447ab15 Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Sun, 7 Sep 2025 17:19:47 +0200 Subject: [PATCH 1/3] RFC 9113 conformance: improved H2 stream creation and initialization --- .../impl/nio/AbstractH2StreamMultiplexer.java | 137 ++++++++---------- .../impl/nio/ClientH2StreamMultiplexer.java | 54 +++---- .../hc/core5/http2/impl/nio/H2Stream.java | 2 +- .../impl/nio/ServerH2StreamMultiplexer.java | 44 ++++-- .../nio/TestAbstractH2StreamMultiplexer.java | 39 +++-- 5 files changed, 140 insertions(+), 136 deletions(-) diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 861bc9f87c..9c51bdb300 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -57,13 +57,14 @@ import org.apache.hc.core5.http.impl.BasicEndpointDetails; import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; import org.apache.hc.core5.http.impl.CharCodingSupport; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.command.CommandSupport; -import org.apache.hc.core5.http.nio.command.ExecutableCommand; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; import org.apache.hc.core5.http.nio.command.ShutdownCommand; -import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; @@ -172,6 +173,18 @@ public String getId() { return ioSession.getId(); } + BasicHttpConnectionMetrics getConnMetrics() { + return connMetrics; + } + + HttpProcessor getHttpProcessor() { + return httpProcessor; + } + + void submitCommand(final Command command) { + ioSession.enqueue(command, Command.Priority.NORMAL); + } + abstract void validateSetting(H2Param param, int value) throws H2ConnectionException; abstract H2Setting[] generateSettings(H2Config localConfig); @@ -182,18 +195,18 @@ public String getId() { abstract void acceptPushFrame() throws H2ConnectionException; - abstract H2StreamHandler createRemotelyInitiatedStream( - H2StreamChannel channel, + abstract H2Stream incomingRequest(H2StreamChannel channel) throws IOException; + + abstract H2Stream incomingPushPromise(H2StreamChannel channel, + HandlerFactory pushHandlerFactory) throws IOException; - HttpProcessor httpProcessor, - BasicHttpConnectionMetrics connMetrics, - HandlerFactory pushHandlerFactory) throws IOException; + abstract H2Stream outgoingRequest(H2StreamChannel channel, + AsyncClientExchangeHandler exchangeHandler, + HandlerFactory pushHandlerFactory, + HttpContext context) throws IOException; - abstract H2StreamHandler createLocallyInitiatedStream( - ExecutableCommand command, - H2StreamChannel channel, - HttpProcessor httpProcessor, - BasicHttpConnectionMetrics connMetrics) throws IOException; + abstract H2Stream outgoingPushPromise(H2StreamChannel channel, + AsyncPushProducer pushProducer) throws IOException; abstract boolean allowGracefulAbort(H2Stream stream); @@ -495,11 +508,11 @@ public final void onOutput() throws HttpException, IOException { executeShutdown((ShutdownCommand) command); } else if (command instanceof PingCommand) { executePing((PingCommand) command); - } else if (command instanceof ExecutableCommand) { - executeRequest((ExecutableCommand) command); - if (!outputQueue.isEmpty()) { - return; - } + } else if (command instanceof RequestExecutionCommand) { + executeRequest((RequestExecutionCommand) command); + } + if (!outputQueue.isEmpty()) { + return; } } } @@ -555,10 +568,10 @@ public final void onTimeout(final Timeout timeout) throws HttpException, IOExcep final RawFrame goAway; if (localSettingState != SettingsHandshake.ACKED) { goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.SETTINGS_TIMEOUT, - "Setting timeout (" + timeout + ")"); + "Setting timeout (" + timeout + ")"); } else { goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, - "Timeout due to inactivity (" + timeout + ")"); + "Timeout due to inactivity (" + timeout + ")"); } commitFrame(goAway); for (final Iterator it = streams.iterator(); it.hasNext(); ) { @@ -601,13 +614,13 @@ private void executePing(final PingCommand pingCommand) throws IOException { commitFrame(ping); } - private void executeRequest(final ExecutableCommand executableCommand) throws IOException, HttpException { + private void executeRequest(final RequestExecutionCommand requestExecutionCommand) throws IOException, HttpException { final int streamId = streams.generateStreamId(); - final H2StreamChannel channel = createChannel(streamId, true); - final H2StreamHandler streamHandler = createLocallyInitiatedStream( - executableCommand, channel, httpProcessor, connMetrics); - - final H2Stream stream = new H2Stream(channel, streamHandler); + final H2StreamChannel channel = createChannel(streamId); + final H2Stream stream = outgoingRequest(channel, + requestExecutionCommand.getExchangeHandler(), + requestExecutionCommand.getPushHandlerFactory(), + requestExecutionCommand.getContext()); streams.addLocallyInitiated(stream); if (streamListener != null) { @@ -620,23 +633,12 @@ private void executeRequest(final ExecutableCommand executableCommand) throws IO if (stream.isOutputReady()) { stream.produceOutput(); } - final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency(); + final CancellableDependency cancellableDependency = requestExecutionCommand.getCancellableDependency(); if (cancellableDependency != null) { cancellableDependency.setDependency(stream::abort); } } - public void executePush(final int promisedStreamId, final AsyncPushProducer pushProducer) { - final H2StreamChannel channel = createChannel(promisedStreamId, true); - final HttpCoreContext context = HttpCoreContext.create(); - context.setSSLSession(getSSLSession()); - context.setEndpointDetails(getEndpointDetails()); - final H2StreamHandler streamHandler = new ServerPushH2StreamHandler( - channel, httpProcessor, connMetrics, pushProducer, context); - final H2Stream stream = new H2Stream(channel, streamHandler); - streams.addLocallyInitiated(stream); - } - public final void onException(final Exception cause) { try { for (;;) { @@ -722,18 +724,12 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received"); } - final H2StreamChannel channel = createChannel(streamId, false); - final H2StreamHandler streamHandler; + final H2StreamChannel channel = createChannel(streamId); if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { - streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null); + stream = incomingRequest(channel); } else { - streamHandler = NoopH2StreamHandler.INSTANCE; - channel.markLocalClosed(); - } - - stream = new H2Stream(channel, streamHandler); - if (stream.isOutputReady()) { - stream.produceOutput(); + stream = new H2Stream(channel, NoopH2StreamHandler.INSTANCE); + channel.localReset(H2Error.REFUSED_STREAM); } streams.addRemotelyInitiated(stream); } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { @@ -741,7 +737,6 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } try { consumeHeaderFrame(frame, stream); - if (stream.isOutputReady()) { stream.produceOutput(); } @@ -900,7 +895,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled"); } - final H2Stream stream = streams.lookup(streamId); + final H2Stream stream = streams.lookupValid(streamId); if (stream.isRemoteClosed()) { stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed")); break; @@ -918,19 +913,15 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Stream already open: " + promisedStreamId); } - final H2StreamChannel channel = createChannel(promisedStreamId, false); - final H2StreamHandler streamHandler; + final H2StreamChannel channel = createChannel(promisedStreamId); + final H2Stream promisedStream; if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { - streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, - stream.getPushHandlerFactory()); + promisedStream = incomingPushPromise(channel, stream.getPushHandlerFactory()); } else { - streamHandler = NoopH2StreamHandler.INSTANCE; - channel.markLocalClosed(); + promisedStream = new H2Stream(channel, NoopH2StreamHandler.INSTANCE); + channel.localReset(H2Error.REFUSED_STREAM); } - - final H2Stream promisedStream = new H2Stream(channel, streamHandler); streams.addRemotelyInitiated(promisedStream); - try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1303,8 +1294,8 @@ ByteBuffer getContent() { } - H2StreamChannel createChannel(final int streamId, final boolean idle) { - return new H2StreamChannelImpl(streamId, idle, initInputWinSize, initOutputWinSize); + H2StreamChannel createChannel(final int streamId) { + return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize); } void addStream(final H2Stream stream) throws H2ConnectionException { @@ -1317,14 +1308,11 @@ class H2StreamChannelImpl implements H2StreamChannel { private final AtomicInteger inputWindow; private final AtomicInteger outputWindow; - private volatile boolean idle; private volatile boolean localClosed; - private volatile long localResetTime; - H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) { + H2StreamChannelImpl(final int id, final int initialInputWindowSize, final int initialOutputWindowSize) { this.id = id; - this.idle = idle; this.inputWindow = new AtomicInteger(initialInputWindowSize); this.outputWindow = new AtomicInteger(initialOutputWindowSize); } @@ -1358,7 +1346,6 @@ public void submit(final List
headers, final boolean endStream) throws I throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing"); } ensureNotClosed(); - idle = false; commitHeaders(id, headers, endStream); if (endStream) { localClosed = true; @@ -1371,15 +1358,15 @@ public void submit(final List
headers, final boolean endStream) throws I @Override public void push(final List
headers, final AsyncPushProducer pushProducer) throws HttpException, IOException { acceptPushRequest(); - ioSession.getLock().lock(); try { ensureNotClosed(); - final int promisedStreamId = streams.generateStreamId(); - executePush(promisedStreamId, pushProducer); + final H2StreamChannel channel = createChannel(promisedStreamId); + final H2Stream stream = outgoingPushPromise(channel, pushProducer); + streams.addLocallyInitiated(stream); + commitPushPromise(id, promisedStreamId, headers); - idle = false; } finally { ioSession.getLock().unlock(); } @@ -1450,12 +1437,9 @@ public boolean localReset(final int code) throws IOException { localClosed = true; localResetTime = System.currentTimeMillis(); - if (!idle) { - final RawFrame resetStream = frameFactory.createResetStream(id, code); - commitFrameInternal(resetStream); - return true; - } - return false; + final RawFrame resetStream = frameFactory.createResetStream(id, code); + commitFrameInternal(resetStream); + return true; } finally { ioSession.getLock().unlock(); } @@ -1484,8 +1468,7 @@ public String toString() { .append(", inputWindow=").append(inputWindow) .append(", outputWindow=").append(outputWindow) .append(", localClosed=").append(localClosed) - .append(", idle=").append(idle); - buf.append("]"); + .append("]"); return buf.toString(); } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java index 9102dd1d8a..21c5fb59f1 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java @@ -30,12 +30,11 @@ import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.config.CharCodingConfig; -import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.HandlerFactory; -import org.apache.hc.core5.http.nio.command.ExecutableCommand; -import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http2.H2ConnectionException; @@ -114,7 +113,7 @@ void acceptHeaderFrame() throws H2ConnectionException { } @Override - void acceptPushFrame() throws H2ConnectionException { + void acceptPushFrame() { } @Override @@ -123,37 +122,38 @@ void acceptPushRequest() throws H2ConnectionException { } @Override - H2StreamHandler createLocallyInitiatedStream( - final ExecutableCommand command, + H2Stream outgoingRequest( final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics) throws IOException { - if (command instanceof RequestExecutionCommand) { - final RequestExecutionCommand executionCommand = (RequestExecutionCommand) command; - final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler(); - final HandlerFactory pushHandlerFactory = executionCommand.getPushHandlerFactory(); - final HttpCoreContext context = HttpCoreContext.castOrCreate(executionCommand.getContext()); - context.setSSLSession(getSSLSession()); - context.setEndpointDetails(getEndpointDetails()); - return new ClientH2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandler, - pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory, - context); - } - throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Unexpected executable command"); + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + final HttpCoreContext coreContext = HttpCoreContext.castOrCreate(context); + coreContext.setSSLSession(getSSLSession()); + coreContext.setEndpointDetails(getEndpointDetails()); + return new H2Stream(channel, new ClientH2StreamHandler(channel, getHttpProcessor(), getConnMetrics(), exchangeHandler, + pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory, + coreContext)); } @Override - H2StreamHandler createRemotelyInitiatedStream( - final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics, - final HandlerFactory pushHandlerFactory) throws IOException { + H2Stream incomingRequest(final H2StreamChannel channel) throws IOException { + throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal incoming request"); + } + + @Override + H2Stream outgoingPushPromise(final H2StreamChannel channel, final AsyncPushProducer pushProducer) throws IOException { + throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal attempt to send push promise"); + } + + @Override + H2Stream incomingPushPromise(final H2StreamChannel channel, + final HandlerFactory pushHandlerFactory) { final HttpCoreContext context = HttpCoreContext.create(); context.setSSLSession(getSSLSession()); context.setEndpointDetails(getEndpointDetails()); - return new ClientPushH2StreamHandler(channel, httpProcessor, connMetrics, + return new H2Stream(channel, new ClientPushH2StreamHandler(channel, getHttpProcessor(), getConnMetrics(), pushHandlerFactory != null ? pushHandlerFactory : this.pushHandlerFactory, - context); + context)); } @Override diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index 90e69f68d1..d15e56e6ee 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -134,7 +134,7 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc } boolean isOutputReady() { - return handler.isOutputReady(); + return !channel.isLocalClosed() && handler.isOutputReady(); } void produceOutput() throws HttpException, IOException { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java index 94669275f1..0a0ccef9bc 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java @@ -35,11 +35,12 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.RequestHeaderFieldsTooLargeException; import org.apache.hc.core5.http.config.CharCodingConfig; -import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; import org.apache.hc.core5.http.nio.HandlerFactory; -import org.apache.hc.core5.http.nio.command.ExecutableCommand; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http2.H2ConnectionException; @@ -103,11 +104,11 @@ H2Setting[] generateSettings(final H2Config localConfig) { } @Override - void acceptHeaderFrame() throws H2ConnectionException { + void acceptHeaderFrame() { } @Override - void acceptPushRequest() throws H2ConnectionException { + void acceptPushRequest() { } @Override @@ -116,24 +117,37 @@ void acceptPushFrame() throws H2ConnectionException { } @Override - H2StreamHandler createRemotelyInitiatedStream( - final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics, - final HandlerFactory pushHandlerFactory) throws IOException { + H2Stream incomingRequest(final H2StreamChannel channel) { final HttpCoreContext context = HttpCoreContext.create(); context.setSSLSession(getSSLSession()); context.setEndpointDetails(getEndpointDetails()); - return new ServerH2StreamHandler(channel, httpProcessor, connMetrics, exchangeHandlerFactory, context); + return new H2Stream(channel, new ServerH2StreamHandler( + channel, getHttpProcessor(), getConnMetrics(), exchangeHandlerFactory, context)); } @Override - H2StreamHandler createLocallyInitiatedStream( - final ExecutableCommand command, + H2Stream outgoingRequest( final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics) throws IOException { - throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Illegal attempt to execute a request"); + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) throws IOException { + throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Illegal attempt to send a request"); + } + + @Override + H2Stream incomingPushPromise(final H2StreamChannel channel, + final HandlerFactory pushHandlerFactory) throws IOException { + throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal incoming push promise"); + } + + @Override + H2Stream outgoingPushPromise(final H2StreamChannel channel, + final AsyncPushProducer pushProducer) throws IOException { + final HttpCoreContext context = HttpCoreContext.create(); + context.setSSLSession(getSSLSession()); + context.setEndpointDetails(getEndpointDetails()); + return new H2Stream(channel, new ServerPushH2StreamHandler( + channel, getHttpProcessor(), getConnMetrics(), pushProducer, context)); } @Override diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 7c7208b489..61afaf8553 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -27,7 +27,6 @@ package org.apache.hc.core5.http2.impl.nio; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -37,12 +36,13 @@ import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.config.CharCodingConfig; -import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; import org.apache.hc.core5.http.impl.CharCodingSupport; import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.HandlerFactory; -import org.apache.hc.core5.http.nio.command.ExecutableCommand; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http2.H2ConnectionException; import org.apache.hc.core5.http2.H2Error; @@ -141,20 +141,27 @@ void acceptPushFrame() throws H2ConnectionException { } @Override - H2StreamHandler createRemotelyInitiatedStream( - final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics, - final HandlerFactory pushHandlerFactory) throws IOException { - return streamHandlerSupplier.get(); + H2Stream incomingRequest(final H2StreamChannel channel) { + return new H2Stream(channel, streamHandlerSupplier.get()); } @Override - H2StreamHandler createLocallyInitiatedStream( - final ExecutableCommand command, - final H2StreamChannel channel, - final HttpProcessor httpProcessor, - final BasicHttpConnectionMetrics connMetrics) throws IOException { + H2Stream outgoingRequest(final H2StreamChannel channel, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + return null; + } + + @Override + H2Stream incomingPushPromise(final H2StreamChannel channel, + final HandlerFactory pushHandlerFactory) { + return new H2Stream(channel, streamHandlerSupplier.get()); + } + + @Override + H2Stream outgoingPushPromise(final H2StreamChannel channel, + final AsyncPushProducer pushProducer) { return null; } @@ -684,7 +691,7 @@ void testStreamRemoteReset() throws Exception { h2StreamListener, () -> streamHandler); - final H2StreamChannel channel = streamMultiplexer.createChannel(1, false); + final H2StreamChannel channel = streamMultiplexer.createChannel(1); final H2Stream stream = new H2Stream(channel, streamHandler); streamMultiplexer.addStream(stream); @@ -731,7 +738,7 @@ void testStreamRemoteResetNoErrorRemoteAlreadyClosed() throws Exception { h2StreamListener, () -> streamHandler); - final H2StreamChannel channel = streamMultiplexer.createChannel(1, false); + final H2StreamChannel channel = streamMultiplexer.createChannel(1); final H2Stream stream = new H2Stream(channel, streamHandler); streamMultiplexer.addStream(stream); From 0f3519c538bd639a4f52720974d99931f4db6fc4 Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Wed, 10 Sep 2025 12:06:48 +0200 Subject: [PATCH 2/3] Disabled ReactiveClient test case failing intermittently with Github Actions --- .../apache/hc/core5/testing/reactive/ReactiveClientTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java index be8e058d49..00ccd75f0c 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java @@ -74,6 +74,7 @@ import org.apache.hc.core5.util.Timeout; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.reactivestreams.Publisher; @@ -214,7 +215,7 @@ void testRequestError() throws Exception { Assertions.assertSame(exceptionThrown, exception.getCause().getCause()); } - @Test + @Test @Disabled void testRequestTimeout() throws Exception { final InetSocketAddress address = startServer(); final HttpAsyncRequester requester = clientResource.start(); From 452a92007c4249434d544cfbe1daf7b43398971a Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Wed, 10 Sep 2025 16:03:22 +0200 Subject: [PATCH 3/3] H2: enforce inbound concurrency at activation Reserve on PUSH_PROMISE; count on first HEADERS; refuse with RST_STREAM. Atomic gate in H2Streams; safe CONTINUATION drain; idempotent localReset. --- .../impl/nio/AbstractH2StreamMultiplexer.java | 31 +++- .../hc/core5/http2/impl/nio/H2Streams.java | 31 ++++ .../nio/TestAbstractH2StreamMultiplexer.java | 156 ++++++++++++++++++ 3 files changed, 212 insertions(+), 6 deletions(-) diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 9c51bdb300..bb523b7bad 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -93,7 +93,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection { - private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB + private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN } enum SettingsHandshake { READY, TRANSMITTED, ACKED } @@ -735,6 +735,20 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } else if (stream.isLocalClosed() && stream.isRemoteClosed()) { throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed"); } + if (!streams.isSameSide(streamId)) { + if (!streams.tryActivateRemote(streamId, localConfig.getMaxConcurrentStreams())) { + try { + stream.localReset(new H2StreamResetException(H2Error.REFUSED_STREAM, "Exceeded inbound concurrency limit")); + } finally { + if (!frame.isFlagSet(FrameFlag.END_HEADERS)) { + continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM), localConfig.getMaxContinuations()); + continuation.drainOnly = true; + } + requestSessionOutput(); + } + return; + } + } try { consumeHeaderFrame(frame, stream); if (stream.isOutputReady()) { @@ -871,7 +885,6 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio consumeSettingsFrame(payload); remoteSettingState = SettingsHandshake.TRANSMITTED; } - // Send ACK final RawFrame response = frameFactory.createSettingsAck(); commitFrame(response); remoteSettingState = SettingsHandshake.ACKED; @@ -879,7 +892,6 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } break; case PRIORITY: - // Stream priority not supported break; case PUSH_PROMISE: { acceptPushFrame(); @@ -922,6 +934,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio channel.localReset(H2Error.REFUSED_STREAM); } streams.addRemotelyInitiated(promisedStream); + streams.markReservedRemote(promisedStreamId); try { consumePushPromiseFrame(frame, payload, promisedStream); } catch (final H2StreamResetException ex) { @@ -1029,7 +1042,6 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr } final ByteBuffer payload = frame.getPayloadContent(); if (frame.isFlagSet(FrameFlag.PRIORITY)) { - // Priority not supported payload.getInt(); payload.get(); } @@ -1052,6 +1064,10 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea final ByteBuffer payload = frame.getPayload(); continuation.copyPayload(payload); if (frame.isFlagSet(FrameFlag.END_HEADERS)) { + if (continuation.drainOnly) { + continuation = null; + return; + } final List
headers = decodeHeaders(continuation.getContent()); if (streamListener != null) { streamListener.onHeaderInput(this, streamId, headers); @@ -1263,6 +1279,7 @@ private static class Continuation { final boolean enforceMacContinuations; private int count; + boolean drainOnly; private Continuation(final int streamId, final int type, final boolean endStream, final int maxContinuation) { this.streamId = streamId; @@ -1426,6 +1443,7 @@ public void markLocalClosed() { localClosed = true; } + // FIX: make localReset idempotent and allow reset even if already locally closed @Override public boolean localReset(final int code) throws IOException { ioSession.getLock().lock(); @@ -1433,18 +1451,19 @@ public boolean localReset(final int code) throws IOException { if (isLocalReset()) { return false; } - ensureNotClosed(); + final boolean alreadyClosed = localClosed; localClosed = true; localResetTime = System.currentTimeMillis(); final RawFrame resetStream = frameFactory.createResetStream(id, code); commitFrameInternal(resetStream); - return true; + return !alreadyClosed; } finally { ioSession.getLock().unlock(); } } + @Override public long getLocalResetTime() { return localResetTime; diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Streams.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Streams.java index 53b3de5d72..41ec527859 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Streams.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Streams.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +48,9 @@ class H2Streams { private final Queue streams; private final AtomicInteger lastLocalId; private final AtomicInteger lastRemoteId; + private final AtomicInteger inboundActive = new AtomicInteger(0); + private final Set inboundActiveSet = ConcurrentHashMap.newKeySet(); + private final Set reservedRemoteSet = ConcurrentHashMap.newKeySet(); public H2Streams(final StreamIdGenerator idGenerator) { this.idGenerator = Args.notNull(idGenerator, "Stream id generator"); @@ -100,6 +104,10 @@ public void addRemotelyInitiated(final H2Stream stream) throws H2ConnectionExcep public void release(final H2Stream stream) { streamMap.remove(stream.getId()); + if (inboundActiveSet.remove(stream.getId())) { + inboundActive.decrementAndGet(); + } + reservedRemoteSet.remove(stream.getId()); stream.releaseResources(); } @@ -114,6 +122,9 @@ public void shutdownAndReleaseAll() { } streams.clear(); streamMap.clear(); + inboundActiveSet.clear(); + reservedRemoteSet.clear(); + inboundActive.set(0); } public H2Stream lookup(final int streamId) { @@ -166,5 +177,25 @@ public int generateStreamId() { } } + public void markReservedRemote(final int streamId) { + reservedRemoteSet.add(streamId); + } + + public boolean tryActivateRemote(final int streamId, final int maxInboundConcurrent) { + if (inboundActiveSet.contains(streamId)) { + return true; + } + for (;;) { + final int cur = inboundActive.get(); + if (cur >= maxInboundConcurrent) { + return false; + } + if (inboundActive.compareAndSet(cur, cur + 1)) { + inboundActiveSet.add(streamId); + reservedRemoteSet.remove(streamId); + return true; + } + } + } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 61afaf8553..c856638e8c 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -774,5 +774,161 @@ void testStreamRemoteResetNoErrorRemoteAlreadyClosed() throws Exception { Mockito.verify(streamHandler, Mockito.never()).failed(ArgumentMatchers.any()); } + // add to TestAbstractH2StreamMultiplexer + + @Test + void testInboundConcurrencyLimitRefusesSecondHeaders() throws Exception { + final H2Config h2Config = H2Config.custom() + .setMaxConcurrentStreams(1) + .setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE) + .build(); + + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final ByteArrayBuffer buf1 = new ByteArrayBuffer(128); + final HPackEncoder enc1 = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + enc1.encodeHeaders(buf1, Arrays.asList(new BasicHeader(":status", "200"), new BasicHeader("a", "b")), h2Config.isCompressionEnabled()); + + final ByteArrayBuffer buf2 = new ByteArrayBuffer(128); + final HPackEncoder enc2 = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + enc2.encodeHeaders(buf2, Arrays.asList(new BasicHeader(":status", "200"), new BasicHeader("c", "d")), h2Config.isCompressionEnabled()); + + final WritableByteChannelMock ch = new WritableByteChannelMock(2048); + final FrameOutputBuffer out = new FrameOutputBuffer(16 * 1024); + + final RawFrame h1 = FRAME_FACTORY.createHeaders(2, ByteBuffer.wrap(buf1.array(), 0, buf1.length()), true, false); + out.write(h1, ch); + final RawFrame h2 = FRAME_FACTORY.createHeaders(4, ByteBuffer.wrap(buf2.array(), 0, buf2.length()), true, false); + out.write(h2, ch); + + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + final boolean refused = exceptionCaptor.getAllValues().stream() + .filter(H2StreamResetException.class::isInstance) + .map(H2StreamResetException.class::cast) + .anyMatch(ex -> H2Error.getByCode(ex.getCode()) == H2Error.REFUSED_STREAM); + Assertions.assertTrue(refused); + } + + @Test + void testPushPromiseReservationThenRefusedOnActivation() throws Exception { + final H2Config h2Config = H2Config.custom() + .setMaxConcurrentStreams(1) + .setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE) + .build(); + + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final WritableByteChannelMock ch = new WritableByteChannelMock(4096); + final FrameOutputBuffer out = new FrameOutputBuffer(16 * 1024); + + final ByteArrayBuffer respBuf = new ByteArrayBuffer(128); + final HPackEncoder respEnc = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + respEnc.encodeHeaders(respBuf, Arrays.asList(new BasicHeader(":status", "200"), new BasicHeader("a", "b")), h2Config.isCompressionEnabled()); + out.write(FRAME_FACTORY.createHeaders(2, ByteBuffer.wrap(respBuf.array(), 0, respBuf.length()), true, false), ch); + + final ByteArrayBuffer promised = new ByteArrayBuffer(256); + promised.append((byte) (4 >> 24)); + promised.append((byte) (4 >> 16)); + promised.append((byte) (4 >> 8)); + promised.append((byte) 4); + final HPackEncoder reqEnc = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + reqEnc.encodeHeaders(promised, Arrays.asList( + new BasicHeader(":method", "GET"), + new BasicHeader(":scheme", "http"), + new BasicHeader(":path", "/pushed"), + new BasicHeader(":authority", "www.example.com"), + new BasicHeader("x", "y")), h2Config.isCompressionEnabled()); + out.write(FRAME_FACTORY.createPushPromise(2, ByteBuffer.wrap(promised.array(), 0, promised.length()), true), ch); + + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + ch.reset(); + final ByteArrayBuffer pushedResp = new ByteArrayBuffer(128); + final HPackEncoder pushedEnc = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + pushedEnc.encodeHeaders(pushedResp, Arrays.asList(new BasicHeader(":status", "200"), new BasicHeader("k", "v")), h2Config.isCompressionEnabled()); + out.write(FRAME_FACTORY.createHeaders(4, ByteBuffer.wrap(pushedResp.array(), 0, pushedResp.length()), true, false), ch); + + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + final boolean refused = exceptionCaptor.getAllValues().stream() + .filter(H2StreamResetException.class::isInstance) + .map(H2StreamResetException.class::cast) + .anyMatch(ex -> H2Error.getByCode(ex.getCode()) == H2Error.REFUSED_STREAM); + Assertions.assertTrue(refused); + } + + @Test + void testRefusedHeadersWithContinuationIsDrained() throws Exception { + final H2Config h2Config = H2Config.custom() + .setMaxConcurrentStreams(1) + .setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE) + .build(); + + final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final WritableByteChannelMock ch = new WritableByteChannelMock(4096); + final FrameOutputBuffer out = new FrameOutputBuffer(16 * 1024); + + final ByteArrayBuffer first = new ByteArrayBuffer(128); + final HPackEncoder enc1 = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + enc1.encodeHeaders(first, Arrays.asList(new BasicHeader(":status", "200"), new BasicHeader("h1", "v1")), h2Config.isCompressionEnabled()); + out.write(FRAME_FACTORY.createHeaders(2, ByteBuffer.wrap(first.array(), 0, first.length()), true, false), ch); + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + final ByteArrayBuffer second = new ByteArrayBuffer(1024); + final HPackEncoder enc2 = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + final List
many = new ArrayList<>(); + many.add(new BasicHeader(":status", "200")); + for (int i = 0; i < 50; i++) { + many.add(new BasicHeader("x-" + i, "y-" + i)); + } + enc2.encodeHeaders(second, many, h2Config.isCompressionEnabled()); + final int len = second.length(); + final int cut = Math.max(1, Math.min(len - 1, len / 2)); + + ch.reset(); + out.write(FRAME_FACTORY.createHeaders(4, ByteBuffer.wrap(second.array(), 0, cut), false, false), ch); + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + ch.reset(); + out.write(FRAME_FACTORY.createContinuation(4, ByteBuffer.wrap(second.array(), cut, len - cut), true), ch); + streamMultiplexer.onInput(ByteBuffer.wrap(ch.toByteArray())); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + final boolean refused = exceptionCaptor.getAllValues().stream() + .filter(H2StreamResetException.class::isInstance) + .map(H2StreamResetException.class::cast) + .anyMatch(ex -> H2Error.getByCode(ex.getCode()) == H2Error.REFUSED_STREAM); + Assertions.assertTrue(refused); + } + + }