From 5104ce34c0108b15115457c33f24a932afed06fc Mon Sep 17 00:00:00 2001
From: Oskar Dudycz {@code
+ * var settings = KurrentDBClientSerializationSettings.get(options -> {
+ * options.
+ */
+ public static KurrentDBClientSerializationSettings get(
+ Consumer
Example: + *
+ * // Create a message with a specific ID
+ * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+ * Message message = Message.from(userRegistered);
+ *
+ */
+ public static Message from(Object data) {
+ return from(data, null);
+ }
+
+ /**
+ * Creates a new Message with the specified domain data and message ID, but without metadata.
+ * This factory method is a convenient shorthand when working with systems that don't require metadata.
+ *
+ * @param data The message domain data.
+ * @param messageId Unique identifier for this message instance. Must not be a nil UUID.
+ * @return A new immutable Message instance containing the provided data and ID with null metadata.
+ *
+ * Example: + *
+ * // Create a message with a specific ID
+ * UserRegistered userRegistered = new UserRegistered("123", "Alice");
+ * UUID messageId = UUID.randomUUID();
+ * Message message = Message.from(userRegistered, messageId);
+ *
+ */
+ public static Message from(Object data, UUID messageId) {
+ return from(data, null, messageId);
+ }
+
+ /**
+ * Creates a new Message with the specified domain data and message ID and metadata.
+ *
+ * @param data The message domain data.
+ * @param metadata Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.
+ * @param messageId Unique identifier for this specific message instance. If null, a random UUID will be generated.
+ * @return A new immutable Message instance with the specified properties.
+ * @throws IllegalArgumentException Thrown when messageId is explicitly set to a nil UUID, which is an invalid identifier.
+ *
+ * Example: + *
+ * // Create a message with data and metadata
+ * OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
+ * EventMetadata metadata = new EventMetadata(
+ * "user-456",
+ * Instant.now(),
+ * correlationId
+ * );
+ *
+ * // Let the system assign an ID automatically
+ * Message message = Message.from(orderPlaced, metadata);
+ *
+ * // Or specify a custom ID
+ * Message messageWithId = Message.from(orderPlaced, metadata, UUID.randomUUID());
+ *
+ */
+ public static Message from(Object data, Object metadata, UUID messageId) {
+ if (messageId != null && messageId.equals(new UUID(0, 0))) {
+ throw new IllegalArgumentException("Message ID cannot be a nil UUID.");
+ }
+
+ return new Message(data, metadata, messageId != null ? messageId : UUID.randomUUID());
+ }
+
+ /**
+ * Gets the message domain data.
+ *
+ * @return The message domain data.
+ */
+ public Object getData() {
+ return data;
+ }
+
+ /**
+ * Gets the message metadata.
+ *
+ * @return The message metadata, may be null.
+ */
+ public Object getMetadata() {
+ return metadata;
+ }
+
+ /**
+ * Gets the unique identifier for this message.
+ *
+ * @return The message ID.
+ */
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Message message = (Message) o;
+ return Objects.equals(data, message.data) &&
+ Objects.equals(metadata, message.metadata) &&
+ Objects.equals(messageId, message.messageId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(data, metadata, messageId);
+ }
+
+ @Override
+ public String toString() {
+ return "Message{" +
+ "data=" + data +
+ ", metadata=" + metadata +
+ ", messageId=" + messageId +
+ '}';
+ }
+}
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java
new file mode 100644
index 00000000..f4994f67
--- /dev/null
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/MessageData.java
@@ -0,0 +1,106 @@
+package io.kurrent.dbclient;
+
+import java.util.UUID;
+
+/**
+ * Represents a message that will be sent to KurrentDB.
+ */
+public final class MessageData {
+ private final UUID messageId;
+ private final String messageType;
+ private final String contentType;
+ private final byte[] messageData;
+ private final byte[] messageMetadata;
+
+ MessageData(String messageType, byte[] messageData) {
+ this(messageType, messageData, null, UUID.randomUUID(), ContentType.JSON);
+ }
+
+ MessageData(String messageType, byte[] messageData, byte[] userMetadata) {
+ this(messageType, messageData, userMetadata, UUID.randomUUID(), ContentType.JSON);
+ }
+
+ MessageData(String messageType, byte[] messageData, byte[] userMetadata, UUID messageId, String contentType) {
+ this.messageId = messageId;
+ this.messageType = messageType;
+ this.contentType = contentType;
+ this.messageData = messageData;
+ this.messageMetadata = userMetadata;
+ }
+
+ /**
+ * Returns message's unique identifier
+ */
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ /**
+ * Returns message's type.
+ */
+ public String getMessageType() {
+ return messageType;
+ }
+
+ /**
+ * Returns message's content's type
+ */
+ public String getContentType() {
+ return contentType;
+ }
+
+ /**
+ * Returns message's payload data
+ */
+ public byte[] getMessageData() {
+ return messageData;
+ }
+
+ /**
+ * Returns message's custom user metadata.
+ */
+ public byte[] getMessageMetadata() {
+ return messageMetadata;
+ }
+
+ /**
+ * Configures a message data builder to host a JSON payload.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData) {
+ return MessageDataBuilder.json(messageType, messageData);
+ }
+
+ /**
+ * Configures a message data builder to host a JSON payload.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder builderAsJson(String messageType, byte[] messageData, byte[] messageMetadata) {
+ return MessageDataBuilder.json(messageType, messageData, messageMetadata);
+ }
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData) {
+ return MessageDataBuilder.binary(messageType, messageData);
+ }
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder builderAsBinary(String messageType, byte[] messageData, byte[] messageMetadata) {
+ return MessageDataBuilder.binary(messageType, messageMetadata);
+ }
+}
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java b/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java
new file mode 100644
index 00000000..7b0c7730
--- /dev/null
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/MessageDataBuilder.java
@@ -0,0 +1,153 @@
+package io.kurrent.dbclient;
+
+import java.util.UUID;
+
+/**
+ * Utility class to help building an MessageData.
+ */
+public class MessageDataBuilder {
+ private String messageType;
+ private byte[] messageData;
+ private byte[] messageMetadata;
+ private UUID messageId;
+ private String contentType;
+
+ MessageDataBuilder() {
+ }
+
+ /**
+ * Configures a message data builder to host a JSON payload.
+ *
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder json(String messageType, byte[] messageData) {
+ return json(messageType, messageData, null, null);
+ }
+
+ /**
+ * Configures a message data builder to host a JSON payload.
+ *
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata) {
+ return json(messageType, messageData, messageMetadata, null);
+ }
+
+ /**
+ * Configures a message data builder to host a JSON payload.
+ *
+ * @param messageId message's id.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder json(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) {
+ MessageDataBuilder self = new MessageDataBuilder();
+
+ self.messageType = messageType;
+ self.messageData = messageData;
+ self.messageMetadata = messageMetadata;
+ self.messageId = messageId;
+ self.contentType = ContentType.JSON;
+
+ return self;
+ }
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ *
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder binary(String messageType, byte[] messageData) {
+ return binary(messageType, messageData, null, null);
+ }
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ *
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata) {
+ return binary(messageType, messageData, messageMetadata, null);
+ }
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ *
+ * @param messageId message's id.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder binary(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId) {
+ MessageDataBuilder self = new MessageDataBuilder();
+
+ self.messageType = messageType;
+ self.messageData = messageData;
+ self.messageId = messageId;
+ self.messageMetadata = messageMetadata;
+ self.contentType = ContentType.BYTES;
+
+ return self;
+ }
+
+
+ /**
+ * Configures a message data builder to host a binary payload.
+ *
+ * @param messageId message's id.
+ * @param messageType message's type.
+ * @param messageData message's payload.
+ * @param messageMetadata message's metadata payload.
+ * @return a message data builder.
+ */
+ public static MessageDataBuilder with(String messageType, byte[] messageData, byte[] messageMetadata, UUID messageId, boolean isJson) {
+ MessageDataBuilder self = new MessageDataBuilder();
+
+ self.messageType = messageType;
+ self.messageData = messageData;
+ self.messageId = messageId;
+ self.messageMetadata = messageMetadata;
+ self.contentType = isJson ? ContentType.JSON : ContentType.BYTES;
+
+ return self;
+ }
+
+ /**
+ * Sets message's unique identifier.
+ */
+ public MessageDataBuilder messageId(UUID messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ /**
+ * Sets message's custom user metadata.
+ */
+ public MessageDataBuilder messageMetadata(byte[] value) {
+ this.messageMetadata = value;
+ return this;
+ }
+
+ /**
+ * Builds a message ready to be sent to KurrentDB.
+ *
+ * @see MessageData
+ */
+ public MessageData build() {
+ UUID messageId = this.messageId == null ? UUID.randomUUID() : this.messageId;
+ return new MessageData(this.messageType, this.messageData, this.messageMetadata, messageId, this.contentType);
+ }
+}
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
index b4b9f9f4..b5e47bf0 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/serialization/MessageSerializer.java
@@ -1,8 +1,15 @@
package io.kurrent.dbclient.serialization;
+import io.kurrent.dbclient.Message;
+import io.kurrent.dbclient.MessageData;
+
+import java.util.List;
+
public interface MessageSerializer {
MessageSerializer with(OperationSerializationSettings serializationSettings);
- void serialize();
+ MessageData serialize(Message value, MessageSerializationContext context);
+
+ List
+ * That also allows you to bring your custom JSON serializer implementation (e.g. JSON.NET)
+ */
+ private Serializer jsonSerializer;
+
+
+ /**
+ * The serializer responsible for handling binary data formats. This is used when working with
+ * binary-encoded messages rather than text-based formats (e.g. Protobuf or Avro). Required when storing
+ * or retrieving content with "application/octet-stream" content type
+ */
+ private Serializer bytesSerializer;
+
+ /**
+ * Determines which serialization format (JSON or binary) is used by default when writing messages
+ * where the content type isn't explicitly specified. The default content type is "application/json"
+ */
+ private ContentType defaultContentType = ContentType.JSON;
+
+ /**
+ * Defines the custom strategy used to map between the type name stored in messages and Java type names.
+ * If not provided the default {@link io.kurrent.dbclient.serialization.DefaultMessageTypeNamingStrategy} will be used.
+ * It resolves the class name to the format: "{stream category name}-{Class Message Type}".
+ * You can provide your own implementation of {@link io.kurrent.dbclient.serialization.MessageTypeNamingStrategy}
+ * and register it here to override the default behavior
+ */
+ private MessageTypeNamingStrategy messageTypeNamingStrategy;
+
+ /**
+ * Allows to register mapping of Java message types to their corresponding message type names used in serialized messages.
+ */
+ private Map Example:
* Example:
* Example:
* {@code
- * var settings = KurrentDBClientSerializationSettings.get(options -> {
- * options.
*/
public static KurrentDBClientSerializationSettings get(
Consumer
+ * {@code
+ * KurrentDBClientSerializationSettings settings = KurrentDBClientSerializationSettings.get(options -> {
+ * options.registerMessageType(UserRegistered.class, "user-registered");
+ * options.registerMessageType(UserRoleAssigned.class, "user-role-assigned");
+ * options.registerMessageTypeForCategory(UserRegistered.class, "user-registered");
* });
- * }
+ * }
+ *
+ * {@code
+ * settings.useJsonSettings(builder ->
+ * builder.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
+ * .propertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE)
+ * );
+ * }
+ *
+ */
+ public KurrentDBClientSerializationSettings useJsonSettings(
+ Function
+ * {@code
+ * // Register event types that can appear in user streams
+ * settings.registerMessageTypeForCategory(UserCreated.class, "user")
+ * .registerMessageTypeForCategory(UserUpdated.class, "user")
+ * .registerMessageTypeForCategory(UserDeleted.class, "user");
+ * }
+ *
+ */
+ public
+ * {@code
+ * // Register me types with their corresponding type identifiers
+ * settings.registerMessageType(UserCreated.class, "user-created-v1")
+ * .registerMessageType(OrderPlaced.class, "order-placed-v2");
+ * }
+ *
+ */
+ public
- * // Create a message with a specific ID
+ * Create a message with a specific ID
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
* Message message = Message.from(userRegistered);
*
@@ -53,7 +53,7 @@ public static Message from(Object data) {
*
*
- * // Create a message with a specific ID
+ * Create a message with a specific ID
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
* UUID messageId = UUID.randomUUID();
* Message message = Message.from(userRegistered, messageId);
@@ -74,7 +74,7 @@ public static Message from(Object data, UUID messageId) {
*
*
- * // Create a message with data and metadata
+ * Create a message with data and metadata
* OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
* EventMetadata metadata = new EventMetadata(
* "user-456",
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
index f7199a88..bb50b6d0 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
@@ -8,7 +8,7 @@ class OptionsBase