-
Notifications
You must be signed in to change notification settings - Fork 4.5k
SolaceIO - add ack deadline #37164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
SolaceIO - add ack deadline #37164
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,17 +20,22 @@ | |
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; | ||
|
|
||
| import com.solacesystems.jcsmp.BytesXMLMessage; | ||
| import com.solacesystems.jcsmp.JCSMPException; | ||
| import com.solacesystems.jcsmp.XMLMessage; | ||
| import java.io.IOException; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.time.Duration; | ||
| import java.util.ArrayDeque; | ||
| import java.util.NoSuchElementException; | ||
| import java.util.Queue; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import org.apache.beam.sdk.io.UnboundedSource; | ||
| import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; | ||
| import org.apache.beam.sdk.io.solace.broker.SempClient; | ||
|
|
@@ -41,7 +46,6 @@ | |
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.joda.time.Instant; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -59,6 +63,8 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { | |
| private final SessionServiceFactory sessionServiceFactory; | ||
| private @Nullable BytesXMLMessage solaceOriginalRecord; | ||
| private @Nullable T solaceMappedRecord; | ||
| private @Nullable Future<?> nackCallback = null; | ||
| private final int ackDeadlineSeconds; | ||
|
|
||
| /** | ||
| * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a | ||
|
|
@@ -67,7 +73,7 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { | |
| private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>(); | ||
|
|
||
| private static final Cache<UUID, SessionService> sessionServiceCache; | ||
| private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); | ||
| private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(4); | ||
|
|
||
| static { | ||
| Duration cacheExpirationTimeout = Duration.ofMinutes(1); | ||
|
|
@@ -103,6 +109,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) { | |
| this.sessionServiceFactory = currentSource.getSessionServiceFactory(); | ||
| this.sempClient = currentSource.getSempClientFactory().create(); | ||
| this.readerUuid = UUID.randomUUID(); | ||
| this.ackDeadlineSeconds = currentSource.getAckDeadlineSeconds(); | ||
| } | ||
|
|
||
| private SessionService getSessionService() { | ||
|
|
@@ -150,9 +157,32 @@ public boolean advance() { | |
|
|
||
| @Override | ||
| public void close() { | ||
| try { | ||
| if (nackCallback != null) { | ||
| // wait only for last one to finish, it will mean all the previous one are also done. | ||
| nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS); | ||
| } | ||
| } catch (InterruptedException | ExecutionException | TimeoutException e) { | ||
| LOG.error("SolaceIO.Read: Failed to wait till nack background thread is finished"); | ||
| } | ||
| sessionServiceCache.invalidate(readerUuid); | ||
| } | ||
|
|
||
| public void nackMessages(Queue<BytesXMLMessage> checkpoint) { | ||
| BytesXMLMessage msg; | ||
| while ((msg = checkpoint.poll()) != null) { | ||
| try { | ||
| msg.settle(XMLMessage.Outcome.FAILED); | ||
| } catch (IllegalStateException | JCSMPException e) { | ||
| LOG.error( | ||
| "SolaceIO.Read: failed to nack the message with applicationMessageId={}, ackMessageId={}.", | ||
| msg.getApplicationMessageId(), | ||
| msg.getAckMessageId(), | ||
| e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Instant getWatermark() { | ||
| // should be only used by a test receiver | ||
|
|
@@ -164,10 +194,20 @@ public Instant getWatermark() { | |
|
|
||
| @Override | ||
| public UnboundedSource.CheckpointMark getCheckpointMark() { | ||
|
|
||
| ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages); | ||
| Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>(); | ||
| safeToAckMessages.addAll(receivedMessages); | ||
|
Comment on lines
+197
to
+198
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're not modifying I can't recall if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. safeToAckMessages is used async by cleanUpThread, see below
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, nvm. It seems like we're doing the same work that's expected of the bundle finalizer though. |
||
| receivedMessages.clear(); | ||
| return new SolaceCheckpointMark(bytesXMLMessages); | ||
| nackCallback = | ||
| cleanUpThread.submit( | ||
| () -> { | ||
| try { | ||
| Thread.sleep(ackDeadlineSeconds * 1000L); | ||
| } catch (InterruptedException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| nackMessages(safeToAckMessages); | ||
| }); | ||
| return new SolaceCheckpointMark(safeToAckMessages); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this attempts to await the same conditions as when calling
ExecutorService#shutdown()andExecutorService#awaitTermination(long, TimeUnit)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not really, there are other background tasks that are scheduled on executor service that I don't want to wait for.