Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -38,7 +38,7 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient List<BytesXMLMessage> safeToAck;
private transient Queue<BytesXMLMessage> safeToAck;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
Expand All @@ -48,13 +48,14 @@ private SolaceCheckpointMark() {}
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
for (BytesXMLMessage msg : safeToAck) {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,7 +42,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -60,6 +60,12 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;

/**
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
* Accessed by both reader and checkpointing threads.
*/
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();

/**
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
* {@link SolaceCheckpointMark}.
Expand Down Expand Up @@ -130,6 +136,8 @@ public boolean start() {

@Override
public boolean advance() {
finalizeReadyMessages();

BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = getSessionService().getReceiver().receive();
Expand All @@ -150,9 +158,27 @@ public boolean advance() {

@Override
public void close() {
finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
}

public void finalizeReadyMessages() {
BytesXMLMessage msg;
while ((msg = safeToAckMessages.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
break; // Commit is only best effort
}
}
}

@Override
public Instant getWatermark() {
// should be only used by a test receiver
Expand All @@ -164,10 +190,9 @@ public Instant getWatermark() {

@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {

ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
safeToAckMessages.addAll(receivedMessages);
receivedMessages.clear();
return new SolaceCheckpointMark(bytesXMLMessages);
return new SolaceCheckpointMark(safeToAckMessages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();

// consume 1 more message.
// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
reader.advance();
assertEquals(0, countAckMessages.get());
assertEquals(4, countAckMessages.get());

// consume 1 more message. No change in the acknowledged messages.
reader.advance();
assertEquals(0, countAckMessages.get());
assertEquals(4, countAckMessages.get());

// acknowledge from the first checkpoint
checkpointMark.finalizeCheckpoint();
Expand All @@ -473,73 +473,6 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
assertEquals(4, countAckMessages.get());
}

@Test
public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
AtomicInteger countConsumedMessages = new AtomicInteger(0);
AtomicInteger countAckMessages = new AtomicInteger(0);

// Broker that creates input data
SerializableFunction<Integer, BytesXMLMessage> recordFn =
index -> {
List<BytesXMLMessage> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
messages.add(
SolaceDataUtils.getBytesXmlMessage(
"payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
}
countConsumedMessages.incrementAndGet();
return getOrNull(index, messages);
};

SessionServiceFactory fakeSessionServiceFactory =
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();

Read<Record> spec =
getDefaultRead()
.withSessionServiceFactory(fakeSessionServiceFactory)
.withMaxNumConnections(4);

UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);

UnboundedReader<Record> reader =
initialSource.createReader(PipelineOptionsFactory.create(), null);

// start the reader and move to the first record
assertTrue(reader.start());

// consume 3 messages (NB: #start() already consumed the first message)
for (int i = 0; i < 3; i++) {
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
}

// #advance() was called, but the messages were not ready to be acknowledged.
assertEquals(0, countAckMessages.get());

// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();

// data is flushed

// consume 1 more message.
reader.advance();
assertEquals(0, countAckMessages.get());

// consume 1 more message. No change in the acknowledged messages.
reader.advance();
assertEquals(0, countAckMessages.get());

CheckpointMark checkpointMark2 = reader.getCheckpointMark();
// data is prepared for flushing that will be rejected

// acknowledge from the first checkpoint may arrive late
checkpointMark.finalizeCheckpoint();

assertEquals(4, countAckMessages.get());

checkpointMark2.finalizeCheckpoint();
assertEquals(6, countAckMessages.get());
}

@Test
public void testCheckpointMarkSafety() throws Exception {

Expand Down
Loading