Skip to content

Commit dd2af99

Browse files
committed
Let the BatchLoaderScheduler process all CF completions as a batch
1 parent a1db3a1 commit dd2af99

2 files changed

Lines changed: 21 additions & 18 deletions

File tree

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionException;
3030
import java.util.concurrent.CompletionStage;
31+
import java.util.concurrent.ConcurrentLinkedQueue;
3132
import java.util.concurrent.atomic.AtomicReference;
3233

3334
import static java.util.Collections.emptyList;
@@ -332,9 +333,8 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
332333
return CompletableFuture.completedFuture(values);
333334
}
334335

335-
List<K> clearCacheKeys = new ArrayList<>();
336-
var batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
337-
CompletableFuture<Void>[] scheduledCompletions = new CompletableFuture[keys.size()];
336+
Collection<K> clearCacheKeys = new ConcurrentLinkedQueue<>();
337+
List<Runnable> completeValueRunnables = new ArrayList<>();
338338
for (int idx = 0; idx < queuedFutures.size(); idx++) {
339339
K key = keys.get(idx);
340340
V value = values.get(idx);
@@ -360,14 +360,17 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
360360
future.complete(value);
361361
}
362362
};
363-
if(batchLoaderScheduler != null) {
364-
scheduledCompletions[idx] = batchLoaderScheduler.scheduleCompletion(completeValueRunnable, key, value);
365-
} else {
366-
scheduledCompletions[idx] = CompletableFutureKit.run(completeValueRunnable);
367-
}
363+
completeValueRunnables.add(completeValueRunnable);
364+
}
365+
CompletableFuture<Void> result;
366+
var batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
367+
if(batchLoaderScheduler != null) {
368+
result = batchLoaderScheduler.scheduleCompletions(completeValueRunnables, keys, values);
369+
} else {
370+
result = CompletableFuture.allOf(completeValueRunnables.stream().map(CompletableFutureKit::run).toArray(CompletableFuture[]::new));
368371
}
369372
// Wait for all completions to return
370-
return allOf(scheduledCompletions).thenApply(ignored -> {
373+
return result.thenApply(ignored -> {
371374
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
372375
return values;
373376
});
@@ -392,7 +395,7 @@ private void assertResultSize(List<K> keys, List<V> values) {
392395
assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list");
393396
}
394397

395-
private void possiblyClearCacheEntriesOnExceptions(List<K> keys) {
398+
private void possiblyClearCacheEntriesOnExceptions(Collection<K> keys) {
396399
if (keys.isEmpty()) {
397400
return;
398401
}

src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,17 @@ interface ScheduledBatchPublisherCall {
9898
/**
9999
* Schedules the completion of a {@link DataLoader} value's {@link java.util.concurrent.CompletableFuture} after a batch load.
100100
* <p>
101-
* Override this to offload completions to another thread, avoiding slow chained work
102-
* (e.g. {@code thenApply}) from executing inline on the dispatch thread.
101+
* Override this to offload completions to another thread or execute in parallel using multiple threads, avoiding
102+
* slow chained work (e.g. {@code thenApply}) from executing inline on the dispatch thread.
103103
* <p>
104104
* By default, completions run synchronously on the current thread, sequentially for each key in the batch.
105105
*
106-
* @param completeValueRunnable the runnable that completes the value
107-
* @param key the key being completed
108-
* @param value the value returned by the batch function for this key
109-
* @return a {@link CompletionStage} representing the scheduled work
106+
* @param completeValueRunnables the runnables that completes each value
107+
* @param keys the keys being completed
108+
* @param values the values returned by the batch function for each key
109+
* @return a {@link CompletionStage} representing the completion of all the scheduled work
110110
*/
111-
default <K, V> CompletableFuture<Void> scheduleCompletion(Runnable completeValueRunnable, K key, V value) {
112-
return CompletableFutureKit.run(completeValueRunnable);
111+
default <K, V> CompletableFuture<Void> scheduleCompletions(List<Runnable> completeValueRunnables, List<K> keys, List<V> values) {
112+
return CompletableFuture.allOf(completeValueRunnables.stream().map(CompletableFutureKit::run).toArray(CompletableFuture[]::new));
113113
}
114114
}

0 commit comments

Comments
 (0)