From f4cbb28956a5b37817803dfe5ca8735b1482ab90 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 22 Dec 2025 09:59:24 +0100 Subject: [PATCH 1/2] Revert "SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. " This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7. --- .../io/solace/read/SolaceCheckpointMark.java | 9 ++- .../io/solace/read/UnboundedSolaceReader.java | 33 ++++++++- .../beam/sdk/io/solace/SolaceIOReadTest.java | 73 +------------------ 3 files changed, 37 insertions(+), 78 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index eb2d4b3006a6..a913fd6133ea 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,8 +18,8 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; -import java.util.List; import java.util.Objects; +import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -38,7 +38,7 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient List safeToAck; + private transient Queue safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -48,13 +48,14 @@ private SolaceCheckpointMark() {} * * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(List safeToAck) { + SolaceCheckpointMark(Queue safeToAck) { this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - for (BytesXMLMessage msg : safeToAck) { + BytesXMLMessage msg; + while ((msg = safeToAck.poll()) != null) { try { msg.ackMessage(); } catch (IllegalStateException e) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index 7c756169ef3e..dc84e0a07017 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -27,6 +27,7 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,7 +42,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -60,6 +60,12 @@ class UnboundedSolaceReader extends UnboundedReader { private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; + /** + * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: + * Accessed by both reader and checkpointing threads. + */ + private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a * {@link SolaceCheckpointMark}. @@ -130,6 +136,8 @@ public boolean start() { @Override public boolean advance() { + finalizeReadyMessages(); + BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -150,9 +158,27 @@ public boolean advance() { @Override public void close() { + finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); } + public void finalizeReadyMessages() { + BytesXMLMessage msg; + while ((msg = safeToAckMessages.poll()) != null) { + try { + msg.ackMessage(); + } catch (IllegalStateException e) { + LOG.error( + "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + safeToAckMessages.add(msg); // In case the error was transient, might succeed later + break; // Commit is only best effort + } + } + } + @Override public Instant getWatermark() { // should be only used by a test receiver @@ -164,10 +190,9 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - - ImmutableList bytesXMLMessages = ImmutableList.copyOf(receivedMessages); + safeToAckMessages.addAll(receivedMessages); receivedMessages.clear(); - return new SolaceCheckpointMark(bytesXMLMessages); + return new SolaceCheckpointMark(safeToAckMessages); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index c17ec3e128d2..a1f80932eddf 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. + // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. reader.advance(); - assertEquals(0, countAckMessages.get()); + assertEquals(4, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(0, countAckMessages.get()); + assertEquals(4, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); @@ -473,73 +473,6 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { assertEquals(4, countAckMessages.get()); } - @Test - public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception { - AtomicInteger countConsumedMessages = new AtomicInteger(0); - AtomicInteger countAckMessages = new AtomicInteger(0); - - // Broker that creates input data - SerializableFunction recordFn = - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }; - - SessionServiceFactory fakeSessionServiceFactory = - MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); - - Read spec = - getDefaultRead() - .withSessionServiceFactory(fakeSessionServiceFactory) - .withMaxNumConnections(4); - - UnboundedSolaceSource initialSource = getSource(spec, pipeline); - - UnboundedReader reader = - initialSource.createReader(PipelineOptionsFactory.create(), null); - - // start the reader and move to the first record - assertTrue(reader.start()); - - // consume 3 messages (NB: #start() already consumed the first message) - for (int i = 0; i < 3; i++) { - assertTrue(String.format("Failed at %d-th message", i), reader.advance()); - } - - // #advance() was called, but the messages were not ready to be acknowledged. - assertEquals(0, countAckMessages.get()); - - // mark all consumed messages as ready to be acknowledged - CheckpointMark checkpointMark = reader.getCheckpointMark(); - - // data is flushed - - // consume 1 more message. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - // consume 1 more message. No change in the acknowledged messages. - reader.advance(); - assertEquals(0, countAckMessages.get()); - - CheckpointMark checkpointMark2 = reader.getCheckpointMark(); - // data is prepared for flushing that will be rejected - - // acknowledge from the first checkpoint may arrive late - checkpointMark.finalizeCheckpoint(); - - assertEquals(4, countAckMessages.get()); - - checkpointMark2.finalizeCheckpoint(); - assertEquals(6, countAckMessages.get()); - } - @Test public void testCheckpointMarkSafety() throws Exception { From 3149f96c728235a63da38c76c94f76197ad716ea Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 22 Dec 2025 11:22:15 +0100 Subject: [PATCH 2/2] SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. - add async NACK with configurable deadline so any not finalized messages are rejected and retried. --- .../apache/beam/sdk/io/solace/SolaceIO.java | 20 +++++++- .../io/solace/broker/JcsmpSessionService.java | 3 ++ .../io/solace/read/UnboundedSolaceReader.java | 49 ++++++++++++------- .../io/solace/read/UnboundedSolaceSource.java | 12 ++++- .../beam/sdk/io/solace/SolaceIOReadTest.java | 27 +++++++--- ...SolaceIOCustomSessionServiceFactoryIT.java | 3 +- .../io/solace/it/SolaceIOMultipleSempIT.java | 3 +- 7 files changed, 87 insertions(+), 30 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index 63509126022b..d80517a267f5 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -412,6 +412,7 @@ public class SolaceIO { } }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; + private static final int DEFAULT_ACK_DEADLINE_SECONDS = 60; private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = Duration.standardSeconds(30); public static final int DEFAULT_WRITER_NUM_SHARDS = 20; @@ -461,6 +462,7 @@ public static Read read() { .setParseFn(SolaceRecordMapper::map) .setTimestampFn(SENDER_TIMESTAMP_FUNCTION) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS) .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } @@ -490,6 +492,7 @@ public static Read read( .setParseFn(parseFn) .setTimestampFn(timestampFn) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS) .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } @@ -587,6 +590,16 @@ public Read withDeduplicateRecords(boolean deduplicateRecords) { return this; } + /** + * Optional, default: 60. Set to ack deadline after which {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader} will start to reject outstanding + * messages that were not successfully checkpointed. + */ + public Read withAckDeadlineSeconds(int ackDeadlineSeconds) { + configurationBuilder.setAckDeadlineSeconds(ackDeadlineSeconds); + return this; + } + /** * Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}. * @@ -689,6 +702,8 @@ abstract static class Configuration { abstract Duration getWatermarkIdleDurationThreshold(); + abstract int getAckDeadlineSeconds(); + public static Builder builder() { Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder(); @@ -719,6 +734,8 @@ abstract Builder setParseFn( abstract Builder setWatermarkIdleDurationThreshold(Duration idleDurationThreshold); + abstract Builder setAckDeadlineSeconds(int seconds); + abstract Configuration build(); } } @@ -756,7 +773,8 @@ public PCollection expand(PBegin input) { coder, configuration.getTimestampFn(), configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()))); + configuration.getParseFn(), + configuration.getAckDeadlineSeconds()))); } @VisibleForTesting diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java index 818368a92b9f..e4bdff7cf29c 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java @@ -29,6 +29,7 @@ import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; import java.io.IOException; import java.util.Objects; @@ -143,6 +144,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); flowProperties.setEndpoint(queue); + flowProperties.addRequiredSettlementOutcomes( + XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED); flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); EndpointProperties endpointProperties = new EndpointProperties(); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index dc84e0a07017..ca475d8c1750 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -20,6 +20,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.XMLMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -30,8 +32,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.SempClient; @@ -59,12 +63,8 @@ class UnboundedSolaceReader extends UnboundedReader { private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - - /** - * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: - * Accessed by both reader and checkpointing threads. - */ - private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + private @Nullable Future nackCallback = null; + private final int ackDeadlineSeconds; /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a @@ -73,7 +73,7 @@ class UnboundedSolaceReader extends UnboundedReader { private final Queue receivedMessages = new ArrayDeque<>(); private static final Cache sessionServiceCache; - private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); + private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(4); static { Duration cacheExpirationTimeout = Duration.ofMinutes(1); @@ -109,6 +109,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); this.readerUuid = UUID.randomUUID(); + this.ackDeadlineSeconds = currentSource.getAckDeadlineSeconds(); } private SessionService getSessionService() { @@ -136,8 +137,6 @@ public boolean start() { @Override public boolean advance() { - finalizeReadyMessages(); - BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -158,23 +157,28 @@ public boolean advance() { @Override public void close() { - finalizeReadyMessages(); + try { + if (nackCallback != null) { + // wait only for last one to finish, it will mean all the previous one are also done. + nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("SolaceIO.Read: Failed to wait till nack background thread is finished"); + } sessionServiceCache.invalidate(readerUuid); } - public void finalizeReadyMessages() { + public void nackMessages(Queue checkpoint) { BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { + while ((msg = checkpoint.poll()) != null) { try { - msg.ackMessage(); - } catch (IllegalStateException e) { + msg.settle(XMLMessage.Outcome.FAILED); + } catch (IllegalStateException | JCSMPException e) { LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", + "SolaceIO.Read: failed to nack the message with applicationMessageId={}, ackMessageId={}.", msg.getApplicationMessageId(), msg.getAckMessageId(), e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort } } } @@ -190,8 +194,19 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { + Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); safeToAckMessages.addAll(receivedMessages); receivedMessages.clear(); + nackCallback = + cleanUpThread.submit( + () -> { + try { + Thread.sleep(ackDeadlineSeconds * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + nackMessages(safeToAckMessages); + }); return new SolaceCheckpointMark(safeToAckMessages); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java index 1cb17a49fbdb..81b114ba268c 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -49,6 +49,7 @@ public class UnboundedSolaceSource extends UnboundedSource timestampFn; private final Duration watermarkIdleDurationThreshold; private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn; + private final int ackDeadlineSeconds; public Queue getQueue() { return queue; @@ -70,6 +71,10 @@ public Duration getWatermarkIdleDurationThreshold() { return watermarkIdleDurationThreshold; } + public int getAckDeadlineSeconds() { + return ackDeadlineSeconds; + } + public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() { return parseFn; } @@ -83,7 +88,8 @@ public UnboundedSolaceSource( Coder coder, SerializableFunction timestampFn, Duration watermarkIdleDurationThreshold, - SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) { + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, + int ackDeadlineSeconds) { this.queue = queue; this.sempClientFactory = sempClientFactory; this.sessionServiceFactory = sessionServiceFactory; @@ -93,6 +99,7 @@ public UnboundedSolaceSource( this.timestampFn = timestampFn; this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold; this.parseFn = parseFn; + this.ackDeadlineSeconds = ackDeadlineSeconds; } @Override @@ -134,7 +141,8 @@ private List> getSolaceSources( coder, timestampFn, watermarkIdleDurationThreshold, - parseFn); + parseFn, + ackDeadlineSeconds); sourceList.add(source); } return sourceList; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index a1f80932eddf..c2120fa5323a 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -72,14 +72,16 @@ private Read getDefaultRead() { return SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withMaxNumConnections(1); + .withMaxNumConnections(1) + .withAckDeadlineSeconds(1); } private Read getDefaultReadForTopic() { return SolaceIO.read() .from(Solace.Topic.fromName("topic")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withMaxNumConnections(1); + .withMaxNumConnections(1) + .withAckDeadlineSeconds(1); } private static BytesXMLMessage getOrNull(Integer index, List messages) { @@ -97,7 +99,8 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi spec.inferCoder(pipeline, configuration.getTypeDescriptor()), configuration.getTimestampFn(), configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()); + configuration.getParseFn(), + configuration.getAckDeadlineSeconds()); } @Test @@ -437,7 +440,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { Read spec = getDefaultRead() .withSessionServiceFactory(fakeSessionServiceFactory) - .withMaxNumConnections(4); + .withMaxNumConnections(4) + .withAckDeadlineSeconds(1); UnboundedSolaceSource initialSource = getSource(spec, pipeline); @@ -458,19 +462,26 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. + // consume 1 more message. This will still not call ack. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); // No change in the acknowledged messages, because they were acknowledged in the #advance() // method. assertEquals(4, countAckMessages.get()); + + checkpointMark = reader.getCheckpointMark(); + + Thread.sleep(2000); + checkpointMark.finalizeCheckpoint(); + // messages were nacked, no chane in expected values + assertEquals(4, countAckMessages.get()); } @Test @@ -542,7 +553,7 @@ public void testCheckpointMarkSafety() throws Exception { @Test public void testDefaultCoder() { Coder coder = - new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null) + new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null, 1) .getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java index b047026a2bbd..0d07bd7ffaf9 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java @@ -102,6 +102,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() { SolaceIO.read() .from(Queue.fromName(QUEUE_NAME)) .withMaxNumConnections(1) + .withAckDeadlineSeconds(10) .withDeduplicateRecords(true) .withSempClientFactory( BasicAuthSempClientFactory.builder() @@ -118,7 +119,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() { PipelineResult pipelineResult = writerPipeline.run(); // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, // as the Read connector will keep attempting to read forever. - pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + pipelineResult.waitUntilFinish(Duration.standardMinutes(2)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java index 77d00b4e41ec..a11b20d79817 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java @@ -107,6 +107,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() { SolaceIO.read() .from(Queue.fromName(QUEUE_NAME)) .withMaxNumConnections(1) + .withAckDeadlineSeconds(10) .withDeduplicateRecords(true) .withSempClientFactory( BasicAuthMultipleSempClientFactory.builder() @@ -131,7 +132,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() { PipelineResult pipelineResult = writerPipeline.run(); // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, // as the Read connector will keep attempting to read forever. - pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + pipelineResult.waitUntilFinish(Duration.standardMinutes(2)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);