Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
private final HttpAsyncCache responseCache;
private final DefaultAsyncCacheRevalidator cacheRevalidator;
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
private final boolean requestCollapsingEnabled;

AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
this(cache, cacheRevalidator, config, false);
}

AsyncCachingExec(
final HttpAsyncCache cache,
final DefaultAsyncCacheRevalidator cacheRevalidator,
final CacheConfig config,
final boolean requestCollapsingEnabled) {
super(config);
this.responseCache = Args.notNull(cache, "Response cache");
this.cacheRevalidator = cacheRevalidator;
this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request ->
BasicRequestBuilder.copy(request).build());
this.requestCollapsingEnabled = requestCollapsingEnabled;
}

AsyncCachingExec(
Expand Down Expand Up @@ -274,6 +284,87 @@ public void completed(final CacheMatch result) {
final CacheHit hit = result != null ? result.hit : null;
final CacheHit root = result != null ? result.root : null;
if (hit == null) {
if (requestCollapsingEnabled && root == null && entityProducer == null && !requestCacheControl.isOnlyIfCached()) {
final String cacheKey = CacheKeyGenerator.INSTANCE.generateKey(target, request);
final CacheRequestCollapser.Token token = CacheRequestCollapser.INSTANCE.enter(cacheKey);
if (token.isLeader()) {
handleCacheMiss(requestCacheControl, null, target, request, null, scope, chain, new AsyncExecCallback() {

@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
try {
return asyncExecCallback.handleResponse(response, entityDetails);
} catch (final HttpException | IOException ex) {
token.complete();
throw ex;
}
}

@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
try {
asyncExecCallback.handleInformationResponse(response);
} catch (final HttpException | IOException ex) {
token.complete();
throw ex;
}
}

@Override
public void completed() {
try {
asyncExecCallback.completed();
} finally {
token.complete();
}
}

@Override
public void failed(final Exception cause) {
try {
asyncExecCallback.failed(cause);
} finally {
token.complete();
}
}

});
} else {
operation.setDependency(CacheRequestCollapser.INSTANCE.await(token, () -> operation.setDependency(
responseCache.match(target, request, new FutureCallback<CacheMatch>() {

@Override
public void completed(final CacheMatch result) {
final CacheHit hit = result != null ? result.hit : null;
final CacheHit root = result != null ? result.root : null;
if (hit == null) {
handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback);
} else {
final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl);
}
context.setResponseCacheControl(responseCacheControl);
handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
}
}

@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}

@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}

}))));
}
return;
}
handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback);
} else {
final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.util.Args;

/**
* Coordinates concurrent requests for the same cache key so that only one request
* goes to the backend while others wait for it to complete and then re-check the cache.
* <p>
* Internal helper used to implement request collapsing ({@code HTTPCLIENT-1165}).
*/
@Internal
final class CacheRequestCollapser {

static final CacheRequestCollapser INSTANCE = new CacheRequestCollapser();

static final class Token {

private final ConcurrentHashMap<String, Entry> inflight;
private final String key;
private final Entry entry;
private final boolean leader;

private Token(final ConcurrentHashMap<String, Entry> inflight, final String key, final Entry entry, final boolean leader) {
this.inflight = inflight;
this.key = key;
this.entry = entry;
this.leader = leader;
}

boolean isLeader() {
return leader;
}

void complete() {
if (entry.completed.compareAndSet(false, true)) {
inflight.remove(key, entry);
entry.drain();
}
}

}

private static final class Waiter implements Cancellable {

private final AtomicBoolean cancelled;
private final Runnable task;

private Waiter(final Runnable task) {
this.cancelled = new AtomicBoolean(false);
this.task = task;
}

@Override
public boolean cancel() {
return cancelled.compareAndSet(false, true);
}

void runIfNotCancelled() {
if (!cancelled.get()) {
task.run();
}
}

}

private static final class Entry {

private final AtomicBoolean completed;
private final ConcurrentLinkedQueue<Waiter> waiters;

private Entry() {
this.completed = new AtomicBoolean(false);
this.waiters = new ConcurrentLinkedQueue<>();
}

private Cancellable await(final Runnable task) {
if (completed.get()) {
task.run();
return () -> false;
}
final Waiter waiter = new Waiter(task);
waiters.add(waiter);
if (completed.get()) {
drain();
}
return waiter;
}

private void drain() {
for (; ; ) {
final Waiter waiter = waiters.poll();
if (waiter == null) {
return;
}
waiter.runIfNotCancelled();
}
}

}

private final ConcurrentHashMap<String, Entry> inflight;

private CacheRequestCollapser() {
this.inflight = new ConcurrentHashMap<>();
}

Token enter(final String key) {
Args.notEmpty(key, "Key");
final Entry created = new Entry();
final Entry existing = inflight.putIfAbsent(key, created);
if (existing == null) {
return new Token(inflight, key, created, true);
}
return new Token(inflight, key, existing, false);
}

Cancellable await(final Token token, final Runnable task) {
Args.notNull(token, "Token");
Args.notNull(task, "Task");
return token.entry.await(task);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CachingH2AsyncClientBuilder extends H2AsyncClientBuilder {
private SchedulingStrategy schedulingStrategy;
private CacheConfig cacheConfig;
private boolean deleteCache;
private boolean requestCollapsingEnabled;

public static CachingH2AsyncClientBuilder create() {
return new CachingH2AsyncClientBuilder();
Expand Down Expand Up @@ -112,6 +113,16 @@ public CachingH2AsyncClientBuilder setDeleteCache(final boolean deleteCache) {
return this;
}

/**
* Enables request collapsing for cacheable requests ({@code HTTPCLIENT-1165}).
*
* @since 5.7
*/
public CachingH2AsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) {
this.requestCollapsingEnabled = requestCollapsingEnabled;
return this;
}

@Override
protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT;
Expand Down Expand Up @@ -156,7 +167,8 @@ protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler>
final AsyncCachingExec cachingExec = new AsyncCachingExec(
httpCache,
cacheRevalidator,
config);
config,
requestCollapsingEnabled);
execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class CachingHttpAsyncClientBuilder extends HttpAsyncClientBuilder {
private SchedulingStrategy schedulingStrategy;
private CacheConfig cacheConfig;
private boolean deleteCache;
private boolean requestCollapsingEnabled;

public static CachingHttpAsyncClientBuilder create() {
return new CachingHttpAsyncClientBuilder();
Expand Down Expand Up @@ -116,6 +117,18 @@ public CachingHttpAsyncClientBuilder setDeleteCache(final boolean deleteCache) {
return this;
}

/**
* Enables request collapsing for cacheable requests ({@code HTTPCLIENT-1165}). When enabled,
* concurrent requests for the same cache key are coalesced so that only one request goes to
* the backend while the others wait and then re-check the cache.
*
* @since 5.7
*/
public CachingHttpAsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) {
this.requestCollapsingEnabled = requestCollapsingEnabled;
return this;
}

@Override
protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT;
Expand Down Expand Up @@ -160,7 +173,8 @@ protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler>
final AsyncCachingExec cachingExec = new AsyncCachingExec(
httpCache,
cacheRevalidator,
config);
config,
requestCollapsingEnabled);
execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name());
}

Expand Down
Loading