Skip to content

Commit ddaccde

Browse files
Inject Clock into connection pools to make time-based behavior testable (#645)
1 parent 994c382 commit ddaccde

File tree

7 files changed

+377
-55
lines changed

7 files changed

+377
-55
lines changed

httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
package org.apache.hc.core5.pool;
2828

29+
import java.time.Clock;
2930
import java.util.Deque;
3031
import java.util.HashSet;
3132
import java.util.Iterator;
@@ -78,6 +79,8 @@ public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool
7879
private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
7980
private final AtomicBoolean isShutDown;
8081

82+
private final Clock clock;
83+
8184
private volatile int defaultMaxPerRoute;
8285

8386
/**
@@ -89,6 +92,16 @@ public LaxConnPool(
8992
final PoolReusePolicy policy,
9093
final DisposalCallback<C> disposalCallback,
9194
final ConnPoolListener<T> connPoolListener) {
95+
this(defaultMaxPerRoute, timeToLive, policy, disposalCallback, connPoolListener, Clock.systemUTC());
96+
}
97+
98+
LaxConnPool(
99+
final int defaultMaxPerRoute,
100+
final TimeValue timeToLive,
101+
final PoolReusePolicy policy,
102+
final DisposalCallback<C> disposalCallback,
103+
final ConnPoolListener<T> connPoolListener,
104+
final Clock clock) {
92105
super();
93106
Args.positive(defaultMaxPerRoute, "Max per route value");
94107
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
@@ -98,6 +111,7 @@ public LaxConnPool(
98111
this.routeToPool = new ConcurrentHashMap<>();
99112
this.isShutDown = new AtomicBoolean();
100113
this.defaultMaxPerRoute = defaultMaxPerRoute;
114+
this.clock = Args.notNull(clock, "clock");
101115
}
102116

103117
/**
@@ -145,7 +159,8 @@ private PerRoutePool<T, C> getPool(final T route) {
145159
policy,
146160
this,
147161
disposalCallback,
148-
connPoolListener);
162+
connPoolListener,
163+
clock);
149164
routePool = routeToPool.putIfAbsent(route, newRoutePool);
150165
if (routePool == null) {
151166
routePool = newRoutePool;
@@ -266,7 +281,7 @@ public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
266281

267282
@Override
268283
public void closeIdle(final TimeValue idleTime) {
269-
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
284+
final long deadline = clock.millis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270285
enumAvailable(entry -> {
271286
if (entry.getUpdated() <= deadline) {
272287
entry.discardConnection(CloseMode.GRACEFUL);
@@ -276,7 +291,7 @@ public void closeIdle(final TimeValue idleTime) {
276291

277292
@Override
278293
public void closeExpired() {
279-
final long now = System.currentTimeMillis();
294+
final long now = clock.millis();
280295
enumAvailable(entry -> {
281296
if (entry.getExpiryDeadline().isBefore(now)) {
282297
entry.discardConnection(CloseMode.GRACEFUL);
@@ -306,11 +321,11 @@ static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
306321

307322
LeaseRequest(
308323
final Object state,
309-
final Timeout requestTimeout,
324+
final Deadline deadline,
310325
final BasicFuture<PoolEntry<T, C>> future) {
311326
super();
312327
this.state = state;
313-
this.deadline = Deadline.calculate(requestTimeout);
328+
this.deadline = deadline;
314329
this.future = future;
315330
}
316331

@@ -362,6 +377,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
362377
private final AtomicInteger allocated;
363378
private final AtomicLong releaseSeqNum;
364379

380+
private final Clock clock;
381+
365382
private volatile int max;
366383

367384
PerRoutePool(
@@ -371,7 +388,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
371388
final PoolReusePolicy policy,
372389
final ConnPoolStats<T> connPoolStats,
373390
final DisposalCallback<C> disposalCallback,
374-
final ConnPoolListener<T> connPoolListener) {
391+
final ConnPoolListener<T> connPoolListener,
392+
final Clock clock) {
375393
super();
376394
this.route = route;
377395
this.timeToLive = timeToLive;
@@ -386,6 +404,7 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
386404
this.allocated = new AtomicInteger(0);
387405
this.releaseSeqNum = new AtomicLong(0);
388406
this.max = max;
407+
this.clock = Args.notNull(clock, "clock");
389408
}
390409

391410
public void shutdown(final CloseMode closeMode) {
@@ -412,7 +431,7 @@ private PoolEntry<T, C> createPoolEntry() {
412431
prev = allocated.get();
413432
next = (prev < poolMax) ? prev + 1 : prev;
414433
} while (!allocated.compareAndSet(prev, next));
415-
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
434+
return prev < next ? new PoolEntry<>(route, timeToLive, disposalCallback, clock) : null;
416435
}
417436

418437
private void deallocatePoolEntry() {
@@ -437,12 +456,13 @@ private void removeLeased(final PoolEntry<T, C> entry) {
437456
}
438457

439458
private PoolEntry<T, C> getAvailableEntry(final Object state) {
459+
final long now = clock.millis();
440460
for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
441461
final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
442462
final PoolEntry<T, C> entry = ref.getReference();
443463
if (ref.compareAndSet(entry, entry, false, true)) {
444464
it.remove();
445-
if (entry.getExpiryDeadline().isExpired() || !Objects.equals(entry.getState(), state)) {
465+
if (entry.getExpiryDeadline().isBefore(now) || !Objects.equals(entry.getState(), state)) {
446466
entry.discardConnection(CloseMode.GRACEFUL);
447467
deallocatePoolEntry();
448468
} else {
@@ -486,7 +506,7 @@ public PoolEntry<T, C> get(
486506
addLeased(entry);
487507
future.completed(entry);
488508
} else {
489-
pending.add(new LeaseRequest<>(state, requestTimeout, future));
509+
pending.add(new LeaseRequest<>(state, Deadline.calculate(clock.millis(), requestTimeout), future));
490510
if (releaseState != releaseSeqNum.get()) {
491511
servicePendingRequest();
492512
}
@@ -496,7 +516,8 @@ public PoolEntry<T, C> get(
496516

497517
public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
498518
removeLeased(releasedEntry);
499-
if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
519+
final long now = clock.millis();
520+
if (!reusable || releasedEntry.getExpiryDeadline().isBefore(now)) {
500521
releasedEntry.discardConnection(CloseMode.GRACEFUL);
501522
}
502523
if (releasedEntry.hasConnection()) {
@@ -531,8 +552,9 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
531552
}
532553
final Object state = leaseRequest.getState();
533554
final Deadline deadline = leaseRequest.getDeadline();
555+
final long now = clock.millis();
534556

535-
if (deadline.isExpired()) {
557+
if (deadline.isBefore(now)) {
536558
leaseRequest.failed(DeadlineTimeoutException.from(deadline));
537559
} else {
538560
final long releaseState = releaseSeqNum.get();
@@ -560,6 +582,7 @@ private void servicePendingRequests(final RequestServiceStrategy serviceStrategy
560582
}
561583

562584
public void validatePendingRequests() {
585+
final long now = clock.millis();
563586
final Iterator<LeaseRequest<T, C>> it = pending.iterator();
564587
while (it.hasNext()) {
565588
final LeaseRequest<T, C> request = it.next();
@@ -568,7 +591,7 @@ public void validatePendingRequests() {
568591
it.remove();
569592
} else {
570593
final Deadline deadline = request.getDeadline();
571-
if (deadline.isExpired()) {
594+
if (deadline.isBefore(now)) {
572595
request.failed(DeadlineTimeoutException.from(deadline));
573596
}
574597
if (request.isDone()) {

httpcore5/src/main/java/org/apache/hc/core5/pool/PoolEntry.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
*/
2727
package org.apache.hc.core5.pool;
2828

29+
import java.time.Clock;
2930
import java.util.concurrent.atomic.AtomicReference;
3031

31-
import org.apache.hc.core5.function.Supplier;
3232
import org.apache.hc.core5.io.CloseMode;
3333
import org.apache.hc.core5.io.ModalCloseable;
3434
import org.apache.hc.core5.util.Args;
@@ -53,7 +53,7 @@ public final class PoolEntry<T, C extends ModalCloseable> {
5353
private final TimeValue timeToLive;
5454
private final AtomicReference<C> connRef;
5555
private final DisposalCallback<C> disposalCallback;
56-
private final Supplier<Long> currentTimeSupplier;
56+
private final Clock clock;
5757

5858
private volatile Object state;
5959
private volatile long created;
@@ -62,17 +62,17 @@ public final class PoolEntry<T, C extends ModalCloseable> {
6262
private volatile Deadline validityDeadline = Deadline.MIN_VALUE;
6363

6464
PoolEntry(final T route, final TimeValue timeToLive, final DisposalCallback<C> disposalCallback,
65-
final Supplier<Long> currentTimeSupplier) {
65+
final Clock clock) {
6666
super();
6767
this.route = Args.notNull(route, "Route");
6868
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
6969
this.connRef = new AtomicReference<>();
7070
this.disposalCallback = disposalCallback;
71-
this.currentTimeSupplier = currentTimeSupplier;
71+
this.clock = clock;
7272
}
7373

74-
PoolEntry(final T route, final TimeValue timeToLive, final Supplier<Long> currentTimeSupplier) {
75-
this(route, timeToLive, null, currentTimeSupplier);
74+
PoolEntry(final T route, final TimeValue timeToLive, final Clock clock) {
75+
this(route, timeToLive, null, clock);
7676
}
7777

7878
/**
@@ -103,7 +103,7 @@ public PoolEntry(final T route) {
103103
}
104104

105105
long getCurrentTime() {
106-
return currentTimeSupplier != null ? currentTimeSupplier.get() : System.currentTimeMillis();
106+
return clock != null ? clock.millis() : System.currentTimeMillis();
107107
}
108108

109109
public T getRoute() {

httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package org.apache.hc.core5.pool;
2828

2929
import java.io.IOException;
30+
import java.time.Clock;
3031
import java.util.ArrayDeque;
3132
import java.util.Deque;
3233
import java.util.HashSet;
@@ -104,6 +105,8 @@ public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implement
104105

105106
private final ScheduledThreadPoolExecutor timeouts;
106107

108+
private final Clock clock;
109+
107110
/**
108111
* Dedicated executor for asynchronous, best-effort disposal.
109112
* Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
@@ -131,6 +134,17 @@ public RouteSegmentedConnPool(
131134
final PoolReusePolicy reusePolicy,
132135
final DisposalCallback<C> disposal,
133136
final ConnPoolListener<R> connPoolListener) {
137+
this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, connPoolListener, Clock.systemUTC());
138+
}
139+
140+
RouteSegmentedConnPool(
141+
final int defaultMaxPerRoute,
142+
final int maxTotal,
143+
final TimeValue timeToLive,
144+
final PoolReusePolicy reusePolicy,
145+
final DisposalCallback<C> disposal,
146+
final ConnPoolListener<R> connPoolListener,
147+
final Clock clock) {
134148

135149
this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5);
136150
this.maxTotal.set(maxTotal > 0 ? maxTotal : 25);
@@ -148,6 +162,8 @@ public RouteSegmentedConnPool(
148162
this.disposal = Args.notNull(disposal, "disposal");
149163
this.connPoolListener = connPoolListener;
150164

165+
this.clock = Args.notNull(clock, "clock");
166+
151167
final ThreadFactory tf = r -> {
152168
final Thread t = new Thread(r, "seg-pool-timeouts");
153169
t.setDaemon(true);
@@ -244,7 +260,7 @@ public Future<PoolEntry<R, C>> lease(
244260
if (hit == null) {
245261
break;
246262
}
247-
final long now = System.currentTimeMillis();
263+
final long now = clock.millis();
248264
if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) {
249265
discardAndDecr(hit, CloseMode.GRACEFUL);
250266
continue;
@@ -261,7 +277,7 @@ public Future<PoolEntry<R, C>> lease(
261277

262278
// 2) Try to allocate new within caps
263279
if (tryAllocateOne(route, seg)) {
264-
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
280+
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, clock);
265281
fireOnLease(route);
266282
if (callback != null) {
267283
callback.completed(entry);
@@ -339,7 +355,7 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
339355
return;
340356
}
341357

342-
final long now = System.currentTimeMillis();
358+
final long now = clock.millis();
343359
final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now);
344360

345361
if (stillValid) {
@@ -404,7 +420,7 @@ public void close(final CloseMode closeMode) {
404420

405421
@Override
406422
public void closeIdle(final TimeValue idleTime) {
407-
final long cutoff = System.currentTimeMillis()
423+
final long cutoff = clock.millis()
408424
- Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L);
409425

410426
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
@@ -430,7 +446,7 @@ public void closeIdle(final TimeValue idleTime) {
430446

431447
@Override
432448
public void closeExpired() {
433-
final long now = System.currentTimeMillis();
449+
final long now = clock.millis();
434450

435451
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
436452
final R route = e.getKey();
@@ -744,7 +760,7 @@ private void serveRoundRobin(final int budget) {
744760
seg.allocated.decrementAndGet();
745761
totalAllocated.decrementAndGet();
746762
} else {
747-
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
763+
final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal, clock);
748764
cancelTimeout(w);
749765
w.complete(entry);
750766
fireOnLease(w.route);

0 commit comments

Comments
 (0)