Skip to content

Commit 7cfe053

Browse files
oskardudyczw1am
authored andcommitted
[DEVEX-250] Added Message deserialization
1 parent 9036ea4 commit 7cfe053

17 files changed

Lines changed: 278 additions & 33 deletions

db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
1010
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;
1111

1212
private final GrpcClient client;
13+
<<<<<<< HEAD
1314
private final OptionsBase<?> options;
1415

1516
protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
17+
=======
18+
private final OptionsWithBackPressureAndSerialization<?> options;
19+
20+
protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
21+
>>>>>>> ce0858b ([DEVEX-250] Added Message deserialization)
1622
this.client = client;
1723
this.options = options;
1824
}
@@ -28,8 +34,22 @@ protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
2834
@Override
2935
@SuppressWarnings("unchecked")
3036
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
37+
<<<<<<< HEAD
3138
ReadSubscription readSubscription = new ReadSubscription(subscriber);
3239
subscriber.onSubscribe(readSubscription);
40+
=======
41+
ReadResponseObserver observer = new ReadResponseObserver(
42+
options,
43+
new ReadStreamConsumer(subscriber),
44+
this.client.getSerializer(options.serializationSettings().orElse(null))
45+
);
46+
47+
this.client.getWorkItemArgs().whenComplete((args, error) -> {
48+
if (error != null) {
49+
observer.onError(error);
50+
return;
51+
}
52+
>>>>>>> ce0858b ([DEVEX-250] Added Message deserialization)
3353

3454
CompletableFuture<ReadSubscription> result = new CompletableFuture<>();
3555
this.client.run(channel -> {

db-client-java/src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@ abstract class AbstractRegularSubscription {
2222
protected SubscriptionListener listener;
2323
protected Checkpointer checkpointer = null;
2424
private final GrpcClient client;
25+
<<<<<<< HEAD
2526
private final OptionsBase<?> options;
2627

2728
protected AbstractRegularSubscription(GrpcClient client, OptionsBase<?> options) {
29+
=======
30+
private final OptionsWithBackPressureAndSerialization<?> options;
31+
32+
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
33+
>>>>>>> ce0858b ([DEVEX-250] Added Message deserialization)
2834
this.client = client;
2935
this.options = options;
3036
}
@@ -155,5 +161,28 @@ public void onCompleted() {
155161

156162
return future;
157163
});
164+
<<<<<<< HEAD
165+
=======
166+
167+
return future;
168+
}
169+
170+
private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture<Subscription> future) {
171+
StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> {
172+
ClientTelemetry.traceSubscribe(
173+
action,
174+
subscriptionId,
175+
args.getChannel(),
176+
client.getSettings(),
177+
options.getCredentials(),
178+
event);
179+
});
180+
181+
return new ReadResponseObserver(
182+
this.options,
183+
consumer,
184+
this.client.getSerializer(options.serializationSettings().orElse(null))
185+
);
186+
>>>>>>> ce0858b ([DEVEX-250] Added Message deserialization)
158187
}
159188
}

db-client-java/src/main/java/io/kurrent/dbclient/AbstractSubscribePersistentSubscription.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package io.kurrent.dbclient;
22

3-
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
4-
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
5-
import io.kurrent.dbclient.proto.shared.Shared;
63
import io.grpc.Metadata;
74
import io.grpc.Status;
85
import io.grpc.StatusRuntimeException;
96
import io.grpc.stub.ClientCallStreamObserver;
107
import io.grpc.stub.ClientResponseObserver;
118
import io.grpc.stub.StreamObserver;
9+
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
10+
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
11+
import io.kurrent.dbclient.proto.shared.Shared;
12+
import io.kurrent.dbclient.serialization.MessageSerializer;
1213

1314
import java.util.concurrent.CompletableFuture;
1415

@@ -18,20 +19,26 @@ abstract class AbstractSubscribePersistentSubscription {
1819
private final String group;
1920
private final PersistentSubscriptionListener listener;
2021
private final SubscribePersistentSubscriptionOptions options;
22+
private final MessageSerializer messageSerializer;
2123

2224
static {
2325
defaultReadOptions = Persistent.ReadReq.Options.newBuilder()
2426
.setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder()
2527
.setStructured(Shared.Empty.getDefaultInstance()));
2628
}
2729

28-
public AbstractSubscribePersistentSubscription(GrpcClient client, String group,
29-
SubscribePersistentSubscriptionOptions options,
30-
PersistentSubscriptionListener listener) {
30+
public AbstractSubscribePersistentSubscription(
31+
GrpcClient client,
32+
String group,
33+
SubscribePersistentSubscriptionOptions options,
34+
PersistentSubscriptionListener listener,
35+
MessageSerializer messageSerializer
36+
) {
3137
this.client = client;
3238
this.group = group;
3339
this.options = options;
3440
this.listener = listener;
41+
this.messageSerializer = messageSerializer;
3542
}
3643

3744
protected abstract Persistent.ReadReq.Options.Builder createOptions();
@@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
9198
int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount();
9299

93100
try {
94-
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
101+
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer);
95102
ClientTelemetry.traceSubscribe(
96103
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
97104
_subscription.getSubscriptionId(),

db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import io.grpc.ManagedChannel;
44
import io.grpc.Status;
55
import io.grpc.StatusRuntimeException;
6+
import io.kurrent.dbclient.serialization.MessageSerializer;
7+
import io.kurrent.dbclient.serialization.MessageSerializerBuilder;
8+
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
69
import org.slf4j.Logger;
710
import org.slf4j.LoggerFactory;
811

@@ -18,11 +21,14 @@ class GrpcClient {
1821
private final AtomicBoolean closed;
1922
private final LinkedBlockingQueue<Msg> queue;
2023
private final KurrentDBClientSettings settings;
24+
private final MessageSerializer serializer;
2125

2226
GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue<Msg> queue) {
2327
this.settings = settings;
2428
this.closed = closed;
2529
this.queue = queue;
30+
31+
this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
2632
}
2733

2834
public boolean isShutdown() {
@@ -72,10 +78,27 @@ public <A> CompletableFuture<A> runWithArgs(Function<WorkItemArgs, CompletableFu
7278
return;
7379
}
7480

81+
<<<<<<< HEAD
7582
action.apply(args).whenComplete((outcome, error) -> {
7683
if (outcome != null) {
7784
result.complete(outcome);
7885
return;
86+
=======
87+
if (e instanceof StatusRuntimeException) {
88+
StatusRuntimeException ex = (StatusRuntimeException) e;
89+
90+
if (ex.getStatus().getCode().equals(Status.Code.UNAVAILABLE)) {
91+
this.push(new CreateChannel(args.getId()));
92+
}
93+
}
94+
95+
logger.debug("RunWorkItem[{}] completed exceptionally: {}", args.getId(), e.toString());
96+
97+
if (e instanceof RuntimeException)
98+
throw (RuntimeException) e;
99+
else
100+
throw new RuntimeException(e);
101+
>>>>>>> ce0858b ([DEVEX-250] Added Message deserialization)
79102
}
80103

81104
try {
@@ -115,4 +138,8 @@ public CompletableFuture<Void> shutdown() {
115138
public KurrentDBClientSettings getSettings() {
116139
return this.settings;
117140
}
141+
142+
public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) {
143+
return this.serializer.with(serializationSettings);
144+
}
118145
}

db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.json.JsonMapper;
55
import io.kurrent.dbclient.serialization.MessageSerializationContext;
6+
import io.kurrent.dbclient.serialization.MessageSerializer;
67
import org.reactivestreams.Publisher;
78

89
import java.util.*;
@@ -140,11 +141,13 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
140141
if (options == null)
141142
options = AppendToStreamOptions.get();
142143

143-
MessageSerializationContext serializationContext = new MessageSerializationContext(fromStreamName(streamName));
144+
MessageSerializationContext serializationContext =
145+
new MessageSerializationContext(fromStreamName(streamName));
144146

145-
Iterator<MessageData> messageData = options.serializationSettings()
146-
.map(serializer::with)
147-
.orElse(serializer)
147+
MessageSerializer serializer = getGrpcClient()
148+
.getSerializer(options.serializationSettings().orElse(null));
149+
150+
Iterator<MessageData> messageData = serializer
148151
.serialize(messages, serializationContext)
149152
.iterator();
150153

db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
public class KurrentDBClientBase {
1717
final Logger logger = LoggerFactory.getLogger(KurrentDBClientBase.class);
1818
final private GrpcClient client;
19-
final protected MessageSerializer serializer;
2019

2120
KurrentDBClientBase(KurrentDBClientSettings settings) {
2221
Discovery discovery;
@@ -33,8 +32,6 @@ public class KurrentDBClientBase {
3332
this.client = service.getHandle();
3433

3534
CompletableFuture.runAsync(service, createConnectionLoopExecutor());
36-
37-
serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
3835
}
3936
private Executor createConnectionLoopExecutor() {
4037
return Executors.newSingleThreadExecutor(r -> {

db-client-java/src/main/java/io/kurrent/dbclient/Message.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Message(Object data, Object metadata, UUID messageId) {
3434
*
3535
* <p>Example:
3636
* <pre>
37-
* // Create a message with a specific ID
37+
* Create a message with a specific ID
3838
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
3939
* Message message = Message.from(userRegistered);
4040
* </pre>
@@ -53,7 +53,7 @@ public static Message from(Object data) {
5353
*
5454
* <p>Example:
5555
* <pre>
56-
* // Create a message with a specific ID
56+
* Create a message with a specific ID
5757
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
5858
* UUID messageId = UUID.randomUUID();
5959
* Message message = Message.from(userRegistered, messageId);
@@ -74,7 +74,7 @@ public static Message from(Object data, UUID messageId) {
7474
*
7575
* <p>Example:
7676
* <pre>
77-
* // Create a message with data and metadata
77+
* Create a message with data and metadata
7878
* OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
7979
* EventMetadata metadata = new EventMetadata(
8080
* "user-456",

db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class OptionsBase<T> {
88
private final OperationKind kind;
99
private UserCredentials credentials;
1010
private boolean requiresLeader;
11-
private Map<String, String> headers = new HashMap<>();
11+
private final Map<String, String> headers = new HashMap<>();
1212

1313
protected OptionsBase() {
1414
this(OperationKind.Regular);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.kurrent.dbclient;
2+
3+
import java.util.Optional;
4+
5+
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
6+
7+
class OptionsWithBackPressureAndSerialization<T> extends OptionsWithBackPressure<T> {
8+
public OperationSerializationSettings serializationSettings;
9+
10+
protected OptionsWithBackPressureAndSerialization(OperationKind kind) {
11+
super(kind);
12+
}
13+
14+
/**
15+
* Allows to customize or disable the automatic deserialization.
16+
*/
17+
public Optional<OperationSerializationSettings> serializationSettings() {
18+
return Optional.ofNullable(serializationSettings);
19+
}
20+
21+
/**
22+
* Customize or disable the automatic deserialization.
23+
*/
24+
@SuppressWarnings("unchecked")
25+
public T serializationSettings(OperationSerializationSettings serializationSettings) {
26+
this.serializationSettings = serializationSettings;
27+
return (T)this;
28+
}
29+
}

db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.kurrent.dbclient;
22

3-
class OptionsWithPositionAndResolveLinkTosBase<T> extends OptionsWithBackPressure<T> {
3+
class OptionsWithPositionAndResolveLinkTosBase<T> extends OptionsWithBackPressureAndSerialization<T> {
44
private StreamPosition<Position> position;
55

66
protected OptionsWithPositionAndResolveLinkTosBase(OperationKind kind) {

0 commit comments

Comments
 (0)