Skip to content

Commit 45065b0

Browse files
committed
PR 3.1 — Tidy: Extract narrow interfaces for segment-commons
- Add ExecutorFactory, TelemetryListener, DefinitionsCacheConsumer interfaces - Refactor SegmentFetcherImp and SegmentSynchronizationTaskImp to use them - Remove Guava dependencies from segment classes - Update SplitFactoryImpl and all affected tests
1 parent c06660f commit 45065b0

File tree

12 files changed

+104
-69
lines changed

12 files changed

+104
-69
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@
5757
import io.split.engine.experiments.SplitParser;
5858
import io.split.engine.experiments.SplitSynchronizationTask;
5959
import io.split.engine.experiments.RuleBasedSegmentParser;
60+
import io.split.engine.segments.ExecutorFactory;
6061
import io.split.engine.segments.SegmentChangeFetcher;
6162
import io.split.engine.segments.SegmentSynchronizationTaskImp;
63+
import io.split.engine.segments.TelemetryListener;
6264
import io.split.integrations.IntegrationsConfig;
6365
import io.split.service.SplitHttpClientImpl;
6466
import io.split.service.SplitHttpClient;
@@ -85,6 +87,7 @@
8587
import io.split.storages.pluggable.adapters.UserCustomRuleBasedSegmentAdapterConsumer;
8688
import io.split.storages.pluggable.domain.UserStorageWrapper;
8789
import io.split.storages.pluggable.synchronizer.TelemetryConsumerSubmitter;
90+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
8891
import io.split.telemetry.storage.InMemoryTelemetryStorage;
8992
import io.split.telemetry.storage.NoopTelemetryStorage;
9093
import io.split.telemetry.storage.TelemetryStorage;
@@ -129,6 +132,7 @@
129132
import java.util.List;
130133
import java.util.ArrayList;
131134

135+
import io.split.client.utils.SplitExecutorFactory;
132136
import static io.split.client.utils.SplitExecutorFactory.buildExecutorService;
133137

134138
public class SplitFactoryImpl implements SplitFactory {
@@ -427,12 +431,17 @@ protected SplitFactoryImpl(SplitClientConfig config) {
427431
segmentChangeFetcher = new LocalhostSegmentChangeFetcher(config.segmentDirectory());
428432
}
429433

434+
TelemetryListener segmentTelemetryListener =
435+
t -> _telemetryStorageProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, t);
436+
ExecutorFactory segmentExecutorFactory =
437+
(tf, name, n) -> SplitExecutorFactory.buildScheduledExecutorService(tf, name, n);
430438
_segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher,
431439
config.segmentsRefreshRate(),
432440
config.numThreadsForSegmentFetch(),
433441
segmentCache,
434-
_telemetryStorageProducer,
442+
segmentTelemetryListener,
435443
_splitCache,
444+
segmentExecutorFactory,
436445
config.getThreadFactory(),
437446
ruleBasedSegmentCache);
438447

@@ -696,12 +705,17 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
696705
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget,
697706
_telemetryStorageProducer);
698707

708+
TelemetryListener segTelemetryListener =
709+
t -> _telemetryStorageProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, t);
710+
ExecutorFactory segExecutorFactory =
711+
(tf, name, n) -> SplitExecutorFactory.buildScheduledExecutorService(tf, name, n);
699712
return new SegmentSynchronizationTaskImp(segmentChangeFetcher,
700713
config.segmentsRefreshRate(),
701714
config.numThreadsForSegmentFetch(),
702715
segmentCacheProducer,
703-
_telemetryStorageProducer,
716+
segTelemetryListener,
704717
splitCacheConsumer,
718+
segExecutorFactory,
705719
config.getThreadFactory(),
706720
ruleBasedSegmentCache);
707721
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.split.engine.segments;
2+
3+
import java.util.concurrent.ScheduledExecutorService;
4+
import java.util.concurrent.ThreadFactory;
5+
6+
public interface ExecutorFactory {
7+
ScheduledExecutorService build(ThreadFactory threadFactory, String nameFormat, int numThreads);
8+
}

client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,30 @@
22

33
import io.split.client.dtos.SegmentChange;
44
import io.split.storages.SegmentCacheProducer;
5-
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
6-
import io.split.telemetry.storage.TelemetryRuntimeProducer;
75
import io.split.engine.common.FetchOptions;
86
import org.slf4j.Logger;
97
import org.slf4j.LoggerFactory;
108

119
import java.util.ArrayList;
1210
import java.util.List;
13-
14-
import static com.google.common.base.Preconditions.checkNotNull;
11+
import java.util.Objects;
1512

1613
public class SegmentFetcherImp implements SegmentFetcher {
1714
private static final Logger _log = LoggerFactory.getLogger(SegmentFetcherImp.class);
1815

1916
private final String _segmentName;
2017
private final SegmentChangeFetcher _segmentChangeFetcher;
2118
private final SegmentCacheProducer _segmentCacheProducer;
22-
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
19+
private final TelemetryListener _telemetryListener;
2320

2421
private final Object _lock = new Object();
2522

2623
public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeFetcher, SegmentCacheProducer segmentCacheProducer,
27-
TelemetryRuntimeProducer telemetryRuntimeProducer) {
28-
_segmentName = checkNotNull(segmentName);
29-
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
30-
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
31-
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
24+
TelemetryListener telemetryListener) {
25+
_segmentName = Objects.requireNonNull(segmentName);
26+
_segmentChangeFetcher = Objects.requireNonNull(segmentChangeFetcher);
27+
_segmentCacheProducer = Objects.requireNonNull(segmentCacheProducer);
28+
_telemetryListener = Objects.requireNonNull(telemetryListener);
3229

3330
_segmentCacheProducer.updateSegment(segmentName, new ArrayList<>(), new ArrayList<>(), -1L);
3431
}
@@ -97,7 +94,7 @@ private void runWithoutExceptionHandling(FetchOptions options) {
9794
_log.info(_segmentName + " removed keys: " + summarize(change.removed));
9895
}
9996

100-
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, System.currentTimeMillis());
97+
_telemetryListener.recordSuccessfulSync(System.currentTimeMillis());
10198
}
10299
}
103100

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
package io.split.engine.segments;
22

3-
import com.google.common.collect.Maps;
4-
import io.split.client.utils.SplitExecutorFactory;
53
import io.split.engine.common.FetchOptions;
4+
import io.split.storages.DefinitionsCacheConsumer;
65
import io.split.storages.RuleBasedSegmentCacheConsumer;
76
import io.split.storages.SegmentCacheProducer;
8-
import io.split.storages.SplitCacheConsumer;
9-
import io.split.telemetry.storage.TelemetryRuntimeProducer;
107
import org.slf4j.Logger;
118
import org.slf4j.LoggerFactory;
129

1310
import java.io.Closeable;
1411
import java.util.HashSet;
1512
import java.util.List;
1613
import java.util.Map;
14+
import java.util.Objects;
1715
import java.util.Set;
16+
import java.util.concurrent.ConcurrentHashMap;
1817
import java.util.concurrent.ConcurrentMap;
1918
import java.util.concurrent.ExecutionException;
2019
import java.util.concurrent.Future;
@@ -26,40 +25,40 @@
2625
import java.util.concurrent.atomic.AtomicLong;
2726
import java.util.stream.Collectors;
2827

29-
import static com.google.common.base.Preconditions.checkArgument;
30-
import static com.google.common.base.Preconditions.checkNotNull;
31-
3228
public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask, Closeable {
3329
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
3430

3531
private final SegmentChangeFetcher _segmentChangeFetcher;
3632
private final AtomicLong _refreshEveryNSeconds;
3733
private final AtomicBoolean _running;
3834
private final Object _lock = new Object();
39-
private final ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
35+
private final ConcurrentMap<String, SegmentFetcher> _segmentFetchers = new ConcurrentHashMap<>();
4036
private final SegmentCacheProducer _segmentCacheProducer;
4137
private final ScheduledExecutorService _scheduledExecutorService;
42-
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
43-
private final SplitCacheConsumer _splitCacheConsumer;
38+
private final TelemetryListener _telemetryListener;
39+
private final DefinitionsCacheConsumer _definitionsCacheConsumer;
4440
private final RuleBasedSegmentCacheConsumer _ruleBasedSegmentCacheConsumer;
4541

4642
private ScheduledFuture<?> _scheduledFuture;
4743

4844
public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads,
49-
SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer,
50-
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory,
45+
SegmentCacheProducer segmentCacheProducer, TelemetryListener telemetryListener,
46+
DefinitionsCacheConsumer definitionsCacheConsumer, ExecutorFactory executorFactory,
47+
ThreadFactory threadFactory,
5148
RuleBasedSegmentCacheConsumer ruleBasedSegmentCacheConsumer) {
52-
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
49+
_segmentChangeFetcher = Objects.requireNonNull(segmentChangeFetcher);
5350

54-
checkArgument(refreshEveryNSeconds >= 0L);
51+
if (refreshEveryNSeconds < 0L) {
52+
throw new IllegalArgumentException("refreshEveryNSeconds must be non-negative");
53+
}
5554
_refreshEveryNSeconds = new AtomicLong(refreshEveryNSeconds);
56-
_scheduledExecutorService = SplitExecutorFactory.buildScheduledExecutorService(threadFactory, "split-segmentFetcher-" + "%d", numThreads);
55+
_scheduledExecutorService = executorFactory.build(threadFactory, "split-segmentFetcher-%d", numThreads);
5756
_running = new AtomicBoolean(false);
5857

59-
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
60-
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
61-
_splitCacheConsumer = checkNotNull(splitCacheConsumer);
62-
_ruleBasedSegmentCacheConsumer = checkNotNull(ruleBasedSegmentCacheConsumer);
58+
_segmentCacheProducer = Objects.requireNonNull(segmentCacheProducer);
59+
_telemetryListener = Objects.requireNonNull(telemetryListener);
60+
_definitionsCacheConsumer = Objects.requireNonNull(definitionsCacheConsumer);
61+
_ruleBasedSegmentCacheConsumer = Objects.requireNonNull(ruleBasedSegmentCacheConsumer);
6362
}
6463

6564
public void initializeSegment(String segmentName) {
@@ -77,7 +76,7 @@ public void initializeSegment(String segmentName) {
7776
return;
7877
}
7978

80-
SegmentFetcher newSegment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _segmentCacheProducer, _telemetryRuntimeProducer);
79+
SegmentFetcher newSegment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _segmentCacheProducer, _telemetryListener);
8180

8281
if (_running.get()) {
8382
_scheduledExecutorService.submit(() -> newSegment.fetch(new FetchOptions.Builder().build()));
@@ -195,14 +194,14 @@ private void initialize(String segmentName) {
195194
return;
196195
}
197196

198-
segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _segmentCacheProducer, _telemetryRuntimeProducer);
197+
segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _segmentCacheProducer, _telemetryListener);
199198

200199
_segmentFetchers.putIfAbsent(segmentName, segment);
201200
}
202201
}
203202

204203
private Set<String> getSegmentNames() {
205-
Set<String> names = new HashSet<>(_splitCacheConsumer.getSegments());
204+
Set<String> names = new HashSet<>(_definitionsCacheConsumer.getSegments());
206205
names.addAll(_ruleBasedSegmentCacheConsumer.getSegments());
207206

208207
return names;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.split.engine.segments;
2+
3+
public interface TelemetryListener {
4+
void recordSuccessfulSync(long time);
5+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.split.storages;
2+
3+
import java.util.Set;
4+
5+
public interface DefinitionsCacheConsumer {
6+
Set<String> getSegments();
7+
}
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.split.storages;
22

3-
import java.util.Set;
4-
5-
public interface SplitCacheCommons {
3+
public interface SplitCacheCommons extends DefinitionsCacheConsumer {
64
long getChangeNumber();
7-
Set<String> getSegments();
85
}

client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import io.split.client.utils.FileInputStreamProvider;
88
import io.split.client.utils.InputStreamProvider;
99
import io.split.engine.experiments.*;
10+
import io.split.engine.segments.ExecutorFactory;
1011
import io.split.engine.segments.SegmentChangeFetcher;
1112
import io.split.engine.segments.SegmentSynchronizationTaskImp;
13+
import io.split.engine.segments.TelemetryListener;
1214
import io.split.storages.*;
1315
import io.split.storages.memory.InMemoryCacheImp;
1416
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
@@ -24,6 +26,8 @@
2426
public class LocalhostSynchronizerTest {
2527

2628
private static final TelemetryStorage TELEMETRY_STORAGE_NOOP = Mockito.mock(NoopTelemetryStorage.class);
29+
private static final TelemetryListener TELEMETRY_LISTENER = t -> {};
30+
private static final ExecutorFactory EXECUTOR_FACTORY = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);
2731
private static final FlagSetsFilter FLAG_SETS_FILTER = new FlagSetsFilterImpl(new HashSet<>());
2832

2933
@Test
@@ -45,7 +49,7 @@ public void testSyncAll(){
4549
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
4650

4751
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
48-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
52+
TELEMETRY_LISTENER, splitCacheProducer, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
4953
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
5054

5155
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false);
@@ -72,7 +76,7 @@ public void testPeriodicFetching() throws InterruptedException {
7276
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
7377

7478
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
75-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
79+
TELEMETRY_LISTENER, splitCacheProducer, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
7680

7781
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
7882
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true);

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import io.split.client.impressions.UniqueKeysTracker;
66
import io.split.client.interceptors.FlagSetsFilter;
77
import io.split.client.interceptors.FlagSetsFilterImpl;
8+
import io.split.engine.segments.ExecutorFactory;
89
import io.split.engine.segments.SegmentChangeFetcher;
910
import io.split.engine.segments.SegmentSynchronizationTaskImp;
11+
import io.split.engine.segments.TelemetryListener;
1012
import io.split.storages.*;
1113
import io.split.storages.memory.InMemoryCacheImp;
1214
import io.split.engine.experiments.FetchResult;
@@ -81,9 +83,11 @@ public void syncAll() throws InterruptedException {
8183

8284
@Test
8385
public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
86+
ExecutorFactory executorFactory = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);
87+
TelemetryListener telemetryListener = t -> {};
8488
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class),
85-
20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class),
86-
Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
89+
20L, 1, _segmentCacheProducer, telemetryListener,
90+
Mockito.mock(SplitCacheConsumer.class), executorFactory, null, Mockito.mock(RuleBasedSegmentCache.class));
8791
Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp");
8892
synchronizerSegmentFetcher.setAccessible(true);
8993
Field modifiersField = Field.class.getDeclaredField("modifiers");

0 commit comments

Comments
 (0)