Skip to content

Commit 31bbf5b

Browse files
committed
HTTP/2: add per-stream idle and lifetime timeouts to core multiplexer
Expose configuration via H2Config, throw H2StreamTimeoutException on expiry and keep the connection alive Extend test coverage and add an example client demonstrating timed-out and successful streams
1 parent 85c806a commit 31bbf5b

File tree

5 files changed

+611
-2
lines changed

5 files changed

+611
-2
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2;
28+
29+
import java.net.SocketTimeoutException;
30+
31+
import org.apache.hc.core5.util.Timeout;
32+
33+
/**
34+
* {@link java.net.SocketTimeoutException} raised by the HTTP/2 stream
35+
* multiplexer when a per-stream timeout elapses.
36+
* <p>
37+
* This exception is used for timeouts that are scoped to a single HTTP/2
38+
* stream rather than the underlying TCP connection, for example:
39+
* </p>
40+
* <ul>
41+
* <li>an idle timeout where no activity has been observed on the stream, or</li>
42+
* <li>a lifetime timeout where the total age of the stream exceeds
43+
* the configured limit.</li>
44+
* </ul>
45+
* <p>
46+
* The {@link #isIdleTimeout()} flag can be used to distinguish whether
47+
* the timeout was triggered by idleness or by the overall stream lifetime.
48+
* The affected stream id and the timeout value are exposed via
49+
* {@link #getStreamId()} and {@link #getTimeout()} respectively.
50+
* </p>
51+
*
52+
* @since 5.4
53+
*/
54+
public class H2StreamTimeoutException extends SocketTimeoutException {
55+
56+
private static final long serialVersionUID = 1L;
57+
58+
private final int streamId;
59+
private final Timeout timeout;
60+
private final boolean idleTimeout;
61+
62+
public H2StreamTimeoutException(final String message, final int streamId, final Timeout timeout, final boolean idleTimeout) {
63+
super(message);
64+
this.streamId = streamId;
65+
this.timeout = timeout;
66+
this.idleTimeout = idleTimeout;
67+
}
68+
69+
public int getStreamId() {
70+
return streamId;
71+
}
72+
73+
public Timeout getTimeout() {
74+
return timeout;
75+
}
76+
77+
/**
78+
* Indicates whether this timeout was triggered by idle time (no activity)
79+
* rather than by stream lifetime.
80+
*
81+
* @return {@code true} if this is an idle timeout, {@code false} if it is a lifetime timeout.
82+
*/
83+
public boolean isIdleTimeout() {
84+
return idleTimeout;
85+
}
86+
87+
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.hc.core5.http2.H2ConnectionException;
7676
import org.apache.hc.core5.http2.H2Error;
7777
import org.apache.hc.core5.http2.H2StreamResetException;
78+
import org.apache.hc.core5.http2.H2StreamTimeoutException;
7879
import org.apache.hc.core5.http2.config.H2Config;
7980
import org.apache.hc.core5.http2.config.H2Param;
8081
import org.apache.hc.core5.http2.config.H2Setting;
@@ -439,6 +440,10 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
439440
for (;;) {
440441
final RawFrame frame = inputBuffer.read(src, ioSession);
441442
if (frame != null) {
443+
if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
444+
checkStreamTimeouts(System.nanoTime());
445+
}
446+
442447
if (streamListener != null) {
443448
streamListener.onFrameInput(this, frame.getStreamId(), frame);
444449
}
@@ -455,6 +460,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
455460
break;
456461
}
457462
}
463+
if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
464+
checkStreamTimeouts(System.nanoTime());
465+
}
458466
}
459467
}
460468

@@ -532,6 +540,11 @@ public final void onOutput() throws HttpException, IOException {
532540
}
533541
}
534542
}
543+
544+
if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
545+
checkStreamTimeouts(System.nanoTime());
546+
}
547+
535548
if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
536549
int liveStreams = 0;
537550
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
@@ -643,6 +656,7 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman
643656
requestExecutionCommand.getExchangeHandler(),
644657
requestExecutionCommand.getPushHandlerFactory(),
645658
requestExecutionCommand.getContext()));
659+
initializeStreamTimeouts(stream);
646660

647661
if (streamListener != null) {
648662
final int initInputWindow = stream.getInputWindow().get();
@@ -765,10 +779,12 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
765779
final H2StreamChannel channel = createChannel(streamId);
766780
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
767781
stream = streams.createActive(channel, incomingRequest(channel));
782+
initializeStreamTimeouts(stream);
768783
streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams());
769784
} else {
770785
channel.localReset(H2Error.REFUSED_STREAM);
771786
stream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE);
787+
initializeStreamTimeouts(stream);
772788
}
773789
} else if (stream.isLocalClosed() && stream.isRemoteClosed()) {
774790
throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
@@ -959,6 +975,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
959975
channel.localReset(H2Error.REFUSED_STREAM);
960976
promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE);
961977
}
978+
initializeStreamTimeouts(promisedStream);
962979
try {
963980
consumePushPromiseFrame(frame, payload, promisedStream);
964981
} catch (final H2StreamResetException ex) {
@@ -1364,8 +1381,18 @@ H2StreamChannel createChannel(final int streamId) {
13641381
return new H2StreamChannelImpl(streamId, initInputWinSize, initOutputWinSize);
13651382
}
13661383

1367-
H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) throws H2ConnectionException {
1368-
return streams.createActive(channel, streamHandler);
1384+
private void initializeStreamTimeouts(final H2Stream stream) {
1385+
final Timeout socketTimeout = ioSession.getSocketTimeout();
1386+
if (socketTimeout != null && socketTimeout.isEnabled()) {
1387+
stream.setIdleTimeout(socketTimeout);
1388+
stream.setLifetimeTimeout(socketTimeout);
1389+
}
1390+
}
1391+
1392+
H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler streamHandler) {
1393+
final H2Stream stream = streams.createActive(channel, streamHandler);
1394+
initializeStreamTimeouts(stream);
1395+
return stream;
13691396
}
13701397

13711398
private void recordPriorityFromHeaders(final int streamId, final List<? extends Header> headers) {
@@ -1468,6 +1495,7 @@ public void push(final List<Header> headers, final AsyncPushProducer pushProduce
14681495
final int promisedStreamId = streams.generateStreamId();
14691496
final H2StreamChannel channel = createChannel(promisedStreamId);
14701497
final H2Stream stream = streams.createReserved(channel, outgoingPushPromise(channel, pushProducer));
1498+
initializeStreamTimeouts(stream);
14711499

14721500
commitPushPromise(id, promisedStreamId, headers);
14731501
stream.markRemoteClosed();
@@ -1584,4 +1612,51 @@ public String toString() {
15841612

15851613
}
15861614

1615+
private void checkStreamTimeouts(final long nowNanos) throws IOException {
1616+
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
1617+
final H2Stream stream = it.next();
1618+
if (!stream.isActive()) {
1619+
continue;
1620+
}
1621+
1622+
final Timeout idleTimeout = stream.getIdleTimeout();
1623+
final Timeout lifetimeTimeout = stream.getLifetimeTimeout();
1624+
if ((idleTimeout == null || !idleTimeout.isEnabled())
1625+
&& (lifetimeTimeout == null || !lifetimeTimeout.isEnabled())) {
1626+
continue;
1627+
}
1628+
1629+
final long created = stream.getCreatedNanos();
1630+
final long last = stream.getLastActivityNanos();
1631+
1632+
if (idleTimeout != null && idleTimeout.isEnabled()) {
1633+
final long idleNanos = idleTimeout.toNanoseconds();
1634+
if (idleNanos > 0 && nowNanos - last > idleNanos) {
1635+
final int streamId = stream.getId();
1636+
final H2StreamTimeoutException ex = new H2StreamTimeoutException(
1637+
"HTTP/2 stream idle timeout (" + idleTimeout + ")",
1638+
streamId,
1639+
idleTimeout,
1640+
true);
1641+
stream.localReset(ex, H2Error.CANCEL);
1642+
// Once reset due to idle timeout, we do not care about lifetime anymore
1643+
continue;
1644+
}
1645+
}
1646+
1647+
if (lifetimeTimeout != null && lifetimeTimeout.isEnabled()) {
1648+
final long lifeNanos = lifetimeTimeout.toNanoseconds();
1649+
if (lifeNanos > 0 && nowNanos - created > lifeNanos) {
1650+
final int streamId = stream.getId();
1651+
final H2StreamTimeoutException ex = new H2StreamTimeoutException(
1652+
"HTTP/2 stream lifetime timeout (" + lifetimeTimeout + ")",
1653+
streamId,
1654+
lifetimeTimeout,
1655+
false);
1656+
stream.localReset(ex, H2Error.CANCEL);
1657+
}
1658+
}
1659+
}
1660+
}
1661+
15871662
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.hc.core5.http.nio.HandlerFactory;
4444
import org.apache.hc.core5.http2.H2Error;
4545
import org.apache.hc.core5.http2.H2StreamResetException;
46+
import org.apache.hc.core5.util.Timeout;
4647

4748
class H2Stream {
4849

@@ -60,6 +61,12 @@ enum State { RESERVED, OPEN, CLOSED }
6061
private volatile boolean reserved;
6162
private volatile boolean remoteClosed;
6263

64+
private volatile long createdNanos;
65+
private volatile long lastActivityNanos;
66+
67+
private volatile Timeout idleTimeout;
68+
private volatile Timeout lifetimeTimeout;
69+
6370
H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer<State> stateChangeCallback) {
6471
this.channel = channel;
6572
this.handler = handler;
@@ -68,6 +75,8 @@ enum State { RESERVED, OPEN, CLOSED }
6875
this.transitionRef = new AtomicReference<>(State.RESERVED);
6976
this.released = new AtomicBoolean();
7077
this.cancelled = new AtomicBoolean();
78+
this.createdNanos = 0L;
79+
this.lastActivityNanos = 0L;
7180
}
7281

7382
int getId() {
@@ -92,6 +101,7 @@ private void triggerClosed() {
92101

93102
void activate() {
94103
reserved = false;
104+
markCreatedAndActive();
95105
triggerOpen();
96106
}
97107

@@ -134,6 +144,8 @@ boolean isLocalClosed() {
134144

135145
void consumePromise(final List<Header> headers) throws HttpException, IOException {
136146
try {
147+
touch();
148+
137149
if (channel.isLocalReset()) {
138150
return;
139151
}
@@ -153,6 +165,8 @@ void consumeHeader(final List<Header> headers, final boolean endOfStream) throws
153165
if (endOfStream) {
154166
remoteClosed = true;
155167
}
168+
touch();
169+
156170
if (channel.isLocalReset()) {
157171
return;
158172
}
@@ -171,6 +185,8 @@ void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpExc
171185
if (endOfStream) {
172186
remoteClosed = true;
173187
}
188+
touch();
189+
174190
if (channel.isLocalReset()) {
175191
return;
176192
}
@@ -192,13 +208,16 @@ boolean isOutputReady() {
192208

193209
void produceOutput() throws HttpException, IOException {
194210
try {
211+
touch();
212+
195213
handler.produceOutput();
196214
} catch (final ProtocolException ex) {
197215
localReset(ex, H2Error.PROTOCOL_ERROR);
198216
}
199217
}
200218

201219
void produceInputCapacityUpdate() throws IOException {
220+
touch();
202221
handler.updateInputCapacity();
203222
}
204223

@@ -289,4 +308,37 @@ public String toString() {
289308
return buf.toString();
290309
}
291310

311+
private void markCreatedAndActive() {
312+
final long now = System.nanoTime();
313+
this.createdNanos = now;
314+
this.lastActivityNanos = now;
315+
}
316+
317+
private void touch() {
318+
this.lastActivityNanos = System.nanoTime();
319+
}
320+
321+
long getCreatedNanos() {
322+
return createdNanos;
323+
}
324+
325+
long getLastActivityNanos() {
326+
return lastActivityNanos;
327+
}
328+
329+
Timeout getIdleTimeout() {
330+
return idleTimeout;
331+
}
332+
333+
void setIdleTimeout(final Timeout idleTimeout) {
334+
this.idleTimeout = idleTimeout;
335+
}
336+
337+
Timeout getLifetimeTimeout() {
338+
return lifetimeTimeout;
339+
}
340+
341+
void setLifetimeTimeout(final Timeout lifetimeTimeout) {
342+
this.lifetimeTimeout = lifetimeTimeout;
343+
}
292344
}

0 commit comments

Comments
 (0)