Skip to content

Commit e9dbbc2

Browse files
authored
Streaming modules (#878)
1 parent a628ce3 commit e9dbbc2

File tree

65 files changed

+2721
-751
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2721
-751
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ dependencies {
145145
return candidates.find { findProject(it) != null }
146146
}
147147

148-
['main', 'logger', 'executor', 'events', 'events-domain', 'api', 'http-api', 'http', 'fallback', 'backoff', 'tracker', 'submitter'].each { moduleName ->
148+
['main', 'logger', 'events', 'events-domain', 'api', 'http-api', 'http', 'fallback', 'backoff', 'tracker', 'submitter', 'streaming', 'streaming-support', 'executor'].each { moduleName ->
149149
def resolvedPath = resolveProjectPath(moduleName)
150150
if (resolvedPath != null) {
151151
include project(resolvedPath)

main/build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ dependencies {
6060
api clientModuleProject('submitter')
6161

6262
// Internal module dependencies
63-
implementation clientModuleProject('http')
64-
implementation clientModuleProject('events-domain')
63+
implementation clientModuleProject(':http')
64+
implementation clientModuleProject(':events-domain')
65+
implementation clientModuleProject(':streaming')
66+
implementation clientModuleProject(':streaming-support')
6567

6668
// External dependencies
6769
implementation libs.roomRuntime

main/src/main/java/io/split/android/client/SplitFactoryHelper.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.TimeUnit;
2323
import java.util.concurrent.locks.ReentrantLock;
2424

25-
import io.split.android.client.common.CompressionUtilProvider;
25+
import io.split.android.client.streaming.support.CompressionUtilProvider;
2626
import io.split.android.client.events.EventsManagerCoordinator;
2727
import io.split.android.client.events.SplitInternalEvent;
2828
import io.split.android.client.lifecycle.SplitLifecycleManager;
@@ -54,13 +54,22 @@
5454
import io.split.android.client.service.sseclient.reactor.MySegmentsUpdateWorkerRegistry;
5555
import io.split.android.client.service.sseclient.reactor.SplitUpdatesWorker;
5656
import io.split.android.client.service.sseclient.sseclient.BackoffCounterTimer;
57+
import io.split.android.client.service.sseclient.sseclient.HttpFetcherStreamingAuthFetcher;
58+
import io.split.android.client.service.sseclient.sseclient.NotificationProcessorUpdateListener;
5759
import io.split.android.client.service.sseclient.sseclient.PushNotificationManager;
5860
import io.split.android.client.service.sseclient.sseclient.SseAuthenticator;
5961
import io.split.android.client.service.sseclient.sseclient.SseClient;
60-
import io.split.android.client.service.sseclient.sseclient.SseClientImpl;
62+
import io.split.android.client.service.sseclient.sseclient.HttpClientStreamingTransport;
63+
import io.split.android.client.service.sseclient.sseclient.DefaultSseClient;
64+
import io.split.android.client.service.sseclient.sseclient.EventSourceClientImpl;
6165
import io.split.android.client.service.sseclient.sseclient.SseHandler;
6266
import io.split.android.client.service.sseclient.sseclient.SseRefreshTokenTimer;
67+
import io.split.android.client.service.sseclient.sseclient.SplitTaskExecutorStreamingScheduler;
6368
import io.split.android.client.service.sseclient.sseclient.StreamingComponents;
69+
import io.split.android.client.service.sseclient.sseclient.TelemetryRuntimeProducerStreamingTelemetry;
70+
import io.split.android.client.service.sseclient.spi.StreamingScheduler;
71+
import io.split.android.client.service.sseclient.spi.StreamingTelemetry;
72+
import io.split.android.client.service.sseclient.spi.UpdateNotificationListener;
6473
import io.split.android.client.service.synchronizer.RolloutCacheManager;
6574
import io.split.android.client.service.synchronizer.RolloutCacheManagerImpl;
6675
import io.split.android.client.service.synchronizer.SyncGuardian;
@@ -288,37 +297,41 @@ SyncManager buildSyncManager(SplitClientConfig config,
288297
}
289298

290299
@NonNull
291-
PushNotificationManager getPushNotificationManager(SplitTaskExecutor splitTaskExecutor,
300+
PushNotificationManager getPushNotificationManager(StreamingScheduler scheduler,
292301
SseAuthenticator sseAuthenticator,
293302
PushManagerEventBroadcaster pushManagerEventBroadcaster,
294303
SseClient sseClient,
295-
TelemetryRuntimeProducer telemetryRuntimeProducer,
304+
StreamingTelemetry telemetry,
296305
long defaultSseConnectionDelayInSecs,
297306
int sseDisconnectionDelayInSecs) {
298307
return new PushNotificationManager(pushManagerEventBroadcaster,
299308
sseAuthenticator,
300309
sseClient,
301-
new SseRefreshTokenTimer(splitTaskExecutor, pushManagerEventBroadcaster),
302-
telemetryRuntimeProducer,
310+
new SseRefreshTokenTimer(scheduler, pushManagerEventBroadcaster),
311+
scheduler,
312+
telemetry,
303313
defaultSseConnectionDelayInSecs,
304314
sseDisconnectionDelayInSecs,
305315
null);
306316
}
307317

308318
public SseClient getSseClient(String streamingServiceUrlString,
309319
NotificationParser notificationParser,
310-
NotificationProcessor notificationProcessor,
311-
TelemetryRuntimeProducer telemetryRuntimeProducer,
320+
UpdateNotificationListener updateListener,
321+
StreamingTelemetry telemetry,
312322
PushManagerEventBroadcaster pushManagerEventBroadcaster,
313323
HttpClient httpClient) {
314324
SseHandler sseHandler = new SseHandler(notificationParser,
315-
notificationProcessor,
316-
telemetryRuntimeProducer,
325+
updateListener,
326+
telemetry,
317327
pushManagerEventBroadcaster);
318328

319-
return new SseClientImpl(URI.create(streamingServiceUrlString),
320-
httpClient,
321-
new EventStreamParser(),
329+
EventSourceClientImpl eventSourceClient = new EventSourceClientImpl(
330+
new HttpClientStreamingTransport(httpClient),
331+
new EventStreamParser());
332+
333+
return new DefaultSseClient(URI.create(streamingServiceUrlString),
334+
eventSourceClient,
322335
sseHandler);
323336
}
324337

@@ -396,22 +409,25 @@ public StreamingComponents buildStreamingComponents(@NonNull SplitTaskExecutor s
396409
notificationParser, splitsUpdateNotificationQueue);
397410

398411
PushManagerEventBroadcaster pushManagerEventBroadcaster = new PushManagerEventBroadcaster();
412+
StreamingScheduler scheduler = new SplitTaskExecutorStreamingScheduler(splitTaskExecutor);
413+
StreamingTelemetry streamingTelemetry = new TelemetryRuntimeProducerStreamingTelemetry(storageContainer.getTelemetryStorage());
414+
UpdateNotificationListener updateListener = new NotificationProcessorUpdateListener(notificationProcessor);
399415

400416
SseClient sseClient = getSseClient(config.streamingServiceUrl(),
401417
notificationParser,
402-
notificationProcessor,
403-
storageContainer.getTelemetryStorage(),
418+
updateListener,
419+
streamingTelemetry,
404420
pushManagerEventBroadcaster,
405421
defaultHttpClient);
406422

407-
SseAuthenticator sseAuthenticator = new SseAuthenticator(splitApiFacade.getSseAuthenticationFetcher(),
423+
SseAuthenticator sseAuthenticator = new SseAuthenticator(new HttpFetcherStreamingAuthFetcher(splitApiFacade.getSseAuthenticationFetcher()),
408424
new SseJwtParser(), flagsSpec);
409425

410-
PushNotificationManager pushNotificationManager = getPushNotificationManager(splitTaskExecutor,
426+
PushNotificationManager pushNotificationManager = getPushNotificationManager(scheduler,
411427
sseAuthenticator,
412428
pushManagerEventBroadcaster,
413429
sseClient,
414-
storageContainer.getTelemetryStorage(),
430+
streamingTelemetry,
415431
config.defaultSSEConnectionDelay(),
416432
config.sseDisconnectionDelay());
417433

main/src/main/java/io/split/android/client/SplitFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import io.split.android.client.main.BuildConfig;
2626
import io.split.android.client.api.Key;
27-
import io.split.android.client.common.CompressionUtilProvider;
27+
import io.split.android.client.streaming.support.CompressionUtilProvider;
2828
import io.split.android.client.events.EventsManagerCoordinator;
2929
import io.split.android.client.factory.FactoryMonitor;
3030
import io.split.android.client.factory.FactoryMonitorImpl;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.split.android.client.service.sseclient;
2+
3+
/**
4+
* Constants used by the streaming module.
5+
*/
6+
public final class StreamingConstants {
7+
8+
private StreamingConstants() {
9+
// Utility class
10+
}
11+
12+
/**
13+
* Buffer size for segment data decompression.
14+
*/
15+
public static final int SEGMENT_DATA_BUFFER_SIZE = 1024 * 10; // 10KB
16+
17+
/**
18+
* Query param for flags spec in streaming auth.
19+
*/
20+
public static final String FLAGS_SPEC_PARAM = "s";
21+
}

main/src/main/java/io/split/android/client/service/sseclient/notifications/InstantUpdateChangeNotification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import com.google.gson.annotations.SerializedName;
66

7-
import io.split.android.client.common.CompressionType;
7+
import io.split.android.client.streaming.support.CompressionType;
88

99
public abstract class InstantUpdateChangeNotification extends IncomingNotification {
1010

main/src/main/java/io/split/android/client/service/sseclient/notifications/MembershipNotification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import java.util.Set;
88

9-
import io.split.android.client.common.CompressionType;
9+
import io.split.android.client.streaming.support.CompressionType;
1010

1111
public class MembershipNotification extends IncomingNotification {
1212

main/src/main/java/io/split/android/client/service/sseclient/notifications/MySegmentsV2PayloadDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import io.split.android.client.exceptions.MySegmentsParsingException;
77
import io.split.android.client.utils.Base64Util;
8-
import io.split.android.client.utils.CompressionUtil;
8+
import io.split.android.client.streaming.support.CompressionUtil;
99
import io.split.android.client.utils.MurmurHash3;
1010
import io.split.android.client.utils.StringHelper;
1111

main/src/main/java/io/split/android/client/service/sseclient/notifications/memberships/MembershipsNotificationProcessorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import java.util.Set;
66
import java.util.concurrent.BlockingQueue;
77

8-
import io.split.android.client.common.CompressionType;
9-
import io.split.android.client.common.CompressionUtilProvider;
8+
import io.split.android.client.streaming.support.CompressionType;
9+
import io.split.android.client.streaming.support.CompressionUtilProvider;
1010
import io.split.android.client.service.executor.SplitTaskExecutor;
1111
import io.split.android.client.service.mysegments.MySegmentUpdateParams;
1212
import io.split.android.client.service.mysegments.MySegmentsUpdateTask;

main/src/main/java/io/split/android/client/service/sseclient/notifications/mysegments/MembershipsNotificationProcessorFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import androidx.annotation.NonNull;
66

7-
import io.split.android.client.common.CompressionUtilProvider;
7+
import io.split.android.client.streaming.support.CompressionUtilProvider;
88
import io.split.android.client.service.executor.SplitTaskExecutor;
99
import io.split.android.client.service.sseclient.notifications.MySegmentsV2PayloadDecoder;
1010
import io.split.android.client.service.sseclient.notifications.NotificationParser;

0 commit comments

Comments
 (0)