Skip to content

Commit 4e0e2be

Browse files
committed
Introduce experimental RouteSegmentedConnPool (OFFLOCK): lock-free, route-segmented, disposal off critical path; preserves STRICT/LAX semantics.
1 parent c16c887 commit 4e0e2be

10 files changed

Lines changed: 1262 additions & 1 deletion

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.hc.core5.pool.ManagedConnPool;
6464
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
6565
import org.apache.hc.core5.pool.PoolReusePolicy;
66+
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
6667
import org.apache.hc.core5.pool.StrictConnPool;
6768
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
6869
import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -370,6 +371,14 @@ public H2AsyncRequester create() {
370371
new DefaultDisposalCallback<>(),
371372
connPoolListener);
372373
break;
374+
case OFFLOCK:
375+
connPool = new RouteSegmentedConnPool<>(
376+
defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20,
377+
maxTotal > 0 ? maxTotal : 50,
378+
timeToLive,
379+
poolReusePolicy,
380+
new DefaultDisposalCallback<>());
381+
break;
373382
case STRICT:
374383
default:
375384
connPool = new StrictConnPool<>(

httpcore5-testing/pom.xml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@
110110
<artifactId>junit-jupiter</artifactId>
111111
<scope>test</scope>
112112
</dependency>
113+
<dependency>
114+
<groupId>org.openjdk.jmh</groupId>
115+
<artifactId>jmh-core</artifactId>
116+
<scope>test</scope>
117+
</dependency>
118+
<dependency>
119+
<groupId>org.openjdk.jmh</groupId>
120+
<artifactId>jmh-generator-annprocess</artifactId>
121+
<scope>test</scope>
122+
</dependency>
113123
</dependencies>
114124

115125
<profiles>
@@ -135,6 +145,45 @@
135145
</plugins>
136146
</build>
137147
</profile>
148+
149+
<profile>
150+
<id>benchmark</id>
151+
<properties>
152+
<skipTests>true</skipTests>
153+
<benchmark>org.apache</benchmark>
154+
</properties>
155+
<build>
156+
<plugins>
157+
<plugin>
158+
<groupId>org.codehaus.mojo</groupId>
159+
<artifactId>exec-maven-plugin</artifactId>
160+
<executions>
161+
<execution>
162+
<id>benchmark</id>
163+
<phase>test</phase>
164+
<goals>
165+
<goal>exec</goal>
166+
</goals>
167+
<configuration>
168+
<classpathScope>test</classpathScope>
169+
<executable>java</executable>
170+
<arguments>
171+
<argument>-classpath</argument>
172+
<classpath/>
173+
<argument>org.openjdk.jmh.Main</argument>
174+
<argument>-rf</argument>
175+
<argument>json</argument>
176+
<argument>-rff</argument>
177+
<argument>target/jmh-result.${benchmark}.json</argument>
178+
<argument>${benchmark}</argument>
179+
</arguments>
180+
</configuration>
181+
</execution>
182+
</executions>
183+
</plugin>
184+
</plugins>
185+
</build>
186+
</profile>
138187
</profiles>
139188

140189
<reporting>
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.benchmark;
28+
29+
30+
import java.io.IOException;
31+
import java.util.Locale;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.ThreadLocalRandom;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import org.apache.hc.core5.io.CloseMode;
37+
import org.apache.hc.core5.io.ModalCloseable;
38+
import org.apache.hc.core5.pool.DisposalCallback;
39+
import org.apache.hc.core5.pool.LaxConnPool;
40+
import org.apache.hc.core5.pool.ManagedConnPool;
41+
import org.apache.hc.core5.pool.PoolEntry;
42+
import org.apache.hc.core5.pool.PoolReusePolicy;
43+
import org.apache.hc.core5.pool.PoolStats;
44+
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
45+
import org.apache.hc.core5.pool.StrictConnPool;
46+
import org.apache.hc.core5.util.TimeValue;
47+
import org.apache.hc.core5.util.Timeout;
48+
import org.openjdk.jmh.annotations.Benchmark;
49+
import org.openjdk.jmh.annotations.BenchmarkMode;
50+
import org.openjdk.jmh.annotations.Fork;
51+
import org.openjdk.jmh.annotations.Level;
52+
import org.openjdk.jmh.annotations.Measurement;
53+
import org.openjdk.jmh.annotations.Mode;
54+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
55+
import org.openjdk.jmh.annotations.OutputTimeUnit;
56+
import org.openjdk.jmh.annotations.Param;
57+
import org.openjdk.jmh.annotations.Scope;
58+
import org.openjdk.jmh.annotations.Setup;
59+
import org.openjdk.jmh.annotations.State;
60+
import org.openjdk.jmh.annotations.TearDown;
61+
import org.openjdk.jmh.annotations.Threads;
62+
import org.openjdk.jmh.annotations.Warmup;
63+
64+
/**
65+
* Compare StrictConnPool, LaxConnPool, and RouteSegmentedConnPool (“OFFLOCK”)
66+
* under different contention patterns and slow-disposal rates.
67+
*/
68+
@BenchmarkMode({Mode.Throughput, Mode.SampleTime})
69+
@Warmup(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS)
70+
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
71+
@Fork(1)
72+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
73+
public class RoutePoolsJmh {
74+
75+
/**
76+
* Minimal connection that can simulate slow close.
77+
*/
78+
public static final class FakeConn implements ModalCloseable {
79+
private final int closeDelayMs;
80+
81+
public FakeConn(final int closeDelayMs) {
82+
this.closeDelayMs = closeDelayMs;
83+
}
84+
85+
@Override
86+
public void close(final CloseMode closeMode) {
87+
if (closeDelayMs <= 0) {
88+
return;
89+
}
90+
try {
91+
Thread.sleep(closeDelayMs);
92+
} catch (final InterruptedException ignore) {
93+
Thread.currentThread().interrupt();
94+
}
95+
}
96+
97+
@Override
98+
public void close() throws IOException {
99+
100+
}
101+
}
102+
103+
/**
104+
* All benchmark parameters & shared state live here (required by JMH).
105+
*/
106+
@State(Scope.Benchmark)
107+
public static class BenchState {
108+
109+
/**
110+
* Which pool to benchmark.
111+
* STRICT -> StrictConnPool
112+
* LAX -> LaxConnPool
113+
* OFFLOCK -> RouteSegmentedConnPool
114+
*/
115+
@Param({"STRICT", "LAX", "OFFLOCK"})
116+
public String policy;
117+
118+
/**
119+
* Number of distinct routes to spread load across.
120+
* 1 = hot single route; 10 = multi-route scenario.
121+
*/
122+
@Param({"1", "10"})
123+
public int routes;
124+
125+
/**
126+
* Percent (0..100) of releases that will be non-reusable,
127+
* triggering a discard (and thus a potentially slow close).
128+
*/
129+
@Param({"0", "5", "20"})
130+
public int slowClosePct;
131+
132+
/**
133+
* Sleep (ms) when a connection is discarded (slow close path).
134+
*/
135+
@Param({"0", "200"})
136+
public int closeSleepMs;
137+
138+
/**
139+
* Max total, default per-route — tuned to create contention.
140+
*/
141+
@Param({"32"})
142+
public int maxTotal;
143+
@Param({"8"})
144+
public int defMaxPerRoute;
145+
146+
/**
147+
* Keep-alive on reusable releases.
148+
*/
149+
@Param({"5000"})
150+
public int keepAliveMs;
151+
152+
ManagedConnPool<String, FakeConn> pool;
153+
String[] routeKeys;
154+
DisposalCallback<FakeConn> disposal;
155+
156+
@Setup(Level.Trial)
157+
public void setUp() {
158+
// routes list
159+
routeKeys = new String[routes];
160+
for (int i = 0; i < routes; i++) {
161+
routeKeys[i] = "route-" + i;
162+
}
163+
164+
disposal = (c, m) -> {
165+
if (c != null) {
166+
c.close(m);
167+
}
168+
};
169+
170+
final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND;
171+
172+
switch (policy.toUpperCase(Locale.ROOT)) {
173+
case "STRICT":
174+
pool = new StrictConnPool<>(
175+
defMaxPerRoute,
176+
maxTotal,
177+
ttl,
178+
PoolReusePolicy.LIFO,
179+
disposal,
180+
null);
181+
break;
182+
case "LAX":
183+
pool = new LaxConnPool<>(
184+
defMaxPerRoute,
185+
ttl,
186+
PoolReusePolicy.LIFO,
187+
disposal,
188+
null);
189+
pool.setMaxTotal(maxTotal);
190+
break;
191+
case "OFFLOCK":
192+
pool = new RouteSegmentedConnPool<>(
193+
defMaxPerRoute,
194+
maxTotal,
195+
ttl,
196+
PoolReusePolicy.LIFO,
197+
disposal);
198+
break;
199+
default:
200+
throw new IllegalArgumentException("Unknown policy: " + policy);
201+
}
202+
}
203+
204+
@TearDown(Level.Trial)
205+
public void tearDown() {
206+
if (pool != null) {
207+
pool.close(CloseMode.IMMEDIATE);
208+
}
209+
}
210+
211+
String pickRoute() {
212+
final int idx = ThreadLocalRandom.current().nextInt(routeKeys.length);
213+
return routeKeys[idx];
214+
}
215+
216+
boolean shouldDiscard() {
217+
if (slowClosePct <= 0) return false;
218+
return ThreadLocalRandom.current().nextInt(100) < slowClosePct;
219+
}
220+
}
221+
222+
/**
223+
* Lease+release on a randomly chosen route.
224+
* Mix of reusable and non-reusable releases (to trigger discard/close).
225+
*/
226+
@Benchmark
227+
@Threads(50)
228+
public void leaseReleaseMixed(final BenchState s) throws Exception {
229+
try {
230+
final Future<PoolEntry<String, FakeConn>> f = s.pool.lease(s.pickRoute(), null, Timeout.ofMilliseconds(500), null);
231+
final PoolEntry<String, FakeConn> e = f.get(500, TimeUnit.MILLISECONDS);
232+
if (!e.hasConnection()) e.assignConnection(new FakeConn(s.closeSleepMs));
233+
final boolean reusable = !s.shouldDiscard();
234+
if (reusable) {
235+
e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
236+
s.pool.release(e, true);
237+
} else {
238+
s.pool.release(e, false);
239+
}
240+
} catch (final IllegalStateException ignored) {
241+
242+
}
243+
}
244+
245+
246+
/**
247+
* Optional stats probe to ensure the benchmark does "something".
248+
* Not a measured benchmark; use only for sanity runs.
249+
*/
250+
@Benchmark
251+
@Threads(1)
252+
@OperationsPerInvocation(1)
253+
@BenchmarkMode(Mode.SingleShotTime)
254+
public void statsProbe(final BenchState s, final org.openjdk.jmh.infra.Blackhole bh) {
255+
final PoolStats stats = s.pool.getTotalStats();
256+
bh.consume(stats.getAvailable());
257+
bh.consume(stats.getLeased());
258+
bh.consume(stats.getPending());
259+
}
260+
}

httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.hc.core5.pool.ManagedConnPool;
4747
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
4848
import org.apache.hc.core5.pool.PoolReusePolicy;
49+
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
4950
import org.apache.hc.core5.pool.StrictConnPool;
5051
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
5152
import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -260,6 +261,14 @@ public HttpAsyncRequester create() {
260261
new DefaultDisposalCallback<>(),
261262
connPoolListener);
262263
break;
264+
case OFFLOCK:
265+
connPool = new RouteSegmentedConnPool<>(
266+
defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20,
267+
maxTotal > 0 ? maxTotal : 50,
268+
timeToLive,
269+
poolReusePolicy,
270+
new DefaultDisposalCallback<>());
271+
break;
263272
case STRICT:
264273
default:
265274
connPool = new StrictConnPool<>(

httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/RequesterBootstrap.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.hc.core5.pool.ManagedConnPool;
5555
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
5656
import org.apache.hc.core5.pool.PoolReusePolicy;
57+
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
5758
import org.apache.hc.core5.pool.StrictConnPool;
5859
import org.apache.hc.core5.util.Timeout;
5960

@@ -213,6 +214,14 @@ public HttpRequester create() {
213214
new DefaultDisposalCallback<>(),
214215
connPoolListener);
215216
break;
217+
case OFFLOCK:
218+
connPool = new RouteSegmentedConnPool<>(
219+
defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20,
220+
maxTotal > 0 ? maxTotal : 50,
221+
timeToLive,
222+
poolReusePolicy,
223+
new DefaultDisposalCallback<>());
224+
break;
216225
case STRICT:
217226
default:
218227
connPool = new StrictConnPool<>(

0 commit comments

Comments
 (0)