From f4cbb28956a5b37817803dfe5ca8735b1482ab90 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 22 Dec 2025 09:59:24 +0100 Subject: [PATCH] Revert "SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. " This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7. --- .../io/solace/read/SolaceCheckpointMark.java | 9 ++- .../io/solace/read/UnboundedSolaceReader.java | 33 ++++++++- .../beam/sdk/io/solace/SolaceIOReadTest.java | 73 +------------------ 3 files changed, 37 insertions(+), 78 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index eb2d4b3006a6..a913fd6133ea 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,8 +18,8 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; -import java.util.List; import java.util.Objects; +import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -38,7 +38,7 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient List safeToAck; + private transient Queue safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -48,13 +48,14 @@ private SolaceCheckpointMark() {} * * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(List safeToAck) { + SolaceCheckpointMark(Queue safeToAck) { this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - for (BytesXMLMessage msg : safeToAck) { + BytesXMLMessage msg; + while ((msg = safeToAck.poll()) != null) { try { msg.ackMessage(); } catch (IllegalStateException e) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index 7c756169ef3e..dc84e0a07017 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -27,6 +27,7 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,7 +42,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -60,6 +60,12 @@ class UnboundedSolaceReader extends UnboundedReader { private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; + /** + * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: + * Accessed by both reader and checkpointing threads. + */ + private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a * {@link SolaceCheckpointMark}. @@ -130,6 +136,8 @@ public boolean start() { @Override public boolean advance() { + finalizeReadyMessages(); + BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -150,9 +158,27 @@ public boolean advance() { @Override public void close() { + finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); } + public void finalizeReadyMessages() { + BytesXMLMessage msg; + while ((msg = safeToAckMessages.poll()) != null) { + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + safeToAckMessages.add(msg); // In case the error was transient, might succeed later + break; // Commit is only best effort + } + } + } + @Override public Instant getWatermark() { // should be only used by a test receiver @@ -164,10 +190,9 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - - ImmutableList bytesXMLMessages = ImmutableList.copyOf(receivedMessages); + safeToAckMessages.addAll(receivedMessages); receivedMessages.clear(); - return new SolaceCheckpointMark(bytesXMLMessages); + return new SolaceCheckpointMark(safeToAckMessages); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index c17ec3e128d2..a1f80932eddf 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. + // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. reader.advance(); - assertEquals(0, countAckMessages.get()); + assertEquals(4, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(0, countAckMessages.get()); + assertEquals(4, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); @@ -473,73 +473,6 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { assertEquals(4, countAckMessages.get()); } - @Test - public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception { - AtomicInteger countConsumedMessages = new AtomicInteger(0); - AtomicInteger countAckMessages = new AtomicInteger(0); - - // Broker that creates input data - SerializableFunction recordFn = - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }; - - SessionServiceFactory fakeSessionServiceFactory = - MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); - - Read spec = - getDefaultRead() - .withSessionServiceFactory(fakeSessionServiceFactory) - .withMaxNumConnections(4); - - UnboundedSolaceSource initialSource = getSource(spec, pipeline); - - UnboundedReader reader = - initialSource.createReader(PipelineOptionsFactory.create(), null); - - // start the reader and move to the first record - assertTrue(reader.start()); - - // consume 3 messages (NB: #start() already consumed the first message) - for (int i = 0; i < 3; i++) { - assertTrue(String.format("Failed at %d-th message", i), reader.advance()); - } - - // #advance() was called, but the messages were not ready to be acknowledged. - assertEquals(0, countAckMessages.get()); - - // mark all consumed messages as ready to be acknowledged - CheckpointMark checkpointMark = reader.getCheckpointMark(); - - // data is flushed - - // consume 1 more message. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - // consume 1 more message. No change in the acknowledged messages. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - CheckpointMark checkpointMark2 = reader.getCheckpointMark(); - // data is prepared for flushing that will be rejected - - // acknowledge from the first checkpoint may arrive late - checkpointMark.finalizeCheckpoint(); - - assertEquals(4, countAckMessages.get()); - - checkpointMark2.finalizeCheckpoint(); - assertEquals(6, countAckMessages.get()); - } - @Test public void testCheckpointMarkSafety() throws Exception {