Skip to content

Commit e81ea3c

Browse files
committed
Update dispatch strategy and impl bfs strategy
1 parent 3243b9b commit e81ea3c

File tree

8 files changed

+395
-199
lines changed

8 files changed

+395
-199
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,7 @@ public Optional<CompletableFuture<V>> getIfCompleted(K key) {
229229
* @return the future of the value
230230
*/
231231
public CompletableFuture<V> load(@NonNull K key, @Nullable Object keyContext) {
232-
CompletableFuture<V> result = loadImpl(key, keyContext);
233-
options.getDispatchStrategy().loadCalled(this);
234-
return result;
232+
return loadImpl(key, keyContext);
235233
}
236234

237235
private CompletableFuture<V> loadImpl(@NonNull K key, @Nullable Object keyContext) {
@@ -283,7 +281,6 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
283281
}
284282
collect.add(loadImpl(key, keyContext));
285283
}
286-
options.getDispatchStrategy().loadCalled(this);
287284
return CompletableFutureKit.allOf(collect);
288285
}
289286

@@ -311,7 +308,6 @@ public CompletableFuture<Map<K, V>> loadMany(Map<K, ?> keysAndContexts) {
311308
Object keyContext = entry.getValue();
312309
collect.put(key, loadImpl(key, keyContext));
313310
}
314-
options.getDispatchStrategy().loadCalled(this);
315311
return CompletableFutureKit.allOf(collect);
316312
}
317313

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ CompletableFuture<V> load(K key, Object loadContext) {
158158
// We already have a promise for this key, no need to check value cache or queue up load
159159
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
160160
ctx.onDispatched();
161+
loaderOptions.getDispatchStrategy().loadCalled();
161162
cachedFuture.whenComplete(ctx::onCompleted);
163+
cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted());
162164
return cachedFuture;
163165
}
164166
} catch (Exception ignored) {
@@ -173,7 +175,9 @@ CompletableFuture<V> load(K key, Object loadContext) {
173175
// another thread was faster and created a matching CF ... hence this is really a cachehit and we are done
174176
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
175177
ctx.onDispatched();
178+
loaderOptions.getDispatchStrategy().loadCalled();
176179
cachedFuture.whenComplete(ctx::onCompleted);
180+
cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted());
177181
return cachedFuture;
178182
}
179183
}
@@ -190,14 +194,18 @@ CompletableFuture<V> load(K key, Object loadContext) {
190194
// meaning this is a cache hit and we are done
191195
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
192196
ctx.onDispatched();
197+
loaderOptions.getDispatchStrategy().loadCalled();
193198
cachedFuture.whenComplete(ctx::onCompleted);
199+
cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted());
194200
return cachedFuture;
195201
}
196202
}
197203
}
198204

199205
ctx.onDispatched();
206+
loaderOptions.getDispatchStrategy().loadCalled();
200207
loadCallFuture.whenComplete(ctx::onCompleted);
208+
loadCallFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted());
201209
return loadCallFuture;
202210
}
203211

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,15 @@ private DataLoaderRegistry(Builder builder) {
5858
}
5959

6060
protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation, DispatchStrategy dispatchStrategy) {
61-
this.dataLoaders = instrumentDLs(dataLoaders, instrumentation);
61+
this.dataLoaders = instrumentDLs(dataLoaders, instrumentation, dispatchStrategy);
6262
this.instrumentation = instrumentation;
6363
this.dispatchStrategy = dispatchStrategy;
64+
dispatchStrategy.onRegistryCreation(this);
6465
}
6566

66-
private Map<String, DataLoader<?, ?>> instrumentDLs(Map<String, DataLoader<?, ?>> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation) {
67+
private Map<String, DataLoader<?, ?>> instrumentDLs(Map<String, DataLoader<?, ?>> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation, DispatchStrategy dispatchStrategy) {
6768
Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders);
68-
if (registryInstrumentation != null) {
69+
if (registryInstrumentation != null || dispatchStrategy != DispatchStrategy.NO_OP) {
6970
dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL, dispatchStrategy));
7071
}
7172
return dataLoaders;
@@ -83,7 +84,7 @@ protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, @Nullabl
8384
private static DataLoader<?, ?> nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader<?, ?> existingDL, DispatchStrategy dispatchStrategy) {
8485
existingDL = checkAndSetName(key, existingDL);
8586

86-
if (registryInstrumentation == null) {
87+
if (registryInstrumentation == null && dispatchStrategy == DispatchStrategy.NO_OP) {
8788
return existingDL;
8889
}
8990
DataLoaderOptions options = existingDL.getOptions();
@@ -120,12 +121,17 @@ protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, @Nullabl
120121
return dataLoader;
121122
}
122123

123-
private static DataLoader<?, ?> mkInstrumentedDataLoader(DataLoader<?, ?> existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) {
124+
private static DataLoader<?, ?> mkInstrumentedDataLoader(DataLoader<?, ?> existingDL, DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) {
124125
return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation, dispatchStrategy)));
125126
}
126127

127-
private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) {
128-
return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation).setDispatchStrategy(dispatchStrategy));
128+
private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) {
129+
return options.transform(optionsBuilder -> {
130+
optionsBuilder.setDispatchStrategy(dispatchStrategy);
131+
if (newInstrumentation != null) {
132+
optionsBuilder.setInstrumentation(newInstrumentation);
133+
}
134+
});
129135
}
130136

131137
/**

src/main/java/org/dataloader/DispatchStrategy.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,24 @@
33
import org.dataloader.annotations.PublicApi;
44
import org.jspecify.annotations.NullMarked;
55

6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
68
@NullMarked
79
@PublicApi
810
public interface DispatchStrategy {
911

1012
DispatchStrategy NO_OP = new DispatchStrategy() {
1113
};
1214

13-
default void loadCalled(DataLoader<?, ?> dataLoader) {
15+
default void onRegistryCreation(DataLoaderRegistry registry) {
16+
17+
}
18+
19+
default void loadCalled() {
20+
21+
}
22+
23+
default void loadCompleted() {
1424

1525
}
1626
}

src/main/java/org/dataloader/NotBusyDispatchStrategy.java

Lines changed: 0 additions & 185 deletions
This file was deleted.

src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.dataloader.DataLoader;
44
import org.dataloader.DataLoaderRegistry;
5+
import org.dataloader.DispatchStrategy;
56
import org.dataloader.annotations.ExperimentalApi;
67
import org.dataloader.impl.Assertions;
78
import org.dataloader.instrumentation.DataLoaderInstrumentation;
@@ -69,7 +70,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A
6970
private volatile boolean closed;
7071

7172
private ScheduledDataLoaderRegistry(Builder builder) {
72-
super(builder.dataLoaders, builder.instrumentation);
73+
super(builder.dataLoaders, builder.instrumentation, DispatchStrategy.NO_OP);
7374
this.scheduledExecutorService = Assertions.nonNull(builder.scheduledExecutorService);
7475
this.defaultExecutorUsed = builder.defaultExecutorUsed;
7576
this.schedule = builder.schedule;

0 commit comments

Comments
 (0)