Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ public class SolaceIO {
}
};
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
private static final int DEFAULT_ACK_DEADLINE_SECONDS = 60;
private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD =
Duration.standardSeconds(30);
public static final int DEFAULT_WRITER_NUM_SHARDS = 20;
Expand Down Expand Up @@ -461,6 +462,7 @@ public static Read<Solace.Record> read() {
.setParseFn(SolaceRecordMapper::map)
.setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
}

Expand Down Expand Up @@ -490,6 +492,7 @@ public static <T> Read<T> read(
.setParseFn(parseFn)
.setTimestampFn(timestampFn)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
}

Expand Down Expand Up @@ -587,6 +590,16 @@ public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
return this;
}

/**
* Optional, default: 60. Set to ack deadline after which {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader} will start to reject outstanding
* messages that were not successfully checkpointed.
*/
public Read<T> withAckDeadlineSeconds(int ackDeadlineSeconds) {
configurationBuilder.setAckDeadlineSeconds(ackDeadlineSeconds);
return this;
}

/**
* Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}.
*
Expand Down Expand Up @@ -689,6 +702,8 @@ abstract static class Configuration<T> {

abstract Duration getWatermarkIdleDurationThreshold();

abstract int getAckDeadlineSeconds();

public static <T> Builder<T> builder() {
Builder<T> builder =
new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
Expand Down Expand Up @@ -719,6 +734,8 @@ abstract Builder<T> setParseFn(

abstract Builder<T> setWatermarkIdleDurationThreshold(Duration idleDurationThreshold);

abstract Builder<T> setAckDeadlineSeconds(int seconds);

abstract Configuration<T> build();
}
}
Expand Down Expand Up @@ -756,7 +773,8 @@ public PCollection<T> expand(PBegin input) {
coder,
configuration.getTimestampFn(),
configuration.getWatermarkIdleDurationThreshold(),
configuration.getParseFn())));
configuration.getParseFn(),
configuration.getAckDeadlineSeconds())));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -143,6 +144,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
flowProperties.addRequiredSettlementOutcomes(
XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED);
flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

EndpointProperties endpointProperties = new EndpointProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -38,7 +38,7 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient List<BytesXMLMessage> safeToAck;
private transient Queue<BytesXMLMessage> safeToAck;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
Expand All @@ -48,13 +48,14 @@ private SolaceCheckpointMark() {}
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}

@Override
public void finalizeCheckpoint() {
for (BytesXMLMessage msg : safeToAck) {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.XMLMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.solace.broker.SempClient;
Expand All @@ -41,7 +46,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -59,6 +63,8 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
private @Nullable Future<?> nackCallback = null;
private final int ackDeadlineSeconds;

/**
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
Expand All @@ -67,7 +73,7 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();

private static final Cache<UUID, SessionService> sessionServiceCache;
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1);
private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(4);

static {
Duration cacheExpirationTimeout = Duration.ofMinutes(1);
Expand Down Expand Up @@ -103,6 +109,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
this.sempClient = currentSource.getSempClientFactory().create();
this.readerUuid = UUID.randomUUID();
this.ackDeadlineSeconds = currentSource.getAckDeadlineSeconds();
}

private SessionService getSessionService() {
Expand Down Expand Up @@ -150,9 +157,32 @@ public boolean advance() {

@Override
public void close() {
try {
if (nackCallback != null) {
// wait only for last one to finish, it will mean all the previous one are also done.
nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("SolaceIO.Read: Failed to wait till nack background thread is finished");
}
Comment on lines +160 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this attempts to await the same conditions as when callingExecutorService#shutdown() and ExecutorService#awaitTermination(long, TimeUnit)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really, there are other background tasks that are scheduled on executor service that I don't want to wait for.

sessionServiceCache.invalidate(readerUuid);
}

public void nackMessages(Queue<BytesXMLMessage> checkpoint) {
BytesXMLMessage msg;
while ((msg = checkpoint.poll()) != null) {
try {
msg.settle(XMLMessage.Outcome.FAILED);
} catch (IllegalStateException | JCSMPException e) {
LOG.error(
"SolaceIO.Read: failed to nack the message with applicationMessageId={}, ackMessageId={}.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
}
}

@Override
public Instant getWatermark() {
// should be only used by a test receiver
Expand All @@ -164,10 +194,20 @@ public Instant getWatermark() {

@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {

ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
safeToAckMessages.addAll(receivedMessages);
Comment on lines +197 to +198
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not modifying safeToAckMessages outside the scope of this function, right? This doesn't have to be a concurrent container if that's the case.

I can't recall if receivedMessages can safely be replaced by the time this function is called. If so, maybe this can be cleaned up a bit more by reassigning receivedMessages to a new instance and passing the old instance off to nackMessages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

safeToAckMessages is used async by cleanUpThread, see below

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, nvm. It seems like we're doing the same work that's expected of the bundle finalizer though.
I'm trying to think if we could capture the outstanding checkpoint finalization calls instead with an ExecutorService for example. When the reader is closed you'd then call ExecutorService#shutdownNow() which returns a collection of all Runnable items that will not be executed and those items could be executed then instead.

receivedMessages.clear();
return new SolaceCheckpointMark(bytesXMLMessages);
nackCallback =
cleanUpThread.submit(
() -> {
try {
Thread.sleep(ackDeadlineSeconds * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
nackMessages(safeToAckMessages);
});
return new SolaceCheckpointMark(safeToAckMessages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class UnboundedSolaceSource<T> extends UnboundedSource<T, SolaceCheckpoin
private final SerializableFunction<T, Instant> timestampFn;
private final Duration watermarkIdleDurationThreshold;
private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn;
private final int ackDeadlineSeconds;

public Queue getQueue() {
return queue;
Expand All @@ -70,6 +71,10 @@ public Duration getWatermarkIdleDurationThreshold() {
return watermarkIdleDurationThreshold;
}

public int getAckDeadlineSeconds() {
return ackDeadlineSeconds;
}

public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() {
return parseFn;
}
Expand All @@ -83,7 +88,8 @@ public UnboundedSolaceSource(
Coder<T> coder,
SerializableFunction<T, Instant> timestampFn,
Duration watermarkIdleDurationThreshold,
SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) {
SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn,
int ackDeadlineSeconds) {
this.queue = queue;
this.sempClientFactory = sempClientFactory;
this.sessionServiceFactory = sessionServiceFactory;
Expand All @@ -93,6 +99,7 @@ public UnboundedSolaceSource(
this.timestampFn = timestampFn;
this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold;
this.parseFn = parseFn;
this.ackDeadlineSeconds = ackDeadlineSeconds;
}

@Override
Expand Down Expand Up @@ -134,7 +141,8 @@ private List<UnboundedSolaceSource<T>> getSolaceSources(
coder,
timestampFn,
watermarkIdleDurationThreshold,
parseFn);
parseFn,
ackDeadlineSeconds);
sourceList.add(source);
}
return sourceList;
Expand Down
Loading
Loading