Skip to content

KAFKA-20396: Fix RocksDBStore initialization error handling#21946

Merged
bbejeck merged 5 commits intoapache:trunkfrom
eduwercamacaro:throw-task-corrupted-exception-on-invalid-rocksdb-state
Apr 7, 2026
Merged

KAFKA-20396: Fix RocksDBStore initialization error handling#21946
bbejeck merged 5 commits intoapache:trunkfrom
eduwercamacaro:throw-task-corrupted-exception-on-invalid-rocksdb-state

Conversation

@eduwercamacaro
Copy link
Copy Markdown
Contributor

@eduwercamacaro eduwercamacaro commented Apr 2, 2026

Throw a TaskCorruptedException when the initialization phase encounters
an invalid state and EOS is enabled. ProcesorStateManager already
handles this exception by wiping the state store out.

Also, added a new refactor to the testing class for segment-based
stores, which makes it easier to test for corruption in these stores.

Reviewers: Nick Telfford nick.telford@gmail.com, Bill Bejeck
bbejeck@apache.org, Matthias Sax mjsax@apache.org

@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Apr 2, 2026
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
final String fatalMessage = "Error opening store " + name;
throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should misc errors wipe the stores too? 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe yes,
because RocksDB will throw a RocksDBException and stop the application from starting if the RocksDB directory is corrupted for any reason.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get your point here, but I'm wondering if we should check if EOS is enabled to throw the TaskCorruptedException. OK the ProcessorStateException is fatal and kills the app but what if it's transient error and the app is using AT_LEAST_ONCE and this was the pre-existing behavior. - but you're both using this in practice so let me know what you think

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I see here is that if we don't throw a TaskCorruptedException the application will fail when bootstrapping, and if the error is not temporary, the application will never start. This is because we are now initializing the stores directly when the application instance is bootstrapping.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but with RocksDBException is not from our code so we don't know what the error could be, I think that we should shutdown at that point.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC TaskCorruptedException will not cause stores to be wiped under ALOS; it just closes the task (dirty) and attempts to reassign/reopen later. Obviously, if this is not a transient corruption, it will not be able to continue, but I believe this is the case currently anyway.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously, if this is not a transient corruption, it will not be able to continue, but I believe this is the case currently anyway.

I agree. This is currently the case.

I made some changes so that we only throw a TaskCorruptedException if we can't find a valid state during initialization. If any other exception is caught, it just shuts down, stopping the instance from continuing.


@SuppressWarnings("unchecked")
void openDB(final Map<String, Object> configs, final File stateDir) {
void openDB(final Map<String, Object> configs, final File stateDir, final TaskId taskId) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine, because openDB is not part of any public API.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @eduwercamacaro! I've made a pass.

Regarding the chnages adding a TaskId parameter I'm thinking may we could consider storing TaskId as an instance field on RocksDBStore (set before openDB is called) rather than adding it as a parameter to openDB. This would avoid the signature change cascade through 5 segment subclass overrides, 7 callers, and multiple tests.

Since each RocksDBStore instance is unique per task — the ProcessorStateManager creates fresh store instances for each task, and segments are created per-task as well. So an instance field is safe with no risk of cross-task sharing or overwrites.

I think it could be set in init() before calling openDB(), and in each openSegmentDB() before calling segment.openDB() — both of which already have access to context.taskId(). Let me know what you think


/**
* Initializes the ColumnFamily.
* @return the position of the store based on the data in the ColumnFamily. If no offset position is found, an empty position is returned.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this to be @throws ProcessorStateException

position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
} catch (final ProcessorStateException fatal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are changing what's caught here we should change the throws declaration below on line 1006

} catch (final ProcessorStateException fatal) {
final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores";
throw new ProcessorStateException(fatalMessage, fatal);
throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage, fatal));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to check here for EOS enabled before throwing a TaskCorruptedException ? Or do we only reach this part when EOS is enabled

Copy link
Copy Markdown
Contributor Author

@eduwercamacaro eduwercamacaro Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly.
We don't need to check again for EOS enabled because we only reach this part when EOS is enabled.

Actually, no, that's not true.

openRocksDB on line 251 could also throw a ProcessorStateException if there is an error when opening the store.

} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
final String fatalMessage = "Error opening store " + name;
throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get your point here, but I'm wondering if we should check if EOS is enabled to throw the TaskCorruptedException. OK the ProcessorStateException is fatal and kills the app but what if it's transient error and the app is using AT_LEAST_ONCE and this was the pre-existing behavior. - but you're both using this in practice so let me know what you think

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Apr 2, 2026

I've been able to confirm that with this PR all tests pass in the ColumnFamilyOffsetRecoveryIntegrationTest otherwise all of the EOS variants fail.

I'll push a PR for the integration after this one is merged.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Apr 2, 2026

System tests passes

system_tests_passing_21946

@github-actions github-actions bot removed the triage PRs from the community label Apr 3, 2026
@eduwercamacaro
Copy link
Copy Markdown
Contributor Author

Thanks for the PR @eduwercamacaro! I've made a pass.

Regarding the chnages adding a TaskId parameter I'm thinking may we could consider storing TaskId as an instance field on RocksDBStore (set before openDB is called) rather than adding it as a parameter to openDB. This would avoid the signature change cascade through 5 segment subclass overrides, 7 callers, and multiple tests.

Since each RocksDBStore instance is unique per task — the ProcessorStateManager creates fresh store instances for each task, and segments are created per-task as well. So an instance field is safe with no risk of cross-task sharing or overwrites.

I think it could be set in init() before calling openDB(), and in each openSegmentDB() before calling segment.openDB() — both of which already have access to context.taskId(). Let me know what you think

@bbejeck Yes, that works too, and as you said, it is safe to put it as an instance variable because a RocksDBStore is unique per task.
However, I did it in this way because passing the taskId as an argument for the openDB method is less error-prone, for we don't rely on the caller of this method to set the taskId. If I understand your proposal correctly, openSegmentDB() would look like this:

    protected void openSegmentDB(final KeyValueSegment segment, final StateStoreContext context) {
        segment.setTaskId(context.taskId());
        segment.openDB(context.appConfigs(), context.stateDir());
    }

This approach requires looking for all of the openDB calls and making sure we always set the taskId before invoking this method. IMO, That opens the door for unexpected NullPointerExceptions.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @eduwercamacaro for the follow-up! I've made another pass

} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name, e);
final String fatalMessage = "Error opening store " + name;
throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but with RocksDBException is not from our code so we don't know what the error could be, I think that we should shutdown at that point.

position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
} catch (final ProcessorStateException fatal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are treating this as non-fatal by throwing the TaskCorruptedException maybe we should change the name here. But adding back the StreamsException block that one can be named fatal

// For segmented stores, the overall position is composed of multiple underlying stores, so merge this store's position into it.
position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking some more about this original StreamsException which was fatal I'm thinking we should add that back after catching the ProcessorStateException and keep the fatal behavior - much for similar reasons.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eduwercamacaro -- I agree with Bill on this. We should not remove this catch block, but only insert the new catch (final ProcessorStateException) before this existing catch-block

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback, I added back the fatal StreamsException block

@nicktelford
Copy link
Copy Markdown
Contributor

This approach requires looking for all of the openDB calls and making sure we always set the taskId before invoking this method. IMO, That opens the door for unexpected NullPointerExceptions.

@eduwercamacaro I believe Bill's suggestion is to set this.taskId = stateStoreContext.taskId() in RocksDBStore#init, before it calls openDB. This ensures that all overriding implementations will receive the TaskId automatically, because the only override of init always calls super.init.

That eliminates the need to change the openDB signature and update all the overrides of it, simplifying your changes.

@eduwercamacaro
Copy link
Copy Markdown
Contributor Author

This approach requires looking for all of the openDB calls and making sure we always set the taskId before invoking this method. IMO, That opens the door for unexpected NullPointerExceptions.

@eduwercamacaro I believe Bill's suggestion is to set this.taskId = stateStoreContext.taskId() in RocksDBStore#init, before it calls openDB. This ensures that all overriding implementations will receive the TaskId automatically, because the only override of init always calls super.init.

That eliminates the need to change the openDB signature and update all the overrides of it, simplifying your changes.

@nicktelford Yep. I get that.

However, openDB is being called not only from the init method but also from the openSegment. I'm not a fan of that, but we can do it in that way then.

position.merge(existingPositionOrEmpty);
}
} catch (final StreamsException fatal) {
} catch (final ProcessorStateException fatal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessorStateException is not fatal -- we re-throw it as TaskCorruptedException and recover from it.

@github-actions github-actions bot removed the small Small PRs label Apr 7, 2026
Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @eduwercamacaro! LGTM.
Will merge after the build completes

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Apr 7, 2026

Kicked off another system test with latests commits

System tests passed
eduwer_system_test_run_all_commits

@bbejeck bbejeck merged commit 794d2b6 into apache:trunk Apr 7, 2026
25 checks passed
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Apr 7, 2026

Merged #21946 into trunk

bbejeck pushed a commit that referenced this pull request Apr 7, 2026
Throw a TaskCorruptedException when the initialization phase encounters
an invalid state and EOS is enabled. ProcesorStateManager already
handles this exception by wiping the state store out.

Also, added a new refactor to the testing class for segment-based
stores, which makes it easier to test for corruption in these stores.

Reviewers: Nick Telfford <nick.telford@gmail.com>,  Bill Bejeck
 <bbejeck@apache.org>, Matthias Sax <mjsax@apache.org>
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Apr 7, 2026

cherry-picked to 4.3

nileshkumar3 pushed a commit to nileshkumar3/kafka that referenced this pull request Apr 15, 2026
…1946)

Throw a TaskCorruptedException when the initialization phase encounters
an invalid state and EOS is enabled. ProcesorStateManager already
handles this exception by wiping the state store out.

Also, added a new refactor to the testing class for segment-based
stores, which makes it easier to test for corruption in these stores.

Reviewers: Nick Telfford <nick.telford@gmail.com>,  Bill Bejeck
 <bbejeck@apache.org>, Matthias Sax <mjsax@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants