From 52c67e8c6a60abe793cbc0d1c1e9150b17693bae Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Wed, 11 Mar 2026 12:53:33 +0100 Subject: [PATCH] Add deterministic, sleep-free connection pool fuzzer. --- .../pool/TestConnPoolClockInjection.java | 42 +-- .../pool/TestConnPoolDeterministicFuzzer.java | 253 ++++++++++++++++++ .../apache/hc/core5/pool/TestingClock.java | 68 +++++ 3 files changed, 322 insertions(+), 41 deletions(-) create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolDeterministicFuzzer.java create mode 100644 httpcore5/src/test/java/org/apache/hc/core5/pool/TestingClock.java diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java index f7a0ddf0b..14832b337 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolClockInjection.java @@ -31,12 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.time.Clock; -import java.time.Instant; -import java.time.ZoneId; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.util.TimeValue; @@ -46,46 +42,10 @@ final class TestConnPoolClockInjection { - static final class TestClock extends Clock { - - private final ZoneId zoneId; - private final AtomicLong millis; - - TestClock(final long initialMillis) { - this.zoneId = ZoneId.of("UTC"); - this.millis = new AtomicLong(initialMillis); - } - - void advanceMillis(final long deltaMillis) { - this.millis.addAndGet(deltaMillis); - } - - @Override - public ZoneId getZone() { - return zoneId; - } - - @Override - public Clock withZone(final ZoneId zone) { - return this; - } - - @Override - public long millis() { - return millis.get(); - } - - @Override - public Instant instant() { - return Instant.ofEpochMilli(millis()); - } - - } - @ParameterizedTest @EnumSource(PoolConcurrencyPolicy.class) void closeIdleUsesInjectedClock(final PoolConcurrencyPolicy policy) throws Exception { - final TestClock clock = new TestClock(0L); + final TestingClock clock = new TestingClock(0L); final ManagedConnPool pool = PoolTestSupport.createPool(policy, 1, 1, clock); diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolDeterministicFuzzer.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolDeterministicFuzzer.java new file mode 100644 index 000000000..21374b1ba --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestConnPoolDeterministicFuzzer.java @@ -0,0 +1,253 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.util.ArrayList; +import java.util.List; +import java.util.SplittableRandom; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +final class TestConnPoolDeterministicFuzzer { + + static Stream params() { + final long[] seeds = new long[]{1L, 2L, 3L, 4L}; + final List out = new ArrayList<>(); + for (final PoolConcurrencyPolicy policy : PoolConcurrencyPolicy.values()) { + for (final long seed : seeds) { + out.add(Arguments.of(policy, seed)); + } + } + return out.stream(); + } + + @ParameterizedTest + @MethodSource("params") + void fuzzSingleThreaded(final PoolConcurrencyPolicy policy, final long seed) throws Exception { + final TestingClock clock = new TestingClock(0L); + + final int defaultMaxPerRoute = 2; + final int maxTotal = 4; + + final ManagedConnPool pool = + PoolTestSupport.createPool(policy, defaultMaxPerRoute, maxTotal, clock); + + final Timeout requestTimeout = Timeout.of(0L, TimeUnit.MILLISECONDS); + + final SplittableRandom rnd = new SplittableRandom(seed); + + final List> leased = new ArrayList<>(); + Future> pending = null; + + try { + final String[] routes = new String[]{"r1", "r2", "r3"}; + final Object[] states = new Object[]{null, "s1"}; + + final int steps = 10_000; + + Assertions.assertTrue(pool.getMaxTotal() >= 0); + Assertions.assertTrue(pool.getDefaultMaxPerRoute() >= 0); + for (final String route : routes) { + Assertions.assertTrue(pool.getMaxPerRoute(route) >= 0); + } + + for (int i = 0; i < steps; i++) { + + pending = drainPending(pending, leased); + + final int op = rnd.nextInt(100); + + if (op < 45) { + // LEASE + final String route = routes[rnd.nextInt(routes.length)]; + final Object state = states[rnd.nextInt(states.length)]; + + if (pending == null) { + final Future> f = + pool.lease(route, state, requestTimeout, null); + if (f.isDone()) { + final PoolEntry entry = getDone(f); + if (entry != null) { + ensureConnection(entry); + leased.add(entry); + } + } else { + pending = f; + } + } else { + if (rnd.nextInt(8) == 0) { + pending.cancel(true); + pending = null; + validatePendingRequests(pool); + } + } + + } else if (op < 75) { + if (!leased.isEmpty()) { + final int idx = rnd.nextInt(leased.size()); + final PoolEntry entry = leased.remove(idx); + final boolean reusable = rnd.nextBoolean(); + pool.release(entry, reusable); + pending = drainPending(pending, leased); + } + + } else if (op < 85) { + clock.advanceMillis(1L + rnd.nextInt(20)); + + } else if (op < 92) { + pool.closeIdle(TimeValue.ofMilliseconds(1)); + + } else if (op < 97) { + pool.closeExpired(); + + } else { + if (pending != null) { + pending.cancel(true); + pending = null; + validatePendingRequests(pool); + } + } + + // keep stats + invariants cost under control + if ((i & 31) == 0) { + validatePendingRequests(pool); + pending = drainPending(pending, leased); + assertCoreInvariants(policy, pool, leased, pending, routes); + } + } + + // Cleanup + if (pending != null) { + pending.cancel(true); + pending = null; + } + validatePendingRequests(pool); + + while (!leased.isEmpty()) { + final PoolEntry entry = leased.remove(leased.size() - 1); + pool.release(entry, true); + } + + validatePendingRequests(pool); + pending = drainPending(pending, leased); + assertCoreInvariants(policy, pool, leased, pending, routes); + + } finally { + pool.close(CloseMode.IMMEDIATE); + } + } + + private static void ensureConnection(final PoolEntry entry) { + if (!entry.hasConnection()) { + entry.assignConnection(new PoolTestSupport.DummyConn()); + } + } + + private static PoolEntry getDone( + final Future> f) throws Exception { + + if (f.isCancelled()) { + return null; + } + return f.get(); + } + + private static Future> drainPending( + final Future> pending, + final List> leased) throws Exception { + + if (pending != null && pending.isDone()) { + final PoolEntry entry = getDone(pending); + if (entry != null) { + ensureConnection(entry); + leased.add(entry); + } + return null; + } + return pending; + } + + private static void validatePendingRequests(final ManagedConnPool pool) { + if (pool instanceof StrictConnPool) { + ((StrictConnPool) pool).validatePendingRequests(); + } else if (pool instanceof LaxConnPool) { + ((LaxConnPool) pool).validatePendingRequests(); + } + } + + private static void assertCoreInvariants( + final PoolConcurrencyPolicy policy, + final ManagedConnPool pool, + final List> leased, + final Future> pending, + final String[] routes) { + + final PoolStats totals = pool.getTotalStats(); + + Assertions.assertTrue(pool.getMaxTotal() >= 0); + Assertions.assertTrue(pool.getDefaultMaxPerRoute() >= 0); + + Assertions.assertTrue(totals.getAvailable() >= 0); + Assertions.assertTrue(totals.getLeased() >= 0); + Assertions.assertTrue(totals.getPending() >= 0); + Assertions.assertTrue(totals.getMax() >= 0); + + final long allocated = (long) totals.getAvailable() + (long) totals.getLeased(); + Assertions.assertTrue(allocated <= (long) totals.getMax(), "allocated > max"); + + if (policy != PoolConcurrencyPolicy.LAX) { + Assertions.assertTrue(totals.getLeased() <= pool.getMaxTotal(), "leased > max total"); + } + + Assertions.assertEquals(leased.size(), totals.getLeased(), "leased count mismatch"); + + final int expectedPending = pending != null && !pending.isDone() && !pending.isCancelled() ? 1 : 0; + Assertions.assertEquals(expectedPending, totals.getPending(), "pending count mismatch"); + + if (policy != PoolConcurrencyPolicy.LAX) { + for (final String route : routes) { + final PoolStats routeStats = pool.getStats(route); + Assertions.assertTrue(routeStats.getAvailable() >= 0); + Assertions.assertTrue(routeStats.getLeased() >= 0); + Assertions.assertTrue(routeStats.getPending() >= 0); + Assertions.assertTrue(routeStats.getMax() >= 0); + final long routeAllocated = (long) routeStats.getAvailable() + (long) routeStats.getLeased(); + Assertions.assertTrue(routeAllocated <= (long) routeStats.getMax(), "route allocated > max"); + } + } + } + +} \ No newline at end of file diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/TestingClock.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestingClock.java new file mode 100644 index 000000000..f0afcb1b4 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/TestingClock.java @@ -0,0 +1,68 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicLong; + +final class TestingClock extends Clock { + + private final ZoneId zoneId; + private final AtomicLong millis; + + TestingClock(final long initialMillis) { + this.zoneId = ZoneId.of("UTC"); + this.millis = new AtomicLong(initialMillis); + } + + void advanceMillis(final long deltaMillis) { + this.millis.addAndGet(deltaMillis); + } + + @Override + public ZoneId getZone() { + return zoneId; + } + + @Override + public Clock withZone(final ZoneId zone) { + return this; + } + + @Override + public long millis() { + return millis.get(); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(millis()); + } + +} \ No newline at end of file