Skip to content

Commit bf7d524

Browse files
committed
RFC 9113 conformance: H2 stream can be cancelled (reset) locally from multiple threads
1 parent 1558d2a commit bf7d524

2 files changed

Lines changed: 35 additions & 10 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,9 @@ public void submit(final List<Header> headers, final boolean endStream) throws I
13671367
if (headers == null || headers.isEmpty()) {
13681368
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
13691369
}
1370+
if (isLocalReset()) {
1371+
return;
1372+
}
13701373
ensureNotClosed();
13711374
commitHeaders(id, headers, endStream);
13721375
if (endStream) {
@@ -1382,6 +1385,9 @@ public void push(final List<Header> headers, final AsyncPushProducer pushProduce
13821385
acceptPushRequest();
13831386
ioSession.getLock().lock();
13841387
try {
1388+
if (isLocalReset()) {
1389+
return;
1390+
}
13851391
ensureNotClosed();
13861392
final int promisedStreamId = streams.generateStreamId();
13871393
final H2StreamChannel channel = createChannel(promisedStreamId);
@@ -1405,6 +1411,9 @@ public void update(final int increment) throws IOException {
14051411
public int write(final ByteBuffer payload) throws IOException {
14061412
ioSession.getLock().lock();
14071413
try {
1414+
if (isLocalReset()) {
1415+
return 0;
1416+
}
14081417
ensureNotClosed();
14091418
return streamData(id, outputWindow, payload);
14101419
} finally {
@@ -1416,6 +1425,9 @@ public int write(final ByteBuffer payload) throws IOException {
14161425
public void endStream(final List<? extends Header> trailers) throws IOException {
14171426
ioSession.getLock().lock();
14181427
try {
1428+
if (isLocalReset()) {
1429+
return;
1430+
}
14191431
ensureNotClosed();
14201432
localClosed = true;
14211433
if (trailers != null && !trailers.isEmpty()) {

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.hc.core5.http.Header;
4040
import org.apache.hc.core5.http.HttpException;
4141
import org.apache.hc.core5.http.ProtocolException;
42-
import org.apache.hc.core5.http.StreamClosedException;
4342
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
4443
import org.apache.hc.core5.http.nio.HandlerFactory;
4544
import org.apache.hc.core5.http2.H2Error;
@@ -56,6 +55,7 @@ enum State { RESERVED, OPEN, CLOSED }
5655
private final Consumer<State> stateChangeCallback;
5756
private final AtomicReference<State> transitionRef;
5857
private final AtomicBoolean released;
58+
private final AtomicBoolean cancelled;
5959

6060
private volatile boolean reserved;
6161
private volatile boolean remoteClosed;
@@ -67,6 +67,7 @@ enum State { RESERVED, OPEN, CLOSED }
6767
this.reserved = true;
6868
this.transitionRef = new AtomicReference<>(State.RESERVED);
6969
this.released = new AtomicBoolean();
70+
this.cancelled = new AtomicBoolean();
7071
}
7172

7273
int getId() {
@@ -136,6 +137,10 @@ void consumePromise(final List<Header> headers) throws HttpException, IOExceptio
136137
if (channel.isLocalReset()) {
137138
return;
138139
}
140+
if (cancelled.get()) {
141+
localResetCancelled();
142+
return;
143+
}
139144
handler.consumePromise(headers);
140145
channel.markLocalClosed();
141146
} catch (final ProtocolException ex) {
@@ -151,6 +156,10 @@ void consumeHeader(final List<Header> headers, final boolean endOfStream) throws
151156
if (channel.isLocalReset()) {
152157
return;
153158
}
159+
if (cancelled.get()) {
160+
localResetCancelled();
161+
return;
162+
}
154163
handler.consumeHeader(headers, remoteClosed);
155164
} catch (final ProtocolException ex) {
156165
localReset(ex, H2Error.PROTOCOL_ERROR);
@@ -165,6 +174,10 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc
165174
if (channel.isLocalReset()) {
166175
return;
167176
}
177+
if (cancelled.get()) {
178+
localResetCancelled();
179+
return;
180+
}
168181
handler.consumeData(src, remoteClosed);
169182
} catch (final CharacterCodingException ex) {
170183
localReset(ex, H2Error.INTERNAL_ERROR);
@@ -222,6 +235,10 @@ void localReset(final H2StreamResetException ex) throws IOException {
222235
localReset(ex, ex.getCode());
223236
}
224237

238+
void localResetCancelled() throws IOException {
239+
localReset(new H2StreamResetException(H2Error.CANCEL, "Cancelled"));
240+
}
241+
225242
void handle(final HttpException ex) throws IOException, HttpException {
226243
handler.handle(ex, remoteClosed);
227244
}
@@ -231,16 +248,12 @@ HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
231248
}
232249

233250
boolean abort() {
234-
final boolean cancelled = channel.cancel();
235-
if (released.compareAndSet(false, true)) {
236-
try {
237-
handler.failed(new StreamClosedException());
238-
handler.releaseResources();
239-
} finally {
240-
triggerClosed();
241-
}
251+
if (cancelled.compareAndSet(false, true)) {
252+
channel.requestOutput();
253+
return true;
254+
} else {
255+
return false;
242256
}
243-
return cancelled;
244257
}
245258

246259
boolean abortGracefully() throws IOException {

0 commit comments

Comments
 (0)