Skip to content

Commit 4c922c5

Browse files
committed
Add complicated test case
1 parent 1ccd687 commit 4c922c5

File tree

2 files changed

+298
-0
lines changed

2 files changed

+298
-0
lines changed

src/main/java/org/dataloader/strategy/BreadthFirstChainedDispatchStrategy.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.dataloader.DataLoaderRegistry;
44
import org.dataloader.DispatchStrategy;
5+
import org.dataloader.annotations.VisibleForTesting;
56
import org.dataloader.impl.Assertions;
67
import org.jspecify.annotations.Nullable;
78

@@ -20,6 +21,9 @@ public class BreadthFirstChainedDispatchStrategy implements DispatchStrategy {
2021
private final AtomicInteger totalWorkCount = new AtomicInteger(0);
2122
private final Object dispatchLock = new Object();
2223

24+
// only used for tests
25+
private Runnable onIteration;
26+
2327
private final Duration fallbackTimeout;
2428
@Nullable private ScheduledFuture<?> fallbackDispatchFuture = null;
2529

@@ -62,6 +66,8 @@ private void triggerDeterministicDispatch() {
6266
}
6367

6468
while (pendingLoadCount.get() > 0) {
69+
onIteration.run();
70+
6571
int workBefore = totalWorkCount.get();
6672

6773
dispatchCallback.run();
@@ -111,6 +117,11 @@ private synchronized void resetState() {
111117
}
112118
}
113119

120+
@VisibleForTesting
121+
void onIteration(Runnable onIteration) {
122+
this.onIteration = onIteration;
123+
}
124+
114125
public static class Builder {
115126
private Duration fallbackTimeout = DEFAULT_FALLBACK_TIMEOUT;
116127
private final ScheduledExecutorService scheduledExecutorService;
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
package org.dataloader.strategy;
2+
3+
import org.dataloader.BatchLoader;
4+
import org.dataloader.DataLoaderFactory;
5+
import org.dataloader.DataLoaderRegistry;
6+
import org.junit.jupiter.api.BeforeEach;
7+
import org.junit.jupiter.api.Test;
8+
9+
import java.time.Duration;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.Executors;
15+
import java.util.function.Function;
16+
import java.util.stream.Collectors;
17+
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class BreadthFirstChainedDispatchStrategyStressTest {
22+
23+
private int iterationCount;
24+
private List<List<String>> dispatchOrder;
25+
private List<List<String>> queueOrder;
26+
private List<List<String>> completionOrder;
27+
private DataLoaderRegistry registry;
28+
private CountDownLatch bLatch;
29+
private CountDownLatch gLatch;
30+
private CountDownLatch aStarted;
31+
private CountDownLatch gCompleted;
32+
private CountDownLatch iCompleted;
33+
34+
/*
35+
Simulating tree with async conditions
36+
37+
A
38+
B (async) completed before G
39+
E
40+
F
41+
C
42+
G (async) completes last
43+
H
44+
D
45+
I
46+
J
47+
*/
48+
@BeforeEach
49+
public void setup() {
50+
dispatchOrder = new ArrayList<>();
51+
queueOrder = new ArrayList<>();
52+
completionOrder = new ArrayList<>();
53+
for (int i = 0; i < 5; i++) {
54+
dispatchOrder.add(new ArrayList<>());
55+
queueOrder.add(new ArrayList<>());
56+
completionOrder.add(new ArrayList<>());
57+
}
58+
addAtIteration(queueOrder, "A");
59+
bLatch = new CountDownLatch(1);
60+
gLatch = new CountDownLatch(1);
61+
aStarted = new CountDownLatch(1);
62+
gCompleted = new CountDownLatch(1);
63+
iCompleted = new CountDownLatch(1);
64+
iterationCount = 1;
65+
BreadthFirstChainedDispatchStrategy breadthFirstChainedDispatchStrategy =
66+
new BreadthFirstChainedDispatchStrategy.Builder(Executors.newSingleThreadScheduledExecutor())
67+
.setFallbackTimeout(Duration.ofMillis(300)).build();
68+
breadthFirstChainedDispatchStrategy.onIteration(() -> iterationCount += 1);
69+
registry = DataLoaderRegistry.newRegistry()
70+
.dispatchStrategy(breadthFirstChainedDispatchStrategy)
71+
.build();
72+
73+
74+
// Loaders named after diagram above
75+
BatchLoader<Integer, Integer> eLoader = keys -> {
76+
addAtIteration(dispatchOrder, "E");
77+
return CompletableFuture.completedFuture(keys);
78+
};
79+
BatchLoader<Integer, Integer> fLoader = keys -> {
80+
addAtIteration(dispatchOrder, "F");
81+
return CompletableFuture.completedFuture(keys);
82+
};
83+
BatchLoader<Integer, Integer> hLoader = keys -> {
84+
addAtIteration(dispatchOrder, "H");
85+
return CompletableFuture.completedFuture(keys);
86+
};
87+
BatchLoader<Integer, Integer> iLoader = keys -> {
88+
addAtIteration(dispatchOrder, "I");
89+
return CompletableFuture.completedFuture(keys);
90+
};
91+
BatchLoader<Integer, Integer> jLoader = keys -> {
92+
addAtIteration(dispatchOrder, "J");
93+
return CompletableFuture.completedFuture(keys);
94+
};
95+
96+
BatchLoader<Integer, Integer> gLoader = keys -> {
97+
addAtIteration(dispatchOrder, "G");
98+
CompletableFuture<List<Integer>> gFuture = new CompletableFuture<>();
99+
CompletableFuture.runAsync(() -> {
100+
try {
101+
gLatch.await();
102+
gFuture.complete(keys);
103+
} catch (InterruptedException e) {
104+
// do nothing
105+
}
106+
});
107+
return gFuture;
108+
};
109+
110+
BatchLoader<Integer, Integer> bLoader = keys -> {
111+
addAtIteration(dispatchOrder, "B");
112+
CompletableFuture<List<Integer>> bFuture = new CompletableFuture<>();
113+
CompletableFuture.runAsync(() -> {
114+
try {
115+
bLatch.await();
116+
CompletableFuture<Integer> eResult = registry.<Integer, Integer>getDataLoader("eLoader").load(keys.get(0))
117+
.whenComplete((result, error) -> addAtIteration(completionOrder, "E"));
118+
addAtIteration(queueOrder, "E");
119+
CompletableFuture<Integer> fResult = registry.<Integer, Integer>getDataLoader("fLoader").load(keys.get(0))
120+
.whenComplete((result, error) -> addAtIteration(completionOrder, "F"));
121+
addAtIteration(queueOrder, "F");
122+
eResult.thenCombine(fResult, (eNum, fNum) -> List.of(eNum + fNum))
123+
.thenAccept(bFuture::complete);
124+
} catch (InterruptedException e) {
125+
// do nothing
126+
}
127+
});
128+
return bFuture;
129+
};
130+
131+
BatchLoader<Integer, Integer> cLoader = keys -> {
132+
addAtIteration(dispatchOrder, "C");
133+
CompletableFuture<Integer> gResult = registry.<Integer, Integer>getDataLoader("gLoader").load(keys.get(0))
134+
.whenComplete((result, error) -> {
135+
addAtIteration(completionOrder, "G");
136+
gCompleted.countDown();
137+
});
138+
addAtIteration(queueOrder, "G");
139+
CompletableFuture<Integer> hResult = registry.<Integer, Integer>getDataLoader("hLoader").load(keys.get(0))
140+
.whenComplete((result, error) -> addAtIteration(completionOrder, "H"));
141+
addAtIteration(queueOrder, "H");
142+
143+
return gResult.thenCombine(hResult, (gNum, hNum) -> List.of(gNum + hNum));
144+
};
145+
146+
BatchLoader<Integer, Integer> dLoader = keys -> {
147+
addAtIteration(dispatchOrder, "D");
148+
CompletableFuture<Integer> iResult = registry.<Integer, Integer>getDataLoader("iLoader").load(keys.get(0))
149+
.whenComplete((result, error) -> {
150+
addAtIteration(completionOrder, "I");
151+
iCompleted.countDown();
152+
});
153+
addAtIteration(queueOrder, "I");
154+
CompletableFuture<Integer> jResult = registry.<Integer, Integer>getDataLoader("jLoader").load(keys.get(0))
155+
.whenComplete((result, error) -> addAtIteration(completionOrder, "J"));
156+
addAtIteration(queueOrder, "J");
157+
158+
return iResult.thenCombine(jResult, (iNum, jNum) -> List.of(iNum + jNum));
159+
};
160+
161+
BatchLoader<Integer, Integer> aLoader = keys -> {
162+
aStarted.countDown();
163+
addAtIteration(dispatchOrder, "A");
164+
CompletableFuture<Integer> bResult = registry.<Integer, Integer>getDataLoader("bLoader").load(keys.get(0))
165+
.whenComplete((result, error) -> addAtIteration(completionOrder, "B"));
166+
addAtIteration(queueOrder, "B");
167+
CompletableFuture<Integer> cResult = registry.<Integer, Integer>getDataLoader("cLoader").load(keys.get(0))
168+
.whenComplete((result, error) -> addAtIteration(completionOrder, "C"));
169+
addAtIteration(queueOrder, "C");
170+
CompletableFuture<Integer> dResult = registry.<Integer, Integer>getDataLoader("dLoader").load(keys.get(0))
171+
.whenComplete((result, error) -> addAtIteration(completionOrder, "D"));
172+
addAtIteration(queueOrder, "D");
173+
174+
return CompletableFuture.allOf(bResult, cResult, dResult).thenApply(unused -> {
175+
int bNum = bResult.join();
176+
int cNum = cResult.join();
177+
int dNum = dResult.join();
178+
179+
return List.of(bNum + cNum + dNum);
180+
}).whenComplete((result, error) -> addAtIteration(completionOrder, "A"));
181+
};
182+
183+
registry.register("aLoader", DataLoaderFactory.newDataLoader(aLoader));
184+
registry.register("bLoader", DataLoaderFactory.newDataLoader(bLoader));
185+
registry.register("cLoader", DataLoaderFactory.newDataLoader(cLoader));
186+
registry.register("dLoader", DataLoaderFactory.newDataLoader(dLoader));
187+
registry.register("eLoader", DataLoaderFactory.newDataLoader(eLoader));
188+
registry.register("fLoader", DataLoaderFactory.newDataLoader(fLoader));
189+
registry.register("gLoader", DataLoaderFactory.newDataLoader(gLoader));
190+
registry.register("hLoader", DataLoaderFactory.newDataLoader(hLoader));
191+
registry.register("iLoader", DataLoaderFactory.newDataLoader(iLoader));
192+
registry.register("jLoader", DataLoaderFactory.newDataLoader(jLoader));
193+
}
194+
195+
196+
/*
197+
Explanation of assertions.
198+
199+
G and B are async
200+
G unlocked once leaf nodes have started
201+
202+
B unlocked once G completes
203+
204+
Dispatch order
205+
Iteration 1: - Due to dataloader order in the registry C is dispatched and H is dispatched greedily
206+
A, B, C, D H
207+
Iteration 2:
208+
G, I, J - E and F are blocked by B as they are async chained
209+
Iteration 3:
210+
E, F - B has unlocked and allowed dispatching of E and F
211+
212+
Queue Order
213+
Iteration 1: -
214+
A
215+
Iteration 2:
216+
B, C, D, G, H, I, J - All but E and F queued as we get as much work as possible
217+
Iteration 3
218+
E, F - B unlocks E and F once async call completes
219+
220+
Completion Order
221+
H, J, I, D, G, C, E, F, B, A
222+
223+
Walk the tree up from roots greedily as calls finish.
224+
D finishes first as no blocks
225+
C finishes second as G is async
226+
B finishes last as well as E and F leafs as they are blocked by async B finishing
227+
*/
228+
@Test
229+
void verifyExecutionOrder() throws Exception {
230+
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> registry.<Integer, Integer>getDataLoader("aLoader").load(1).join(),
231+
Executors.newSingleThreadExecutor());
232+
233+
aStarted.await();
234+
235+
// do not release g until leaf level started
236+
iCompleted.await();
237+
238+
// g call finished
239+
gLatch.countDown();
240+
241+
// do not release b until leafs completed
242+
gCompleted.await();
243+
244+
// b call finished
245+
bLatch.countDown();
246+
247+
int resultNum = result.join();
248+
249+
// 6 leaf nodes added together
250+
assertThat(resultNum, equalTo(6));
251+
252+
// clean up padded lists
253+
dispatchOrder = dispatchOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList());
254+
queueOrder = queueOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList());
255+
List<String> flatCompletionOrder = completionOrder.stream().flatMap(List::stream).collect(Collectors.toList());
256+
257+
// Due to DataLoaders queueing other dataloaders during dispatch more work is done than level by level
258+
assertThat(dispatchOrder, equalTo(List.of(
259+
List.of("A", "C", "B", "H", "D"),
260+
List.of("G", "J", "I"),
261+
List.of("E", "F")
262+
)));
263+
// Greedily queues all known work capable
264+
assertThat(queueOrder, equalTo(List.of(
265+
List.of("A"),
266+
List.of("B", "C", "D", "G", "H", "I", "J"),
267+
List.of("E", "F")
268+
)));
269+
270+
assertThat(completionOrder, equalTo(List.of(
271+
"H",
272+
"J",
273+
"I",
274+
"D",
275+
"G",
276+
"C",
277+
"E",
278+
"F",
279+
"B",
280+
"A"
281+
)));
282+
}
283+
284+
private void addAtIteration(List<List<String>> aList, String toAdd) {
285+
aList.get(iterationCount).add(toAdd);
286+
}
287+
}

0 commit comments

Comments
 (0)