Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/main/java/com/uid2/optout/vertx/OptOutSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void error(String message, Object... args) {
private final AtomicLong lastEntrySent;
private LinkedList<String> pendingFiles = new LinkedList<>();
private AtomicBoolean isReplaying = new AtomicBoolean(false);
private final AtomicLong skippedOldDeltaCount = new AtomicLong(0);
// in-memory state (loaded from / persisted to cloud storage)
private Instant lastProcessedTimestamp = null;
private final Set<String> processedDeltas = new HashSet<>();
Expand Down Expand Up @@ -318,12 +319,21 @@ private void handleCloudDownloaded(Message<String> msg) {
return;
}

// Skip deltas that have already been processed
// Download event is fired on pod start for all deltas now, including already sent ones
if (isAlreadyProcessed(filename)) {
long skipped = this.skippedOldDeltaCount.incrementAndGet();
if (skipped % 500 == 1) {
this.logger.info("skipping already-processed delta (skipped {} so far)", skipped);
}
return;
}

this.logger.info("received delta " + filename + " to consolidate and replicate to remote");
OptOutUtils.addSorted(this.pendingFiles, filename, OptOutUtils.DeltaFilenameComparator);

// if it is still replaying the last one, return
if (this.isReplaying.get()) {
this.logger.info("still replaying the last delta, will not start replaying this one");
if (this.isReplaying.get()) {
this.logger.info("still replaying, added to pending (pending count={}); will process next batch when current replay completes", this.pendingFiles.size());
return;
}

Expand All @@ -333,6 +343,19 @@ private void handleCloudDownloaded(Message<String> msg) {
}
}

private boolean isAlreadyProcessed(String filename) {
if (this.processedDeltas.contains(filename)) {
return true;
}
if (this.lastProcessedTimestamp != null && this.lastProcessedTimestamp != Instant.EPOCH) {
Instant fileTimestamp = OptOutUtils.getFileTimestamp(filename);
if (fileTimestamp != null && fileTimestamp.isBefore(this.lastProcessedTimestamp)) {
return true;
}
}
return false;
}

private void processPendingFilesToConsolidate(Instant now) {
Instant currentSlot = OptOutUtils.instantFloorByInterval(now, this.deltaRotateInterval);

Expand Down
35 changes: 35 additions & 0 deletions src/test/java/com/uid2/optout/vertx/OptOutSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.mockito.MockitoAnnotations;

import java.nio.file.Files;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -196,4 +197,38 @@ void testStateRecoveredFromCloudOnRestart(Vertx vertx, VertxTestContext testCont

testContext.completeNow();
}

@Test
void testOldDeltasSkippedAfterEmptyDirRestart(Vertx vertx, VertxTestContext testContext) throws Exception {
// Simulate a sender that has already processed deltas up to a known timestamp.
// On a fresh emptyDir pod, cloud sync re-downloads ALL historical files from S3.
// The sender should skip deltas older than lastProcessedTimestamp.
InMemoryStorageMock storage = new InMemoryStorageMock();

// Seed cloud state: sender already processed up to "now" (recent timestamp)
long lastProcessedEpoch = Instant.now().getEpochSecond();
storage.upload(new java.io.ByteArrayInputStream(
Long.toString(lastProcessedEpoch).getBytes(StandardCharsets.UTF_8)), CLOUD_TIMESTAMP_KEY);
storage.upload(new java.io.ByteArrayInputStream(new byte[0]), CLOUD_PROCESSED_KEY);

deployAndAwait(vertx, storage);

// Publish several "old" deltas (timestamps well before lastProcessedTimestamp)
String[] oldDeltas = {
filePath + "/consumer/delta/optout-delta-001_2025-12-01T10.00.00Z_aabbccdd.dat",
filePath + "/consumer/delta/optout-delta-002_2025-12-15T14.30.00Z_11223344.dat",
filePath + "/consumer/delta/optout-delta-000_2026-01-10T08.00.00Z_deadbeef.dat",
};
for (String oldDelta : oldDeltas) {
vertx.eventBus().publish(eventBusName, oldDelta);
}

// Give the event loop time to process the published events
Thread.sleep(500);

// None of the old deltas should have triggered a send — they should all be skipped
verify(optOutPartnerEndpoint, never()).send(any());

testContext.completeNow();
}
}
Loading