From b985eb69892c3d9b5f73df6d2c445e86e994adee Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 24 Feb 2026 14:15:42 -0700 Subject: [PATCH 1/3] fix replay duplication bug --- .../com/uid2/optout/vertx/OptOutSender.java | 29 +++++++++++++-- .../uid2/optout/vertx/OptOutSenderTest.java | 35 +++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSender.java b/src/main/java/com/uid2/optout/vertx/OptOutSender.java index bf7ceb4c..b59c48dc 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSender.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSender.java @@ -91,6 +91,7 @@ public void error(String message, Object... args) { private final AtomicLong lastEntrySent; private LinkedList pendingFiles = new LinkedList<>(); private AtomicBoolean isReplaying = new AtomicBoolean(false); + private final AtomicLong skippedOldDeltaCount = new AtomicLong(0); // in-memory state (loaded from / persisted to cloud storage) private Instant lastProcessedTimestamp = null; private final Set processedDeltas = new HashSet<>(); @@ -318,12 +319,21 @@ private void handleCloudDownloaded(Message msg) { return; } + // Skip deltas that have already been processed + // Download event is fired on pod start for all deltas now, including already sent ones + if (isAlreadyProcessed(filename)) { + long skipped = this.skippedOldDeltaCount.incrementAndGet(); + if (skipped % 500 == 1) { + this.logger.info("skipping already-processed delta (skipped {} so far)", skipped); + } + return; + } + this.logger.info("received delta " + filename + " to consolidate and replicate to remote"); OptOutUtils.addSorted(this.pendingFiles, filename, OptOutUtils.DeltaFilenameComparator); - // if it is still replaying the last one, return - if (this.isReplaying.get()) { - this.logger.info("still replaying the last delta, will not start replaying this one"); + if (this.isReplaying.get()) { + this.logger.info("still replaying, added to pending (pending count={}); will process next batch when current replay completes", this.pendingFiles.size()); return; } @@ -333,6 +343,19 @@ private void handleCloudDownloaded(Message msg) { } } + private boolean isAlreadyProcessed(String filename) { + if (this.processedDeltas.contains(filename)) { + return true; + } + if (this.lastProcessedTimestamp != null && this.lastProcessedTimestamp != Instant.EPOCH) { + Instant fileTimestamp = OptOutUtils.getFileTimestamp(filename); + if (fileTimestamp != null && fileTimestamp.isBefore(this.lastProcessedTimestamp)) { + return true; + } + } + return false; + } + private void processPendingFilesToConsolidate(Instant now) { Instant currentSlot = OptOutUtils.instantFloorByInterval(now, this.deltaRotateInterval); diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSenderTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSenderTest.java index 29a79698..a0ce545f 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSenderTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSenderTest.java @@ -28,6 +28,7 @@ import org.mockito.MockitoAnnotations; import java.nio.file.Files; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -196,4 +197,38 @@ void testStateRecoveredFromCloudOnRestart(Vertx vertx, VertxTestContext testCont testContext.completeNow(); } + + @Test + void testOldDeltasSkippedAfterEmptyDirRestart(Vertx vertx, VertxTestContext testContext) throws Exception { + // Simulate a sender that has already processed deltas up to a known timestamp. + // On a fresh emptyDir pod, cloud sync re-downloads ALL historical files from S3. + // The sender should skip deltas older than lastProcessedTimestamp. + InMemoryStorageMock storage = new InMemoryStorageMock(); + + // Seed cloud state: sender already processed up to "now" (recent timestamp) + long lastProcessedEpoch = Instant.now().getEpochSecond(); + storage.upload(new java.io.ByteArrayInputStream( + Long.toString(lastProcessedEpoch).getBytes(StandardCharsets.UTF_8)), CLOUD_TIMESTAMP_KEY); + storage.upload(new java.io.ByteArrayInputStream(new byte[0]), CLOUD_PROCESSED_KEY); + + deployAndAwait(vertx, storage); + + // Publish several "old" deltas (timestamps well before lastProcessedTimestamp) + String[] oldDeltas = { + filePath + "/consumer/delta/optout-delta-001_2025-12-01T10.00.00Z_aabbccdd.dat", + filePath + "/consumer/delta/optout-delta-002_2025-12-15T14.30.00Z_11223344.dat", + filePath + "/consumer/delta/optout-delta-000_2026-01-10T08.00.00Z_deadbeef.dat", + }; + for (String oldDelta : oldDeltas) { + vertx.eventBus().publish(eventBusName, oldDelta); + } + + // Give the event loop time to process the published events + Thread.sleep(500); + + // None of the old deltas should have triggered a send — they should all be skipped + verify(optOutPartnerEndpoint, never()).send(any()); + + testContext.completeNow(); + } } From 1402c036361382f64ea7a5482013dde23f6b5195 Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Tue, 24 Feb 2026 21:24:01 +0000 Subject: [PATCH 2/3] [CI Pipeline] Released Snapshot version: 4.9.20-alpha-220-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d3d66d7e..099cddf9 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.9.19 + 4.9.20-alpha-220-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 6bc42f59a8540514d31f921165895118a641ec28 Mon Sep 17 00:00:00 2001 From: Ian Nara <135270994+Ian-Nara@users.noreply.github.com> Date: Tue, 24 Feb 2026 15:04:43 -0700 Subject: [PATCH 3/3] Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 099cddf9..d3d66d7e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.9.20-alpha-220-SNAPSHOT + 4.9.19 uid2-optout https://github.com/IABTechLab/uid2-optout