-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathAbstractRead.java
More file actions
52 lines (42 loc) · 1.99 KB
/
AbstractRead.java
File metadata and controls
52 lines (42 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package io.kurrent.dbclient;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsGrpc;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
abstract class AbstractRead implements Publisher<ReadMessage> {
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;
private final GrpcClient client;
private final OptionsWithBackPressureAndSerialization<?> options;
protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
static {
defaultReadOptions = StreamsOuterClass.ReadReq.Options.newBuilder()
.setUuidOption(StreamsOuterClass.ReadReq.Options.UUIDOption.newBuilder()
.setStructured(Shared.Empty.getDefaultInstance()));
}
public abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();
@Override
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
ReadResponseObserver observer = new ReadResponseObserver(
options,
new ReadStreamConsumer(subscriber),
this.client.getSerializer(options.serializationSettings().orElse(null))
);
this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
observer.onError(error);
return;
}
StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();
StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
observer.onConnected(args);
subscriber.onSubscribe(observer.getSubscription());
client.read(request, observer);
});
}
}