From c5fbdd829d241d039bf8d64b3889b244fdcf693d Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 11 Jan 2026 15:42:31 +0100 Subject: [PATCH] HTTPCLIENT-1165: Add optional request collapsing for async cache misses Serialize concurrent cold misses per cache key to avoid duplicate backend fetches. Add CacheRequestCollapser and tests covering coordination and end-to-end behavior. --- .../http/impl/cache/AsyncCachingExec.java | 91 ++++++++ .../impl/cache/CacheRequestCollapser.java | 155 +++++++++++++ .../cache/CachingH2AsyncClientBuilder.java | 14 +- .../cache/CachingHttpAsyncClientBuilder.java | 16 +- .../AsyncClientCacheRequestCollapsing.java | 157 +++++++++++++ .../impl/cache/CacheRequestCollapserTest.java | 169 ++++++++++++++ ...TestAsyncCachingExecRequestCollapsing.java | 212 ++++++++++++++++++ 7 files changed, 812 insertions(+), 2 deletions(-) create mode 100644 httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java create mode 100644 httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java create mode 100644 httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapserTest.java create mode 100644 httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java index 8936863716..9f67877c11 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java @@ -98,13 +98,23 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler private final HttpAsyncCache responseCache; private final DefaultAsyncCacheRevalidator cacheRevalidator; private final ConditionalRequestBuilder conditionalRequestBuilder; + private final boolean requestCollapsingEnabled; AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) { + this(cache, cacheRevalidator, config, false); + } + + AsyncCachingExec( + final HttpAsyncCache cache, + final DefaultAsyncCacheRevalidator cacheRevalidator, + final CacheConfig config, + final boolean requestCollapsingEnabled) { super(config); this.responseCache = Args.notNull(cache, "Response cache"); this.cacheRevalidator = cacheRevalidator; this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request -> BasicRequestBuilder.copy(request).build()); + this.requestCollapsingEnabled = requestCollapsingEnabled; } AsyncCachingExec( @@ -274,6 +284,87 @@ public void completed(final CacheMatch result) { final CacheHit hit = result != null ? result.hit : null; final CacheHit root = result != null ? result.root : null; if (hit == null) { + if (requestCollapsingEnabled && root == null && entityProducer == null && !requestCacheControl.isOnlyIfCached()) { + final String cacheKey = CacheKeyGenerator.INSTANCE.generateKey(target, request); + final CacheRequestCollapser.Token token = CacheRequestCollapser.INSTANCE.enter(cacheKey); + if (token.isLeader()) { + handleCacheMiss(requestCacheControl, null, target, request, null, scope, chain, new AsyncExecCallback() { + + @Override + public AsyncDataConsumer handleResponse( + final HttpResponse response, + final EntityDetails entityDetails) throws HttpException, IOException { + try { + return asyncExecCallback.handleResponse(response, entityDetails); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { + try { + asyncExecCallback.handleInformationResponse(response); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void completed() { + try { + asyncExecCallback.completed(); + } finally { + token.complete(); + } + } + + @Override + public void failed(final Exception cause) { + try { + asyncExecCallback.failed(cause); + } finally { + token.complete(); + } + } + + }); + } else { + operation.setDependency(CacheRequestCollapser.INSTANCE.await(token, () -> operation.setDependency( + responseCache.match(target, request, new FutureCallback() { + + @Override + public void completed(final CacheMatch result) { + final CacheHit hit = result != null ? result.hit : null; + final CacheHit root = result != null ? result.root : null; + if (hit == null) { + handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); + } else { + final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); + if (LOG.isDebugEnabled()) { + LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl); + } + context.setResponseCacheControl(responseCacheControl); + handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); + } + } + + @Override + public void failed(final Exception cause) { + asyncExecCallback.failed(cause); + } + + @Override + public void cancelled() { + asyncExecCallback.failed(new InterruptedIOException()); + } + + })))); + } + return; + } handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); } else { final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java new file mode 100644 index 0000000000..82d50b8d32 --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java @@ -0,0 +1,155 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.util.Args; + +/** + * Coordinates concurrent requests for the same cache key so that only one request + * goes to the backend while others wait for it to complete and then re-check the cache. + *

+ * Internal helper used to implement request collapsing ({@code HTTPCLIENT-1165}). + */ +@Internal +final class CacheRequestCollapser { + + static final CacheRequestCollapser INSTANCE = new CacheRequestCollapser(); + + static final class Token { + + private final ConcurrentHashMap inflight; + private final String key; + private final Entry entry; + private final boolean leader; + + private Token(final ConcurrentHashMap inflight, final String key, final Entry entry, final boolean leader) { + this.inflight = inflight; + this.key = key; + this.entry = entry; + this.leader = leader; + } + + boolean isLeader() { + return leader; + } + + void complete() { + if (entry.completed.compareAndSet(false, true)) { + inflight.remove(key, entry); + entry.drain(); + } + } + + } + + private static final class Waiter implements Cancellable { + + private final AtomicBoolean cancelled; + private final Runnable task; + + private Waiter(final Runnable task) { + this.cancelled = new AtomicBoolean(false); + this.task = task; + } + + @Override + public boolean cancel() { + return cancelled.compareAndSet(false, true); + } + + void runIfNotCancelled() { + if (!cancelled.get()) { + task.run(); + } + } + + } + + private static final class Entry { + + private final AtomicBoolean completed; + private final ConcurrentLinkedQueue waiters; + + private Entry() { + this.completed = new AtomicBoolean(false); + this.waiters = new ConcurrentLinkedQueue<>(); + } + + private Cancellable await(final Runnable task) { + if (completed.get()) { + task.run(); + return () -> false; + } + final Waiter waiter = new Waiter(task); + waiters.add(waiter); + if (completed.get()) { + drain(); + } + return waiter; + } + + private void drain() { + for (; ; ) { + final Waiter waiter = waiters.poll(); + if (waiter == null) { + return; + } + waiter.runIfNotCancelled(); + } + } + + } + + private final ConcurrentHashMap inflight; + + private CacheRequestCollapser() { + this.inflight = new ConcurrentHashMap<>(); + } + + Token enter(final String key) { + Args.notEmpty(key, "Key"); + final Entry created = new Entry(); + final Entry existing = inflight.putIfAbsent(key, created); + if (existing == null) { + return new Token(inflight, key, created, true); + } + return new Token(inflight, key, existing, false); + } + + Cancellable await(final Token token, final Runnable task) { + Args.notNull(token, "Token"); + Args.notNull(task, "Task"); + return token.entry.await(task); + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java index 5cd9f33767..b6b898ae9c 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java @@ -56,6 +56,7 @@ public class CachingH2AsyncClientBuilder extends H2AsyncClientBuilder { private SchedulingStrategy schedulingStrategy; private CacheConfig cacheConfig; private boolean deleteCache; + private boolean requestCollapsingEnabled; public static CachingH2AsyncClientBuilder create() { return new CachingH2AsyncClientBuilder(); @@ -112,6 +113,16 @@ public CachingH2AsyncClientBuilder setDeleteCache(final boolean deleteCache) { return this; } + /** + * Enables request collapsing for cacheable requests ({@code HTTPCLIENT-1165}). + * + * @since 5.7 + */ + public CachingH2AsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) { + this.requestCollapsingEnabled = requestCollapsingEnabled; + return this; + } + @Override protected void customizeExecChain(final NamedElementChain execChainDefinition) { final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT; @@ -156,7 +167,8 @@ protected void customizeExecChain(final NamedElementChain final AsyncCachingExec cachingExec = new AsyncCachingExec( httpCache, cacheRevalidator, - config); + config, + requestCollapsingEnabled); execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name()); } diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java index 2dda48a9a2..2c4336fe77 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java @@ -60,6 +60,7 @@ public class CachingHttpAsyncClientBuilder extends HttpAsyncClientBuilder { private SchedulingStrategy schedulingStrategy; private CacheConfig cacheConfig; private boolean deleteCache; + private boolean requestCollapsingEnabled; public static CachingHttpAsyncClientBuilder create() { return new CachingHttpAsyncClientBuilder(); @@ -116,6 +117,18 @@ public CachingHttpAsyncClientBuilder setDeleteCache(final boolean deleteCache) { return this; } + /** + * Enables request collapsing for cacheable requests ({@code HTTPCLIENT-1165}). When enabled, + * concurrent requests for the same cache key are coalesced so that only one request goes to + * the backend while the others wait and then re-check the cache. + * + * @since 5.7 + */ + public CachingHttpAsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) { + this.requestCollapsingEnabled = requestCollapsingEnabled; + return this; + } + @Override protected void customizeExecChain(final NamedElementChain execChainDefinition) { final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT; @@ -160,7 +173,8 @@ protected void customizeExecChain(final NamedElementChain final AsyncCachingExec cachingExec = new AsyncCachingExec( httpCache, cacheRevalidator, - config); + config, + requestCollapsingEnabled); execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name()); } diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java new file mode 100644 index 0000000000..67a0d94153 --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/cache/example/AsyncClientCacheRequestCollapsing.java @@ -0,0 +1,157 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.cache.example; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpServer; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.cache.CacheContextBuilder; +import org.apache.hc.client5.http.cache.HttpCacheContext; +import org.apache.hc.client5.http.cache.RequestCacheControl; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.cache.CacheConfig; +import org.apache.hc.client5.http.impl.cache.CachingHttpAsyncClients; +import org.apache.hc.client5.http.impl.cache.HeapResourceFactory; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.StatusLine; + +/** + * Demonstrates request collapsing for the async HTTP cache ({@code HTTPCLIENT-1165}). + *

+ * The example starts a local HTTP server that delays responses so concurrent cache misses overlap. + * With request collapsing enabled, only one request should hit the origin server. + */ +public class AsyncClientCacheRequestCollapsing { + + public static void main(final String[] args) throws Exception { + + final AtomicInteger originHits = new AtomicInteger(0); + final HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0); + server.setExecutor(Executors.newCachedThreadPool()); + server.createContext("/", exchange -> { + originHits.incrementAndGet(); + try { + Thread.sleep(1500); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + final byte[] body = "Hello from origin".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Cache-Control", "max-age=60"); + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(200, body.length); + exchange.getResponseBody().write(body); + exchange.close(); + }); + server.start(); + + final int port = server.getAddress().getPort(); + final HttpHost target = new HttpHost("http", "localhost", port); + + try { + runRound(target, originHits, false); + runRound(target, originHits, true); + } finally { + server.stop(0); + } + } + + private static void runRound(final HttpHost target, final AtomicInteger originHits, final boolean collapsing) throws Exception { + final int requests = 20; + final int before = originHits.get(); + + try (final CloseableHttpAsyncClient httpclient = CachingHttpAsyncClients.custom() + .setCacheConfig(CacheConfig.custom() + .setMaxObjectSize(200000) + .build()) + .setResourceFactory(HeapResourceFactory.INSTANCE) + .setRequestCollapsingEnabled(collapsing) + .build()) { + + httpclient.start(); + + final ExecutorService executor = Executors.newFixedThreadPool(requests); + final CountDownLatch start = new CountDownLatch(1); + final List> futures = new ArrayList<>(requests); + + for (int i = 0; i < requests; i++) { + final int id = i; + futures.add(executor.submit(() -> { + start.await(); + + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/") + .build(); + + final HttpCacheContext context = CacheContextBuilder.create() + .setCacheControl(RequestCacheControl.DEFAULT) + .build(); + + final Future future = httpclient.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + context, + null); + + final SimpleHttpResponse response = future.get(); + System.out.println("[" + (collapsing ? "collapsed" : "baseline") + "/" + id + "] " + request + " -> " + new StatusLine(response) + + " (cache=" + context.getCacheResponseStatus() + ")"); + return response; + })); + } + + start.countDown(); + for (final Future future : futures) { + future.get(); + } + + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + } + + final int after = originHits.get(); + System.out.println(); + System.out.println("Round: collapsing=" + collapsing + ", origin hits=" + (after - before) + " (total=" + after + ")"); + System.out.println(); + } + +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapserTest.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapserTest.java new file mode 100644 index 0000000000..5fde8ede68 --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapserTest.java @@ -0,0 +1,169 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.Cancellable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestCacheRequestCollapser { + + @Test + void testSingleLeaderForSameKey() throws Exception { + final String key = "testSingleLeaderForSameKey-" + System.nanoTime(); + final int threads = 32; + + final ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + final CountDownLatch ready = new CountDownLatch(threads); + final CountDownLatch start = new CountDownLatch(1); + + final AtomicInteger leaders = new AtomicInteger(0); + final List> futures = new ArrayList<>(threads); + + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + ready.countDown(); + start.await(5, TimeUnit.SECONDS); + final CacheRequestCollapser.Token token = CacheRequestCollapser.INSTANCE.enter(key); + if (token.isLeader()) { + leaders.incrementAndGet(); + } + return token; + })); + } + + Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS)); + start.countDown(); + + final List tokens = new ArrayList<>(threads); + for (final Future f : futures) { + tokens.add(f.get(5, TimeUnit.SECONDS)); + } + + Assertions.assertEquals(1, leaders.get(), "Expected exactly one leader"); + + // Complete using the leader token to clean up. + CacheRequestCollapser.Token leaderToken = null; + for (final CacheRequestCollapser.Token t : tokens) { + if (t.isLeader()) { + leaderToken = t; + break; + } + } + Assertions.assertNotNull(leaderToken); + leaderToken.complete(); + + // After completion, next enter should create a new leader. + final CacheRequestCollapser.Token next = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertTrue(next.isLeader()); + next.complete(); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testAwaitRunsAfterComplete() throws Exception { + final String key = "testAwaitRunsAfterComplete-" + System.nanoTime(); + + final CacheRequestCollapser.Token leader = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertTrue(leader.isLeader()); + + final CacheRequestCollapser.Token follower = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertFalse(follower.isLeader()); + + final CountDownLatch ran = new CountDownLatch(1); + + final Cancellable cancellable = CacheRequestCollapser.INSTANCE.await(follower, () -> ran.countDown()); + Assertions.assertNotNull(cancellable); + + // Must not run before completion. + Assertions.assertFalse(ran.await(150, TimeUnit.MILLISECONDS)); + + leader.complete(); + + // Must run after completion. + Assertions.assertTrue(ran.await(5, TimeUnit.SECONDS)); + } + + @Test + void testCancelledWaiterDoesNotRun() throws Exception { + final String key = "testCancelledWaiterDoesNotRun-" + System.nanoTime(); + + final CacheRequestCollapser.Token leader = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertTrue(leader.isLeader()); + + final CacheRequestCollapser.Token follower = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertFalse(follower.isLeader()); + + final AtomicInteger runs = new AtomicInteger(0); + + final Cancellable cancellable = CacheRequestCollapser.INSTANCE.await(follower, () -> runs.incrementAndGet()); + Assertions.assertTrue(cancellable.cancel()); + + leader.complete(); + + // Give drain a moment. + Thread.sleep(100); + + Assertions.assertEquals(0, runs.get(), "Cancelled waiter must not run"); + } + + @Test + void testCompleteIsIdempotent() throws Exception { + final String key = "testCompleteIsIdempotent-" + System.nanoTime(); + + final CacheRequestCollapser.Token leader = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertTrue(leader.isLeader()); + + final CacheRequestCollapser.Token follower = CacheRequestCollapser.INSTANCE.enter(key); + Assertions.assertFalse(follower.isLeader()); + + final AtomicInteger runs = new AtomicInteger(0); + CacheRequestCollapser.INSTANCE.await(follower, () -> runs.incrementAndGet()); + + leader.complete(); + leader.complete(); + leader.complete(); + + // Give drain a moment. + Thread.sleep(100); + + Assertions.assertEquals(1, runs.get(), "Waiters must run exactly once"); + } + +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java new file mode 100644 index 0000000000..673e80d5c2 --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java @@ -0,0 +1,212 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.cache.CacheResponseStatus; +import org.apache.hc.client5.http.cache.HttpCacheContext; +import org.apache.hc.client5.http.cache.RequestCacheControl; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.utils.DateUtils; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.io.CloseMode; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestAsyncCachingExecRequestCollapsing { + + private static final class RoundResult { + private final int originHits; + private final int cacheMisses; + private final int cacheHits; + + private RoundResult(final int originHits, final int cacheMisses, final int cacheHits) { + this.originHits = originHits; + this.cacheMisses = cacheMisses; + this.cacheHits = cacheHits; + } + } + + @Test + void testRequestCollapsingPreventsThunderingHerdOnColdMiss() throws Exception { + final AtomicInteger originHits = new AtomicInteger(0); + + final HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); + server.createContext("/", exchange -> handleOrigin(exchange, originHits)); + server.setExecutor(Executors.newCachedThreadPool()); + server.start(); + + try { + final int port = server.getAddress().getPort(); + final HttpHost target = new HttpHost("http", "localhost", port); + final int concurrent = 20; + + originHits.set(0); + final RoundResult baseline = runRound(target, concurrent, false, originHits); + Assertions.assertEquals(concurrent, baseline.originHits, "Baseline must hit origin N times"); + Assertions.assertEquals(concurrent, baseline.cacheMisses, "Baseline must be all CACHE_MISS on cold miss"); + Assertions.assertEquals(0, baseline.cacheHits, "Baseline must have no CACHE_HIT on cold miss"); + + originHits.set(0); + final RoundResult collapsed = runRound(target, concurrent, true, originHits); + Assertions.assertEquals(1, collapsed.originHits, "Collapsing must allow only one origin request"); + Assertions.assertEquals(1, collapsed.cacheMisses, "Collapsing must have exactly one CACHE_MISS leader"); + Assertions.assertEquals(concurrent - 1, collapsed.cacheHits, "Collapsing must serve followers from cache"); + } finally { + server.stop(0); + } + } + + private static void handleOrigin(final HttpExchange exchange, final AtomicInteger originHits) throws IOException { + originHits.incrementAndGet(); + + // Keep the origin "busy" so concurrent client requests overlap and all see a cold cache. + try { + Thread.sleep(250); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + final byte[] body = "OK".getBytes(StandardCharsets.US_ASCII); + + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=us-ascii"); + exchange.getResponseHeaders().add("Cache-Control", "public, max-age=60"); + exchange.getResponseHeaders().add("Date", DateUtils.formatStandardDate(Instant.now())); + + exchange.sendResponseHeaders(200, body.length); + try (final OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + + private static RoundResult runRound( + final HttpHost target, + final int concurrent, + final boolean requestCollapsingEnabled, + final AtomicInteger originHits) throws Exception { + + final CacheConfig cacheConfig = CacheConfig.custom() + .setHeuristicCachingEnabled(false) + .build(); + + final AtomicInteger cacheMisses = new AtomicInteger(0); + final AtomicInteger cacheHits = new AtomicInteger(0); + + final CloseableHttpAsyncClient client = CachingHttpAsyncClients.custom() + .setCacheConfig(cacheConfig) + .setResourceFactory(HeapResourceFactory.INSTANCE) + .setRequestCollapsingEnabled(requestCollapsingEnabled) // <-- your new switch + .build(); + + client.start(); + + try { + final List> futures = new ArrayList<>(concurrent); + final CountDownLatch done = new CountDownLatch(concurrent); + final AtomicInteger failures = new AtomicInteger(0); + + for (int i = 0; i < concurrent; i++) { + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/") + .build(); + + // IMPORTANT: one context per request (context is not thread-safe). + final HttpCacheContext context = HttpCacheContext.create(); + context.setRequestCacheControl(RequestCacheControl.DEFAULT); + + futures.add(client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + context, + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse result) { + final CacheResponseStatus status = context.getCacheResponseStatus(); + if (status == CacheResponseStatus.CACHE_MISS) { + cacheMisses.incrementAndGet(); + } else if (status == CacheResponseStatus.CACHE_HIT) { + cacheHits.incrementAndGet(); + } else { + // For this test we only expect HIT or MISS. + failures.incrementAndGet(); + } + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + failures.incrementAndGet(); + done.countDown(); + } + + @Override + public void cancelled() { + failures.incrementAndGet(); + done.countDown(); + } + + })); + } + + Assertions.assertTrue(done.await(30, TimeUnit.SECONDS), "Requests did not complete in time"); + Assertions.assertEquals(0, failures.get(), "Unexpected failures / cache statuses"); + + // Also ensure futures are all done / propagate any hidden exception. + for (final Future f : futures) { + f.get(5, TimeUnit.SECONDS); + } + + return new RoundResult(originHits.get(), cacheMisses.get(), cacheHits.get()); + } finally { + client.close(CloseMode.GRACEFUL); + } + } + +}