
Do not kill a task if offsets are inconsistent but publish from another group is pending#19091

Open
kfaraz wants to merge 13 commits intoapache:masterfrom
kfaraz:fix_stream_publish_bug

Conversation

@kfaraz
Contributor

@kfaraz kfaraz commented Mar 5, 2026

Description

Follow-up to #19034, which addressed the issue of using the correct starting sequences after a scaling event occurs.

Discovered another race condition which causes task failures in pairs.

Change summary

  • Add method SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()
  • Use this method to check if a task needs to wait before publishing its own offsets
  • Update CostBasedAutoScalerIntegrationTest
  • Update SegmentTransactionalAppendAction and SegmentTransactionalInsertAction to return a retryable error response only if there is a pending publish that conflicts with the current action
  • Fix bug in SeekableStreamSupervisorIOConfig
  • Fix bug in CostBasedAutoScaler to avoid spurious scale downs
  • Validate metrics in CostBasedAutoScaler before proceeding with scaling action

Race condition

Example error
First failure:
Killing task[A] as its checkpoints[...] are not consistent with group checkpoints[...] or latest offsets in DB[...].

Second failure:
Stored metadata state[...] has already been updated by other tasks and has diverged from the start metadata state[...].
Typical scenario

This typically plays out as follows:

  • Offsets in DB are currently at E0.
  • Scaling event occurs
  • New task group B is created and is assigned a partition P1 which an old task group A (still pending completion) was also reading from.
  • Task group A is requested to checkpoint and start publishing.
  • Task group A returns the final end offset as E1.
  • Task group B is assigned starting offset E1 (fixed in #19034 and now works as expected)
  • Next invocation of runInternal() calls verifyAndMergeCheckpoints()
  • Task group B is found to have offsets E1 which are inconsistent with DB (since A is yet to publish E1 and DB is still at E0).
  • Task group B is killed
  • New task group C is launched with starting offsets E0
  • Task group A finishes publishing and updates offsets in metadata store from E0 to E1
  • Task group C also fails while publishing since offsets have already advanced from what C knew (i.e. E0)

There is already a check that avoids killing task group B if there are some pendingCompletionTaskGroups, but it only considers the same group B. When scaling happens, partitions are reassigned, and tasks from a different group A may also end up updating the offsets that B is reading from.

Fix

While performing verifyAndMergeCheckpoints, consider a task eligible for checkpoint verification only if no other task group (irrespective of groupId) is waiting to publish to any of the partitions from which a task is reading.
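The check described above could look roughly like the following standalone sketch. TaskGroup here is a minimal stand-in for the supervisor's class, and all names and fields are illustrative rather than the actual Druid signatures:

```java
import java.util.*;

public class PublishOverlapCheck
{
  // Minimal stand-in for the supervisor's TaskGroup; fields are illustrative.
  static class TaskGroup
  {
    final Set<String> taskIds;
    final Set<Integer> partitionIds;

    TaskGroup(Set<String> taskIds, Set<Integer> partitionIds)
    {
      this.taskIds = taskIds;
      this.partitionIds = partitionIds;
    }
  }

  // A task is eligible for checkpoint verification only if no *other*
  // pending-completion group is still publishing to a partition it reads,
  // irrespective of groupId.
  static boolean isAnotherTaskGroupPublishing(
      List<TaskGroup> pendingCompletionTaskGroups,
      String taskId,
      Set<Integer> readPartitions
  )
  {
    return pendingCompletionTaskGroups
        .stream()
        // Skip the group that contains the task being verified
        .filter(group -> !group.taskIds.contains(taskId))
        // A pending publish conflicts if it covers any partition we read from
        .anyMatch(group -> !Collections.disjoint(group.partitionIds, readPartitions));
  }

  public static void main(String[] args)
  {
    // Group A (pending publish) still covers partition 1 that task-B reads.
    TaskGroup groupA = new TaskGroup(Set.of("task-A"), Set.of(1));
    System.out.println(isAnotherTaskGroupPublishing(List.of(groupA), "task-B", Set.of(1))); // true
    // A task belonging to the pending group itself does not conflict.
    System.out.println(isAnotherTaskGroupPublishing(List.of(groupA), "task-A", Set.of(1))); // false
  }
}
```

In the scenario above, this would make task group B ineligible for checkpoint verification until group A finishes publishing to P1, instead of killing B outright.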

This still feels like a temporary fix.

A cleaner, more long-term fix would be to rethink the way TaskGroup is handled inside SeekableStreamSupervisor, so that it lends itself to scaling and partition reassignments better.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz changed the title Do not kill a task if offsets are inconsistent but publish is pending Do not kill a task if offsets are inconsistent but publish from another group is pending Mar 5, 2026
/**
* Approximately 10 minutes of retrying using {@link RetryUtils#nextRetrySleepMillis(int)}.
*/
private static final int MAX_RETRIES = 13;
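A back-of-the-envelope check of the "approximately 10 minutes" figure, assuming a 1 s base sleep doubling per attempt and capped at 60 s — the general shape of RetryUtils.nextRetrySleepMillis; the real method also applies random jitter, so the exact constants and totals may differ:

```java
// Rough estimate of total retry wall time for MAX_RETRIES = 13, ignoring
// jitter. Assumed constants (1s base, 60s cap) are illustrative.
public class RetryBudget
{
  public static void main(String[] args)
  {
    final long baseSleepMillis = 1_000;
    final long maxSleepMillis = 60_000;
    long totalMillis = 0;
    for (int attempt = 0; attempt < 13; attempt++) {
      totalMillis += Math.min(maxSleepMillis, baseSleepMillis << attempt);
    }
    // 63s for the first six attempts plus seven capped 60s sleeps = 483s,
    // i.e. roughly 8 minutes before jitter.
    System.out.println(totalMillis / 1_000 + "s");
  }
}
```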
Contributor

Any concern about something that will never succeed, no matter what, taking 10 min to fail here? Not sure what we could do about it anyways though. If this is needed to stabilize auto scaling then I guess it is worth it.

Contributor Author

Yeah, there may still be cases that do not succeed even after the 10 min window.
Handling that would require some kind of queueing mechanism for publishing tasks on the supervisor side.

I intend to explore that angle soon and make the TaskGroup more auto-scaler friendly as well. The TaskGroup/groupId concept is currently tightly tied to the assumption of a fixed task count.

Contributor

What's the rationale behind extending the retries? If it's useful, go ahead (10 minutes isn't so bad), but I worry about retrying things that aren't possibly going to succeed.

Contributor Author

I am concerned about the case where we decide not to kill off B since A is currently pending publish,
and when it is finally time for B to publish, A still hasn't finished publishing.

In that case, it would make sense for B to retry for a while.
The current retry count of 5 amounts to only about 1 minute.

I do want to improve upon the retry algorithm such that we throw a retryable exception only if there is another task group pending publish for the partitions that the current task action is trying to update (in fact, let me try including that in this PR since we already have the new method isAnotherGroupPublishing).

Contributor

@jtuglu1 jtuglu1 Mar 6, 2026

I wonder how many of these cases will actually be solved by extending retries. From what I've seen, 90%+ of the cases where B fails to commit are because task A also outright failed (not because it is slow to commit). In this case, we are prolonging the inevitable.

Contributor Author

@kfaraz kfaraz Mar 6, 2026

Yeah, that's a fair point.

I have updated SegmentTransactionalAppendAction and SegmentTransactionalInsertAction to return a retryable response only if there is a conflicting publish queued up. Otherwise, we will just fail the publish action and not retry anymore.

.values()
.stream()
.flatMap(Collection::stream)
.filter(group -> !group.equals(taskGroup))
Contributor

TaskGroup doesn't have an override for the equals method. Can we compare by group id or add an equals override? Or am I missing the point here and reference equality is fine?

Contributor Author

Thanks for calling this out!

Yeah, I didn't override the equals/hashCode on purpose since each TaskGroup object is supposed to be distinct. I will add a comment to that effect.

Also, the equality filter in this method is just a safe-side measure, since the taskGroup passed into this method will always be one of the activelyReadingTaskGroups and it will be compared against pendingCompletionTaskGroups. The two sets will always be distinct, except when the target taskGroup is moved from activelyReadingTaskGroups to pendingCompletionTaskGroups while this method is in progress (which should not happen either, since the notices are all handled by the single-threaded exec).

Let me know if that makes sense or if I should update the logic or add more comments.

Contributor

If you are intentionally using reference equality, it's best to use == to make the intent clear. This will also make it robust to someone possibly adding equals to TaskGroup in the future for some reason.

Contributor Author

I had to update the equality check since the method signature has also changed. I am now simply checking the set of taskIds in the TaskGroup against the target taskId.

@jtuglu1
Contributor

jtuglu1 commented Mar 5, 2026

In the example above, how are we avoiding the case where we don't actually want to allow B to continue b/c it's using invalid offsets? For example, what if A fails to publish?

@kfaraz
Contributor Author

kfaraz commented Mar 6, 2026

In the example above, how are we avoiding the case where we don't actually want to allow B to continue b/c it's using invalid offsets? For example, what if A fails to publish?

@jtuglu1 , that case is already handled in the existing code.

If A fails to publish, the code flow moves into the else block, and we check whether the checkpoints of B are consistent with the persisted offsets. Since they wouldn't be consistent, B will be killed off and a new task group C will be created.

As long as A is pending publish though, we have to assume that it is going to succeed and continue with B as usual.

* A failed publish action should be retried only if there is another task
* waiting to publish offsets for an overlapping set of partitions.
*/
private boolean shouldNotRetryFailure(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
Contributor

It would be clearer to call this shouldFailImmediately.

*/
private boolean shouldNotRetryFailure(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
{
if (result.isSuccess() || !result.isRetryable() || startMetadata != null) {
Contributor

Was this meant to be startMetadata == null?

Contributor Author

Ah, yes, thanks for catching this!

} else {
return -1;
}
// return count > 0 ? sum / count : -1;
Contributor

Please don't include commented-out code.

import java.util.Map;

/**
* Work in progress.
Contributor

I assume you intend to finish this before merging the PR.

Contributor Author

@kfaraz kfaraz Mar 6, 2026

Yes, still ironing out some kinks in the test.

@abhishekrb19
Contributor

@kfaraz, thank you for fixing these ingestion errors: Inconsistency between stored metadata state [KafkaDataSourceMetadata]. We’ve seen these recurring from time to time in our production clusters as well, causing transient lag bursts, etc. A few questions:

  1. Is this fix and #19034 only applicable to auto-scaling, or are they broadly applicable when tasks roll over with task replicas enabled? We don't have auto-scalers set up but still see these errors, so I'm wondering if these fixes would actually help us.

  2. We hardcoded MAX_RETRIES = 15 in TransactionalSegmentPublisher to account for slower publish times and occasional spurious task failures as a result of these metadata inconsistencies. It has been running in production for some time now. Some of the tables ingest a large volume of complex JSON objects, so we’d noticed slower handoff times and increasing this seemed to help to an extent.

Is it worth considering making this property configurable rather than hardcoding it to 13 in this patch? Alternatively, maybe it could be derived dynamically as a function of completionTimeout?

@kfaraz
Contributor Author

kfaraz commented Mar 8, 2026

[1]

are they broadly applicable when tasks roll over with task replicas enabled?

Yes, @abhishekrb19 , the "Inconsistency between stored metadata" error can sometimes occur when two replicas try to publish at around the same time. The first replica is able to publish segments successfully, while the other one would fail to publish. However, this is not a retryable failure and the slow replica just ends up marking the publish as success.

log.info(
"Could not publish [%d] segments, but they have already been published by another task.",
ourSegments.size()
);

I assume you would be seeing more errors of the format below:

"The new start metadata state[%s] is ahead of the last committed end state[%s]. Try resetting the supervisor."

These are retryable errors which typically occur when publish is slow.
Please let me know if that is not the case.

[2]

Is it worth considering making this property configurable rather than hardcoding it to 13 in this patch? Alternatively, maybe it could be derived dynamically as a function of completionTimeout?

I am not really a fan of the current retry mechanism, since it requires the task to blindly retry without knowing how long it should wait. Exponential backoff retries make more sense when the issue is transient (and likely self-resolving). Slow publish, on the other hand, is somewhat predictable/repeatable behaviour, and we should probably be able to incorporate that in the retry timeout.

I don't want to make the number of retries configurable just yet. Instead, tying it to completionTimeout probably makes more sense. On top of that, I think the task could submit an "async" task action which kind of works like a long poll. The supervisor could keep a queue of pending publish actions, and as soon as this one finishes, we send back a success response. The completionTimeout (or some function of it) would then simply be used as the request timeout for this async action. Thus, no retries needed at all.

I want to merge the current patch and verify if the isAnotherTaskGroupPublishing() method is effective in identifying if there are prior pending publish actions. If it works as expected, I think we can build upon it further to get the queue semantics on the supervisor.

@gianm , @jtuglu1 , what are your thoughts?

@kfaraz
Contributor Author

kfaraz commented Mar 8, 2026

@gianm , @jtuglu1 , @capistrant , I have updated this PR with fixes for a couple of bugs that I discovered:

  1. CostBasedAutoScaler does not scale tasks if processing rate is zero, irrespective of lag. When both lag and processing rate are low, it would make sense to scale down.
  2. CostBasedAutoScaler scales down spuriously even when all candidates task counts have same cost. In such cases, leaving the task count unchanged is desirable.
  3. SeekableStreamSupervisorIOConfig did not honor the correct priority order of taskCountStart > taskCount > taskCountMin.
    • This meant that even after the ioConfig was updated and the supervisor was persisted, the supervisor spec didn't actually get updated.
    • We typically didn't encounter this since the supervisor spec was being served from Overlord memory.
    • But the supervisor history didn't reflect the update since it is served from the DB.
    • An Overlord leadership change would cause the taskCount to revert back to 1.
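The intended priority order from item 3 can be sketched as follows. This mirrors the intent for SeekableStreamSupervisorIOConfig but is illustrative, not the actual implementation:

```java
// Illustrative resolution of the initial task count, following the
// priority order taskCountStart > taskCount > taskCountMin described above.
public class TaskCountPriority
{
  static int resolveInitialTaskCount(Integer taskCountStart, Integer taskCount, Integer taskCountMin)
  {
    if (taskCountStart != null) {
      return taskCountStart;
    } else if (taskCount != null) {
      return taskCount;
    } else if (taskCountMin != null) {
      return taskCountMin;
    } else {
      return 1; // default when nothing is configured
    }
  }

  public static void main(String[] args)
  {
    System.out.println(resolveInitialTaskCount(4, 2, 1));       // taskCountStart wins: 4
    System.out.println(resolveInitialTaskCount(null, 2, 1));    // falls back to taskCount: 2
    System.out.println(resolveInitialTaskCount(null, null, null)); // default: 1
  }
}
```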

@gianm
Contributor

gianm commented Mar 8, 2026

  • CostBasedAutoScaler scales down tasks if processing rate is zero, even if lag is high. (I guess this shouldn't happen in practice since if lag is high, tasks must be busy with some processing. Except when there is a bug and task threads are stuck somewhere.)

What'd you have it do in this situation? I suppose if processing rate is zero but lag is high, we shouldn't scale at all (either up or down). The combination of those two metrics suggests something is broken that scaling isn't going to fix.

  • SeekableStreamSupervisorIOConfig did not honor the correct priority order of taskCountStart > taskCount > taskCountMin

Huh, I thought we already changed that in #18745. Could you please also update the comment above the code block that currently says: // if autoscaler is enabled, then taskCount will be ignored.

Please also update docs/ingestion/supervisor.md. It says some things that are no longer accurate, such as saying in the docs for taskCountMin and taskCountStart that "When you enable the autoscaler, Druid ignores the value of taskCount".

I want to merge the current patch and verify if the isAnotherTaskGroupPublishing() method is effective in identifying if there are prior pending publish actions. If it works as expected, I think we can build upon it further to get the queue semantics on the supervisor.

@gianm , @jtuglu1 , what are your thoughts?

I'm ok with going ahead with this patch given that more work is coming in this area, and that hopefully the retryable flag is going to prevent us from retrying things that will never succeed. I also believe that it would be better to move away from retries completely. The supervisor should be able to do more to help tasks sequence their publishes. That would also fix the long-standing issue that can happen with short-lived tasks: if task A is publishing, and its replacement task B runs for a short period of time and then starts publishing, it can finish before A and fail with a mismatched offsets error.

@kfaraz
Contributor Author

kfaraz commented Mar 9, 2026

Huh, I thought we already changed that in #18745. Could you please also update the comment above the code block that currently says: // if autoscaler is enabled, then taskCount will be ignored.

Yeah, apparently, we missed a case in SeekableStreamSupervisorIOConfig.

I also believe that it would be better to move away from retries completely. The supervisor should be able to do more to help tasks sequence their publishes.

Yes, agreed.

I suppose if processing rate is zero but lag is high, we shouldn't scale at all (either up or down). The combination of those two metrics suggests something is broken that scaling isn't going to fix.

Yeah, that sounds reasonable. Let me add that check and maybe raise an alert for the same.
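A minimal sketch of that check, assuming a simple lag threshold; the method name and threshold are illustrative, not actual CostBasedAutoScaler code:

```java
// Skip scaling entirely (and possibly raise an alert) when the processing
// rate is zero but lag is high, since that combination suggests stuck tasks
// rather than a capacity problem that scaling could fix.
public class ScalingSanityCheck
{
  static boolean shouldSkipScaling(double processingRatePerSec, long totalLag, long lagAlertThreshold)
  {
    return processingRatePerSec == 0.0 && totalLag > lagAlertThreshold;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldSkipScaling(0.0, 1_000_000, 10_000)); // true: stuck, take no action
    System.out.println(shouldSkipScaling(0.0, 0, 10_000));         // false: idle, scale down is fine
  }
}
```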

CostBasedAutoScaler scales down tasks if processing rate is zero, even if lag is high. (I guess this shouldn't happen in practice since if lag is high, tasks must be busy with some processing. Except when there is a bug and task threads are stuck somewhere.)

What'd you have it do in this situation?

Oh, this was an old comment. I later rephrased it. The actual problem was 2-fold: (1) the auto-scaler had a bug where it would do spurious scale downs even when the cost was the same for all task counts, and (2) the auto-scaler did not take any action if the processing rate was zero. Instead, it should have skipped scaling when the processing rate was zero but lag was high.

I will update the check accordingly.
