Do not kill a task if offsets are inconsistent but publish from another group is pending #19091
kfaraz wants to merge 13 commits into apache:master
Conversation
/**
 * Approximately 10 minutes of retrying using {@link RetryUtils#nextRetrySleepMillis(int)}.
 */
private static final int MAX_RETRIES = 13;
Any concern about something that will never succeed, no matter what, taking 10 min to fail here? Not sure what we could do about it anyway, though. If this is needed to stabilize auto scaling, then I guess it is worth it.
Yeah, there may still be cases that may not succeed even after the 10 min window.
Handling that would require some kind of queueing mechanism for publishing tasks on the supervisor side.
I intend to explore that angle soon and make the TaskGroup more auto-scaler friendly as well. The TaskGroup/groupId concept is currently tightly tied to the assumption of a fixed task count.
What's the rationale behind extending the retries? If it's useful, go ahead (10 minutes isn't so bad), but I worry about retrying things that can't possibly succeed.
I am concerned about the case where we decide not to kill off B since A is currently pending publish,
and when it is finally time for B to publish, A still hasn't finished publishing.
In that case, it would make sense for B to retry for a while.
The current retry count of 5 amounts to only about 1 minute.
I do want to improve upon the retry algorithm such that we throw a retryable exception only if there is another task group pending publish for the partitions that the current task action is trying to update (in fact, let me try including that in this PR since we already have the new method isAnotherGroupPublishing).
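For context on why 13 retries comes out to roughly 10 minutes, here is a rough sketch of the back-off arithmetic. This is not the actual `RetryUtils` implementation: the real method also applies a random fuzz factor, and the base/cap constants here (1 s base, 60 s cap, doubling per attempt) are my assumptions.

```java
public class BackoffSketch
{
    // Hypothetical stand-in for RetryUtils#nextRetrySleepMillis: exponential
    // backoff starting at 1s, doubling each attempt, capped at 60s. The real
    // Druid method also multiplies by a random fuzz factor, omitted here.
    static long nextRetrySleepMillis(int nTry)
    {
        final long baseSleepMillis = 1_000;
        final long maxSleepMillis = 60_000;
        return Math.min(maxSleepMillis, baseSleepMillis * (1L << nTry));
    }

    static long totalSleepMillis(int maxRetries)
    {
        long total = 0;
        for (int nTry = 0; nTry < maxRetries; nTry++) {
            total += nextRetrySleepMillis(nTry);
        }
        return total;
    }

    public static void main(String[] args)
    {
        // 1+2+4+8+16+32 = 63s for the first six tries, then 60s for each of
        // the remaining seven, i.e. 483s, roughly 8 minutes; jitter pushes
        // the real figure toward 10 minutes.
        System.out.println(totalSleepMillis(13) / 1000 + " seconds");
    }
}
```

Under the same assumptions, 5 retries sum to about 31 s before jitter, which matches the "only about 1 minute" estimate above.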
I wonder how many of these cases will actually be solved by extending retries. From what I've seen, 90%+ of the cases where B fails to commit are because task A also outright failed (not because it is slow to commit). In this case, we are prolonging the inevitable.
Yeah, that's a fair point.
I have updated SegmentTransactionalAppendAction and SegmentTransactionalInsertAction to return a retryable response only if there is a conflicting publish queued up. Otherwise, we will just fail the publish action and not retry anymore.
.values()
.stream()
.flatMap(Collection::stream)
.filter(group -> !group.equals(taskGroup))
TaskGroup doesn't have an override for the equals method. Can we compare by group id or add an equals override? Or am I missing the point here and reference equality is fine?
Thanks for calling this out!
Yeah, I didn't override the equals/hashCode on purpose since each TaskGroup object is supposed to be distinct. I will add a comment to that effect.
Also, the equality filter in this method is just a safe-side measure: the taskGroup passed into this method will always be one of the activelyReadingTaskGroups, and it will be compared against pendingCompletionTaskGroups. The two sets will always be distinct, except when the target taskGroup is moved from activelyReadingTaskGroups to pendingCompletionTaskGroups while this method is in progress (which should not happen either, since the notices are all handled by the single-threaded exec).
Let me know if that makes sense or if I should update the logic or add more comments.
If you are intentionally using reference equality, it's best to use == to make the intent clear. This will also make it robust to someone possibly adding equals to TaskGroup in the future for some reason.
I had to update the equality check since the method signature has also changed. I am now simply checking the set of taskIds in the TaskGroup against the target taskId.
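To illustrate the reference-equality point with a toy example: a class without an `equals` override falls back to `Object.equals`, which compares identity, so `!=` states the intent explicitly and stays correct even if someone later adds a value-based `equals`. The `TaskGroup` class below is a simplified stand-in, not Druid's actual one.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReferenceEqualitySketch
{
    // Simplified stand-in for the supervisor's TaskGroup: no equals/hashCode
    // override, so each instance is distinct by identity.
    static class TaskGroup
    {
        final int groupId;

        TaskGroup(int groupId)
        {
            this.groupId = groupId;
        }
    }

    // Filters out the target group by identity; `g != target` makes the
    // intent explicit and is robust to a future equals() override.
    static List<TaskGroup> otherGroups(List<TaskGroup> all, TaskGroup target)
    {
        return all.stream().filter(g -> g != target).collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        TaskGroup a = new TaskGroup(1);
        TaskGroup b = new TaskGroup(1); // same id, different instance
        List<TaskGroup> others = otherGroups(List.of(a, b), a);
        // Only `a` itself is excluded; `b` survives even with the same id.
        System.out.println(others.size() + " other group(s)");
    }
}
```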
In the example above, how are we avoiding the case where we don't actually want to allow B to continue b/c it's using invalid offsets? For example, what if A fails to publish?
@jtuglu1 , that case is already handled in the existing code: once A actually fails to publish, the existing failure handling takes over. As long as A is pending publish, though, we have to assume that it is going to succeed and continue with B as usual.
.../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
/**
 * A failed publish action should be retried only if there is another task
 * waiting to publish offsets for an overlapping set of partitions.
 */
private boolean shouldNotRetryFailure(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
It would be clearer to call this shouldFailImmediately.
 */
private boolean shouldNotRetryFailure(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
{
  if (result.isSuccess() || !result.isRetryable() || startMetadata != null) {
Was this meant to be startMetadata == null?
Ah, yes, thanks for catching this!
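The intended semantics discussed in this thread could be sketched as below. This is a simplified stand-in, not the real action class: `PublishResult` mimics `SegmentPublishResult`, the real method also consults `startMetadata` (fixed to `== null` per the comment above) and takes the task and toolbox as context, and the names here are illustrative.

```java
public class RetryGuardSketch
{
    // Simplified stand-in for SegmentPublishResult.
    static class PublishResult
    {
        final boolean success;
        final boolean retryable;

        PublishResult(boolean success, boolean retryable)
        {
            this.success = success;
            this.retryable = retryable;
        }
    }

    /**
     * A failed publish is retried only while another task group is still
     * waiting to publish offsets for an overlapping set of partitions;
     * any other failure fails immediately instead of retrying blindly.
     */
    static boolean shouldFailImmediately(PublishResult result, boolean conflictingPublishPending)
    {
        if (result.success) {
            return false; // nothing to fail
        }
        return !(result.retryable && conflictingPublishPending);
    }

    public static void main(String[] args)
    {
        PublishResult retryableFailure = new PublishResult(false, true);
        // Conflicting publish pending: keep retrying.
        System.out.println(shouldFailImmediately(retryableFailure, true));
        // No conflict: fail right away.
        System.out.println(shouldFailImmediately(retryableFailure, false));
    }
}
```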
} else {
  return -1;
}
// return count > 0 ? sum / count : -1;
Please don't include commented-out code.
import java.util.Map;

/**
 * Work in progress.
I assume you intend to finish this before merging the PR.
Yes, still ironing out some kinks in the test.
|
@kfaraz, thank you for fixing these ingestion errors:
Is it worth considering making this property configurable rather than hardcoding it to 13 in this patch? Alternatively, maybe it could be derived dynamically as a function of
Yes, @abhishekrb19 . I assume you would be seeing more errors of the format below. These are retryable errors which typically occur when publish is slow.
I am not really a fan of the current retry mechanism, since it requires the task to blindly retry without knowing how long it should wait. Exponential backoff retries make more sense when the issue is transient (and likely self-resolving). Slow publish, on the other hand, is somewhat predictable/repeatable behaviour, and we should probably be able to incorporate that in the retry timeout. I don't want to make the number of retries configurable just yet; instead, I would prefer tying it to the expected publish time. For now, I want to merge the current patch and verify the behaviour.
@gianm , @jtuglu1 , @capistrant , I have updated this PR with fixes for a couple of bugs that I discovered:
What'd you have it do in this situation? I suppose if processing rate is zero but lag is high, we shouldn't scale at all (either up or down). The combination of those two metrics suggests something is broken that scaling isn't going to fix.
Huh, I thought we already changed that in #18745. Could you please also update the comment above the code block that currently says: Please also update
I'm ok with going ahead with this patch given that more work is coming in this area, and that hopefully the
Yeah, apparently, we missed a case in
Yes, agreed.
Yeah, that sounds reasonable. Let me add that check and maybe raise an alert for the same.
Oh, this was an old comment. I later rephrased it. The actual problem was 2-fold: (1) the auto-scaler had a bug where it would do spurious scale downs even when the cost was the same for all task counts, and (2) the auto-scaler did not take any action if the processing rate was zero. Instead, it should have skipped scaling when the processing rate was zero but lag was high. I will update the check accordingly.
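The "skip scaling and alert when rate is zero but lag is high" rule could be sketched as follows. The method name, `Decision` enum, and lag threshold are all hypothetical, not the actual `CostBasedAutoScaler` API.

```java
public class ScaleDecisionSketch
{
    enum Decision
    {
        SCALE,
        SKIP_AND_ALERT
    }

    /**
     * If the observed processing rate is zero while lag is high, something
     * is broken that scaling cannot fix, so skip any scale up/down and raise
     * an alert instead of computing a (meaningless) cost per task count.
     */
    static Decision decide(double processingRate, long lag, long lagThreshold)
    {
        if (processingRate <= 0 && lag > lagThreshold) {
            return Decision.SKIP_AND_ALERT;
        }
        return Decision.SCALE;
    }

    public static void main(String[] args)
    {
        // Zero rate with high lag: scaling would not help.
        System.out.println(decide(0.0, 1_000_000, 10_000));
        // Healthy rate: proceed with the cost-based scaling decision.
        System.out.println(decide(5_000.0, 1_000_000, 10_000));
    }
}
```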
Description
Follow-up to #19034, which addressed the issue of using the correct starting sequences after a scaling event occurs.
Discovered another race condition which causes task failures in pairs.
Change summary
- SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()
- CostBasedAutoScalerIntegrationTest
- SegmentTransactionalAppendAction and SegmentTransactionalInsertAction to return a retryable error response only if there is a pending publish that conflicts with the current action
- SeekableStreamSupervisorIOConfig
- CostBasedAutoScaler to avoid spurious scale downs
- CostBasedAutoScaler before proceeding with scaling action

Race condition
Example error
Typical scenario
This typically plays out as follows:
runInternal() calls verifyAndMergeCheckpoints()

There is already a check to not kill task group B if there are some pendingCompletionTaskGroups, but only for the same group B. When scaling happens, partitions are reassigned and tasks from a different group A may also end up updating the offsets that B is reading from.

Fix
While performing verifyAndMergeCheckpoints, consider a task eligible for checkpoint verification only if no other task group (irrespective of groupId) is waiting to publish to any of the partitions from which the task is reading.

This still feels like a temporary fix.
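The overlap check at the heart of the fix can be sketched as below. The class and field names are illustrative; the real `SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()` works over the supervisor's own internal structures and partition id types.

```java
import java.util.Collection;
import java.util.Set;

public class PartitionOverlapSketch
{
    // Simplified stand-in for a task group that is pending publish.
    static class PendingGroup
    {
        final Set<Integer> partitions;

        PendingGroup(Set<Integer> partitions)
        {
            this.partitions = partitions;
        }
    }

    /**
     * A task is eligible for checkpoint verification only if no pending-
     * completion group (irrespective of its groupId) is waiting to publish
     * offsets for any partition the task is reading from.
     */
    static boolean isAnotherGroupPublishingToPartitions(
        Collection<PendingGroup> pendingCompletionGroups,
        Set<Integer> partitionsBeingRead
    )
    {
        return pendingCompletionGroups.stream().anyMatch(
            group -> group.partitions.stream().anyMatch(partitionsBeingRead::contains)
        );
    }

    public static void main(String[] args)
    {
        PendingGroup groupA = new PendingGroup(Set.of(0, 1));
        // Task B reads partitions {1, 2}: partition 1 overlaps with A's
        // pending publish, so B must not be killed yet.
        System.out.println(isAnotherGroupPublishingToPartitions(Set.of(groupA), Set.of(1, 2)));
    }
}
```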
A cleaner, more long-term fix would be to rethink the way TaskGroup is handled inside SeekableStreamSupervisor, so that it lends itself better to scaling and partition reassignments.

This PR has: