From b50fc124a00971e10fefa844f3e0ae77158f91e4 Mon Sep 17 00:00:00 2001 From: Local Merge Date: Fri, 29 May 2026 11:04:53 +0530 Subject: [PATCH] container feed changes --- .../InternalBlobChangefeedEventData.java | 53 ++- .../models/BlobChangefeedEventData.java | 29 ++ .../models/BlobChangefeedEventType.java | 25 ++ .../changefeed/models/BlobOperationName.java | 102 ++++++ .../storage/blob/changefeed/ChunkTests.java | 3 + .../changefeed/MockedChangefeedResources.java | 2 +- ...obChangefeedEventDeserializationTests.java | 311 ++++++++++++++++++ 7 files changed, 519 insertions(+), 6 deletions(-) create mode 100644 sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobOperationName.java create mode 100644 sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/implementation/models/BlobChangefeedEventDeserializationTests.java diff --git a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/implementation/models/InternalBlobChangefeedEventData.java b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/implementation/models/InternalBlobChangefeedEventData.java index b29c1cf0bea2..fffd7fe19872 100644 --- a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/implementation/models/InternalBlobChangefeedEventData.java +++ b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/implementation/models/InternalBlobChangefeedEventData.java @@ -8,6 +8,7 @@ import com.azure.storage.internal.avro.implementation.AvroConstants; import com.azure.storage.internal.avro.implementation.schema.AvroSchema; +import java.time.OffsetDateTime; import java.util.Map; import java.util.Objects; @@ -29,6 +30,9 @@ public class InternalBlobChangefeedEventData implements BlobChangefeedEventData private final String blobUrl; private final boolean recursive; private final String sequencer; + private final OffsetDateTime creationTime; + private final OffsetDateTime lastAccessTime; + private final String restoredContainerVersion; /** * Constructs a {@link InternalBlobChangefeedEventData}. @@ -46,10 +50,14 @@ public class InternalBlobChangefeedEventData implements BlobChangefeedEventData * @param blobUrl The blob url. * @param recursive Whether this operation was recursive. * @param sequencer The sequencer. + * @param creationTime The blob creation time. Schema V6. + * @param lastAccessTime The last access time. Schema V7. + * @param restoredContainerVersion The restored container version. Schema V8. */ public InternalBlobChangefeedEventData(String api, String clientRequestId, String requestId, String eTag, String contentType, Long contentLength, BlobType blobType, Long contentOffset, String destinationUrl, - String sourceUrl, String blobUrl, boolean recursive, String sequencer) { + String sourceUrl, String blobUrl, boolean recursive, String sequencer, OffsetDateTime creationTime, + OffsetDateTime lastAccessTime, String restoredContainerVersion) { this.api = api; this.clientRequestId = clientRequestId; this.requestId = requestId; @@ -63,6 +71,9 @@ public InternalBlobChangefeedEventData(String api, String clientRequestId, Strin this.blobUrl = blobUrl; this.recursive = recursive; this.sequencer = sequencer; + this.creationTime = creationTime; + this.lastAccessTime = lastAccessTime; + this.restoredContainerVersion = restoredContainerVersion; } static InternalBlobChangefeedEventData fromRecord(Object d) { @@ -86,6 +97,9 @@ static InternalBlobChangefeedEventData fromRecord(Object d) { Object blobUrl = data.get("url"); Object recursive = data.get("recursive"); Object sequencer = data.get("sequencer"); + Object createTime = data.get("createTime"); + Object lastAccessTime = data.get("lastAccessTime"); + Object restoredContainerVersion = data.get("restoredContainerVersion"); return new InternalBlobChangefeedEventData(ChangefeedTypeValidator.nullOr("api", api, String.class), ChangefeedTypeValidator.nullOr("clientRequestId", clientRequestId, String.class), @@ -101,7 +115,16 @@ static InternalBlobChangefeedEventData fromRecord(Object d) { ChangefeedTypeValidator.nullOr("sourceUrl", sourceUrl, String.class), ChangefeedTypeValidator.nullOr("url", blobUrl, String.class), Boolean.TRUE.equals(ChangefeedTypeValidator.nullOr("recursive", recursive, Boolean.class)), - ChangefeedTypeValidator.nullOr("sequencer", sequencer, String.class)); + ChangefeedTypeValidator.nullOr("sequencer", sequencer, String.class), + ChangefeedTypeValidator.isNull(createTime) + ? null + : OffsetDateTime.parse( + Objects.requireNonNull(ChangefeedTypeValidator.nullOr("createTime", createTime, String.class))), + ChangefeedTypeValidator.isNull(lastAccessTime) + ? null + : OffsetDateTime.parse(Objects + .requireNonNull(ChangefeedTypeValidator.nullOr("lastAccessTime", lastAccessTime, String.class))), + ChangefeedTypeValidator.nullOr("restoredContainerVersion", restoredContainerVersion, String.class)); } @Override @@ -169,6 +192,21 @@ public String getSequencer() { return sequencer; } + @Override + public OffsetDateTime getCreationTime() { + return creationTime; + } + + @Override + public OffsetDateTime getLastAccessTime() { + return lastAccessTime; + } + + @Override + public String getRestoredContainerVersion() { + return restoredContainerVersion; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -190,14 +228,17 @@ && getBlobType() == that.getBlobType() && Objects.equals(getSourceUrl(), that.getSourceUrl()) && Objects.equals(getBlobUrl(), that.getBlobUrl()) && Objects.equals(isRecursive(), that.isRecursive()) - && Objects.equals(getSequencer(), that.getSequencer()); + && Objects.equals(getSequencer(), that.getSequencer()) + && Objects.equals(getCreationTime(), that.getCreationTime()) + && Objects.equals(getLastAccessTime(), that.getLastAccessTime()) + && Objects.equals(getRestoredContainerVersion(), that.getRestoredContainerVersion()); } @Override public int hashCode() { return Objects.hash(getApi(), getClientRequestId(), getRequestId(), getETag(), getContentType(), getContentLength(), getBlobType(), getContentOffset(), getDestinationUrl(), getSourceUrl(), getBlobUrl(), - isRecursive(), getSequencer()); + isRecursive(), getSequencer(), getCreationTime(), getLastAccessTime(), getRestoredContainerVersion()); } @Override @@ -206,6 +247,8 @@ public String toString() { + ", requestId='" + requestId + '\'' + ", eTag='" + eTag + '\'' + ", contentType='" + contentType + '\'' + ", contentLength=" + contentLength + ", blobType=" + blobType + ", contentOffset=" + contentOffset + ", destinationUrl='" + destinationUrl + '\'' + ", sourceUrl='" + sourceUrl + '\'' + ", blobUrl='" - + blobUrl + '\'' + ", recursive=" + recursive + ", sequencer='" + sequencer + '\'' + '}'; + + blobUrl + '\'' + ", recursive=" + recursive + ", sequencer='" + sequencer + '\'' + ", creationTime=" + + creationTime + ", lastAccessTime=" + lastAccessTime + ", restoredContainerVersion='" + + restoredContainerVersion + '\'' + '}'; } } diff --git a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventData.java b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventData.java index dc07dfb90e72..b9f917cb8838 100644 --- a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventData.java +++ b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventData.java @@ -5,6 +5,8 @@ import com.azure.storage.blob.models.BlobType; +import java.time.OffsetDateTime; + /** * This class contains properties of a BlobChangefeedEventData. */ @@ -101,4 +103,31 @@ public interface BlobChangefeedEventData { */ String getSequencer(); + /** + * Gets the blob creation time. Present in schema V6 and later for AppendBlob data-updated events. + * + * @return The creation time, or null if not present. + */ + default OffsetDateTime getCreationTime() { + return null; + } + + /** + * Gets the last access time of the blob. Present in schema V7 and later. + * + * @return The last access time, or null if not present. + */ + default OffsetDateTime getLastAccessTime() { + return null; + } + + /** + * Gets the restored container version. Present in schema V8 and later for RestoreContainer events. + * + * @return The restored container version, or null if not present. + */ + default String getRestoredContainerVersion() { + return null; + } + } diff --git a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventType.java b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventType.java index 358b0cac36ea..6302044ad16f 100644 --- a/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventType.java +++ b/sdk/storage/azure-storage-blob-changefeed/src/main/java/com/azure/storage/blob/changefeed/models/BlobChangefeedEventType.java @@ -22,6 +22,31 @@ public final class BlobChangefeedEventType extends ExpandableStringEnum { + + /** Static value UnspecifiedApi for BlobOperationName. */ + public static final BlobOperationName UNSPECIFIED_API = fromString("UnspecifiedApi"); + + /** Static value PutBlob for BlobOperationName. */ + public static final BlobOperationName PUT_BLOB = fromString("PutBlob"); + + /** Static value PutBlockList for BlobOperationName. */ + public static final BlobOperationName PUT_BLOCK_LIST = fromString("PutBlockList"); + + /** Static value CopyBlob for BlobOperationName. */ + public static final BlobOperationName COPY_BLOB = fromString("CopyBlob"); + + /** Static value DeleteBlob for BlobOperationName. */ + public static final BlobOperationName DELETE_BLOB = fromString("DeleteBlob"); + + /** Static value SetBlobMetadata for BlobOperationName. */ + public static final BlobOperationName SET_BLOB_METADATA = fromString("SetBlobMetadata"); + + /** Static value ControlEvent for BlobOperationName. */ + public static final BlobOperationName CONTROL_EVENT = fromString("ControlEvent"); + + /** Static value UndeleteBlob for BlobOperationName. */ + public static final BlobOperationName UNDELETE_BLOB = fromString("UndeleteBlob"); + + /** Static value SetBlobProperties for BlobOperationName. */ + public static final BlobOperationName SET_BLOB_PROPERTIES = fromString("SetBlobProperties"); + + /** Static value SnapshotBlob for BlobOperationName. */ + public static final BlobOperationName SNAPSHOT_BLOB = fromString("SnapshotBlob"); + + /** Static value SetBlobTier for BlobOperationName. */ + public static final BlobOperationName SET_BLOB_TIER = fromString("SetBlobTier"); + + /** Static value AbortCopyBlob for BlobOperationName. */ + public static final BlobOperationName ABORT_COPY_BLOB = fromString("AbortCopyBlob"); + + /** Static value SetBlobTags for BlobOperationName. */ + public static final BlobOperationName SET_BLOB_TAGS = fromString("SetBlobTags"); + + /** Static value CreateRestorePointMarker for BlobOperationName. */ + public static final BlobOperationName CREATE_RESTORE_POINT_MARKER = fromString("CreateRestorePointMarker"); + + /** Static value AppendBlock for BlobOperationName. Schema V6. */ + public static final BlobOperationName APPEND_BLOCK = fromString("AppendBlock"); + + /** Static value UpdateLastAccessTime for BlobOperationName. Schema V7. */ + public static final BlobOperationName UPDATE_LAST_ACCESS_TIME = fromString("UpdateLastAccessTime"); + + /** Static value CreateContainer for BlobOperationName. Schema V8. */ + public static final BlobOperationName CREATE_CONTAINER = fromString("ContainerCreated"); + + /** Static value DeleteContainer for BlobOperationName. Schema V8. */ + public static final BlobOperationName DELETE_CONTAINER = fromString("ContainerDeleted"); + + /** Static value RestoreContainer for BlobOperationName. Schema V8. */ + public static final BlobOperationName RESTORE_CONTAINER = fromString("RestoreContainer"); + + /** Static value SetContainerMetadata for BlobOperationName. Schema V8. */ + public static final BlobOperationName SET_CONTAINER_METADATA = fromString("SetContainerMetadata"); + + /** + * Creates a new instance of {@link BlobOperationName} with no string value. + * + * @deprecated Please use {@link #fromString(String)} to create an instance of BlobOperationName. + */ + @Deprecated + public BlobOperationName() { + } + + /** + * Creates or finds a BlobOperationName from its string representation. + * + * @param name a name to look for. + * @return the corresponding BlobOperationName. + */ + public static BlobOperationName fromString(String name) { + return fromString(name, BlobOperationName.class); + } + + /** + * Gets known BlobOperationName values. + * + * @return known BlobOperationName values. + */ + public static Collection values() { + return values(BlobOperationName.class); + } +} diff --git a/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/ChunkTests.java b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/ChunkTests.java index 06be32ea72e0..a4273e615308 100644 --- a/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/ChunkTests.java +++ b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/ChunkTests.java @@ -312,6 +312,9 @@ private static Map getMockChangefeedEventDataRecord(BlobChangefe cfEventData.put("url", data.getBlobUrl()); cfEventData.put("sequencer", data.getSequencer()); cfEventData.put("recursive", data.isRecursive()); + cfEventData.put("createTime", null); + cfEventData.put("lastAccessTime", null); + cfEventData.put("restoredContainerVersion", null); return cfEventData; } } diff --git a/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/MockedChangefeedResources.java b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/MockedChangefeedResources.java index 39480424de9a..d5bd59a75032 100644 --- a/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/MockedChangefeedResources.java +++ b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/MockedChangefeedResources.java @@ -44,7 +44,7 @@ static BlobChangefeedEvent getMockBlobChangefeedEvent(int index) { static BlobChangefeedEventData getMockBlobChangefeedEventData() { return new InternalBlobChangefeedEventData("PutBlob", "clientRequestId", "requestId", "etag", "application/octet-stream", 100L, BlobType.BLOCK_BLOB, 0L, "destinationUrl", "sourceUrl", "", false, - "sequencer"); + "sequencer", null, null, null); } private MockedChangefeedResources() { diff --git a/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/implementation/models/BlobChangefeedEventDeserializationTests.java b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/implementation/models/BlobChangefeedEventDeserializationTests.java new file mode 100644 index 000000000000..17fa6e41f889 --- /dev/null +++ b/sdk/storage/azure-storage-blob-changefeed/src/test/java/com/azure/storage/blob/changefeed/implementation/models/BlobChangefeedEventDeserializationTests.java @@ -0,0 +1,311 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.changefeed.implementation.models; + +import com.azure.storage.blob.changefeed.models.BlobChangefeedEventType; +import com.azure.storage.blob.changefeed.models.BlobOperationName; +import org.junit.jupiter.api.Test; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * Tests deserialization of BlobChangefeedEvent and BlobChangefeedEventData for schema versions V6, V7, and V8. + * Test values mirror EventSchemaV6.json, EventSchemaV7.json, EventSchemaV8.json from the .NET PR. + */ +public class BlobChangefeedEventDeserializationTests { + + // Values from EventSchemaV6.json / EventSchemaV7.json / EventSchemaV8.json + private static final long CONTENT_OFFSET = 256L; + private static final String CREATE_TIME = "2022-02-17T13:11:52.5901564Z"; + private static final String LAST_ACCESS_TIME = "2022-02-17T13:11:53.5901564Z"; + private static final String RESTORED_CONTAINER_VERSION = "0000000000000002"; + + // ======================== Schema V6 ======================== + + @Test + public void schemaV6AppendBlobDataUpdatedEventTypeDeserializes() { + assertEquals(BlobChangefeedEventType.APPEND_BLOB_DATA_UPDATED, + BlobChangefeedEventType.fromString("AppendBlobDataUpdated")); + } + + @Test + public void schemaV6AppendBlockOperationNameDeserializes() { + assertEquals(BlobOperationName.APPEND_BLOCK, BlobOperationName.fromString("AppendBlock")); + } + + @Test + public void schemaV6ContentOffsetDeserializes() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + r.put("api", "PutBlob"); + r.put("contentOffset", CONTENT_OFFSET); + })); + assertEquals(CONTENT_OFFSET, data.getContentOffset()); + } + + @Test + public void schemaV6CreationTimeDeserializes() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + r.put("api", "PutBlob"); + r.put("createTime", CREATE_TIME); + })); + assertEquals(OffsetDateTime.parse(CREATE_TIME), data.getCreationTime()); + } + + @Test + public void schemaV6ContentOffsetAndCreationTimeNullWhenAbsent() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + })); + assertNull(data.getContentOffset()); + assertNull(data.getCreationTime()); + } + + @Test + public void schemaV6FullEventDeserializes() { + Map eventMap = buildEventRecord(r -> { + r.put("eventType", "BlobCreated"); + r.put("data", buildDataRecord(d -> { + d.put("api", "PutBlob"); + d.put("contentOffset", CONTENT_OFFSET); + d.put("createTime", CREATE_TIME); + })); + }); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + + assertEquals(BlobChangefeedEventType.BLOB_CREATED, event.getEventType()); + assertEquals("PutBlob", event.getData().getApi()); + assertEquals(CONTENT_OFFSET, event.getData().getContentOffset()); + assertEquals(OffsetDateTime.parse(CREATE_TIME), event.getData().getCreationTime()); + assertNull(event.getData().getLastAccessTime()); + assertNull(event.getData().getRestoredContainerVersion()); + } + + // ======================== Schema V7 ======================== + + @Test + public void schemaV7BlobLastAccessTimeUpdatedEventTypeDeserializes() { + assertEquals(BlobChangefeedEventType.BLOB_LAST_ACCESS_TIME_UPDATED, + BlobChangefeedEventType.fromString("BlobLastAccessTimeUpdated")); + } + + @Test + public void schemaV7UpdateLastAccessTimeOperationNameDeserializes() { + assertEquals(BlobOperationName.UPDATE_LAST_ACCESS_TIME, BlobOperationName.fromString("UpdateLastAccessTime")); + } + + @Test + public void schemaV7LastAccessTimeDeserializes() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + r.put("api", "PutBlob"); + r.put("lastAccessTime", LAST_ACCESS_TIME); + })); + assertEquals(OffsetDateTime.parse(LAST_ACCESS_TIME), data.getLastAccessTime()); + } + + @Test + public void schemaV7LastAccessTimeNullWhenAbsent() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + })); + assertNull(data.getLastAccessTime()); + } + + @Test + public void schemaV7FullEventDeserializes() { + Map eventMap = buildEventRecord(r -> { + r.put("eventType", "BlobCreated"); + r.put("data", buildDataRecord(d -> { + d.put("api", "PutBlob"); + d.put("contentOffset", CONTENT_OFFSET); + d.put("createTime", CREATE_TIME); + d.put("lastAccessTime", LAST_ACCESS_TIME); + })); + }); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + + assertEquals(CONTENT_OFFSET, event.getData().getContentOffset()); + assertEquals(OffsetDateTime.parse(CREATE_TIME), event.getData().getCreationTime()); + assertEquals(OffsetDateTime.parse(LAST_ACCESS_TIME), event.getData().getLastAccessTime()); + assertNull(event.getData().getRestoredContainerVersion()); + } + + // ======================== Schema V8 / Container Change Feed ======================== + + @Test + public void schemaV8ContainerCreatedEventTypeDeserializes() { + assertEquals(BlobChangefeedEventType.CONTAINER_CREATED, BlobChangefeedEventType.fromString("ContainerCreated")); + } + + @Test + public void schemaV8ContainerDeletedEventTypeDeserializes() { + assertEquals(BlobChangefeedEventType.CONTAINER_DELETED, BlobChangefeedEventType.fromString("ContainerDeleted")); + } + + @Test + public void schemaV8ContainerPropertiesUpdatedEventTypeDeserializes() { + assertEquals(BlobChangefeedEventType.CONTAINER_PROPERTIES_UPDATED, + BlobChangefeedEventType.fromString("ContainerPropertiesUpdated")); + } + + @Test + public void schemaV8CreateContainerOperationNameDeserializes() { + // .NET: BlobOperationName.CreateContainer wraps string "ContainerCreated" + assertEquals(BlobOperationName.CREATE_CONTAINER, BlobOperationName.fromString("ContainerCreated")); + } + + @Test + public void schemaV8DeleteContainerOperationNameDeserializes() { + // .NET: BlobOperationName.DeleteContainer wraps string "ContainerDeleted" + assertEquals(BlobOperationName.DELETE_CONTAINER, BlobOperationName.fromString("ContainerDeleted")); + } + + @Test + public void schemaV8RestoreContainerOperationNameDeserializes() { + assertEquals(BlobOperationName.RESTORE_CONTAINER, BlobOperationName.fromString("RestoreContainer")); + } + + @Test + public void schemaV8SetContainerMetadataOperationNameDeserializes() { + assertEquals(BlobOperationName.SET_CONTAINER_METADATA, BlobOperationName.fromString("SetContainerMetadata")); + } + + @Test + public void schemaV8RestoredContainerVersionDeserializes() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + r.put("api", "PutBlob"); + r.put("restoredContainerVersion", RESTORED_CONTAINER_VERSION); + })); + assertEquals(RESTORED_CONTAINER_VERSION, data.getRestoredContainerVersion()); + } + + @Test + public void schemaV8RestoredContainerVersionNullWhenAbsent() { + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(buildDataRecord(r -> { + })); + assertNull(data.getRestoredContainerVersion()); + } + + @Test + public void schemaV8FullEventDeserializes() { + Map eventMap = buildEventRecord(r -> { + r.put("eventType", "BlobCreated"); + r.put("data", buildDataRecord(d -> { + d.put("api", "PutBlob"); + d.put("contentOffset", CONTENT_OFFSET); + d.put("createTime", CREATE_TIME); + d.put("lastAccessTime", LAST_ACCESS_TIME); + d.put("restoredContainerVersion", RESTORED_CONTAINER_VERSION); + })); + }); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + + assertEquals(CONTENT_OFFSET, event.getData().getContentOffset()); + assertEquals(OffsetDateTime.parse(CREATE_TIME), event.getData().getCreationTime()); + assertEquals(OffsetDateTime.parse(LAST_ACCESS_TIME), event.getData().getLastAccessTime()); + assertEquals(RESTORED_CONTAINER_VERSION, event.getData().getRestoredContainerVersion()); + } + + // ======================== Regression Tests ======================== + + @Test + public void olderSchemaPayloadDeserializesWithoutNewFields() { + Map eventMap = buildEventRecord(r -> { + r.put("eventType", "BlobCreated"); + r.put("data", buildDataRecord(d -> { + d.put("api", "PutBlob"); + d.put("etag", "0x8D9F2171BE32588"); + d.put("contentType", "application/octet-stream"); + d.put("contentLength", 128L); + d.put("blobType", "BlockBlob"); + d.put("url", "https://www.myurl.com"); + })); + }); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + + assertEquals(BlobChangefeedEventType.BLOB_CREATED, event.getEventType()); + assertEquals("PutBlob", event.getData().getApi()); + assertNull(event.getData().getContentOffset()); + assertNull(event.getData().getCreationTime()); + assertNull(event.getData().getLastAccessTime()); + assertNull(event.getData().getRestoredContainerVersion()); + } + + @Test + public void existingBlobEventsUnaffected() { + Map eventMap = buildEventRecord(r -> { + r.put("eventType", "BlobDeleted"); + r.put("data", buildDataRecord(d -> { + d.put("api", "DeleteBlob"); + d.put("sequencer", "00000000000000010000000000000002000000000000001d"); + })); + }); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + + assertEquals(BlobChangefeedEventType.BLOB_DELETED, event.getEventType()); + assertEquals("DeleteBlob", event.getData().getApi()); + assertNull(event.getData().getCreationTime()); + assertNull(event.getData().getLastAccessTime()); + assertNull(event.getData().getRestoredContainerVersion()); + } + + @Test + public void unknownOptionalFieldsDoNotFailDeserialization() { + Map dataRecord = buildDataRecord(r -> { + r.put("unknownFutureField", "someValue"); + r.put("anotherUnknownField", 42L); + }); + + InternalBlobChangefeedEventData data = InternalBlobChangefeedEventData.fromRecord(dataRecord); + assertEquals("PutBlob", data.getApi()); + } + + @Test + public void dataVersionFieldUnaffected() { + Map eventMap = buildEventRecord(r -> r.put("dataVersion", 8L)); + + InternalBlobChangefeedEvent event = InternalBlobChangefeedEvent.fromRecord(eventMap); + assertEquals(8L, event.getDataVersion()); + } + + // ======================== Helpers ======================== + + @FunctionalInterface + private interface MapCustomizer { + void customize(Map map); + } + + private static Map buildEventRecord(MapCustomizer customizer) { + Map record = new HashMap<>(); + record.put("$record", "BlobChangeEvent"); + record.put("schemaVersion", 1); + record.put("topic", "topic"); + record.put("subject", "subject"); + record.put("eventType", "BlobCreated"); + record.put("eventTime", OffsetDateTime.of(2022, 2, 17, 13, 12, 11, 0, ZoneOffset.UTC).toString()); + record.put("id", "62616073-8020-0000-00ff-233467060cc0"); + record.put("dataVersion", 1L); + record.put("metadataVersion", "1"); + record.put("data", buildDataRecord(d -> { + })); + customizer.customize(record); + return record; + } + + private static Map buildDataRecord(MapCustomizer customizer) { + Map record = new HashMap<>(); + record.put("$record", "BlobChangeEventData"); + record.put("api", "PutBlob"); + customizer.customize(record); + return record; + } +}