KAFKA-20396: Fix RocksDBStore initialization error handling#21946
Conversation
… during initialization
| } catch (final RocksDBException e) { | ||
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| final String fatalMessage = "Error opening store " + name; | ||
| throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage)); |
There was a problem hiding this comment.
Should misc errors wipe the stores too? 🤔
There was a problem hiding this comment.
I believe yes,
because RocksDB will throw a RocksDBException and stop the application from starting if the RocksDB directory is corrupted for any reason.
There was a problem hiding this comment.
I get your point here, but I'm wondering if we should check if EOS is enabled to throw the TaskCorruptedException. OK the ProcessorStateException is fatal and kills the app but what if it's transient error and the app is using AT_LEAST_ONCE and this was the pre-existing behavior. - but you're both using this in practice so let me know what you think
There was a problem hiding this comment.
The problem I see here is that if we don't throw a TaskCorruptedException the application will fail when bootstrapping, and if the error is not temporary, the application will never start. This is because we are now initializing the stores directly when the application instance is bootstrapping.
There was a problem hiding this comment.
True, but with RocksDBException is not from our code so we don't know what the error could be, I think that we should shutdown at that point.
There was a problem hiding this comment.
IIRC TaskCorruptedException will not cause stores to be wiped under ALOS; it just closes the task (dirty) and attempts to reassign/reopen later. Obviously, if this is not a transient corruption, it will not be able to continue, but I believe this is the case currently anyway.
There was a problem hiding this comment.
Obviously, if this is not a transient corruption, it will not be able to continue, but I believe this is the case currently anyway.
I agree. This is currently the case.
I made some changes so that we only throw a TaskCorruptedException if we can't find a valid state during initialization. If any other exception is caught, it just shuts down, stopping the instance from continuing.
|
|
||
| @SuppressWarnings("unchecked") | ||
| void openDB(final Map<String, Object> configs, final File stateDir) { | ||
| void openDB(final Map<String, Object> configs, final File stateDir, final TaskId taskId) { |
There was a problem hiding this comment.
This should be fine, because openDB is not part of any public API.
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @eduwercamacaro! I've made a pass.
Regarding the chnages adding a TaskId parameter I'm thinking may we could consider storing TaskId as an instance field on RocksDBStore (set before openDB is called) rather than adding it as a parameter to openDB. This would avoid the signature change cascade through 5 segment subclass overrides, 7 callers, and multiple tests.
Since each RocksDBStore instance is unique per task — the ProcessorStateManager creates fresh store instances for each task, and segments are created per-task as well. So an instance field is safe with no risk of cross-task sharing or overwrites.
I think it could be set in init() before calling openDB(), and in each openSegmentDB() before calling segment.openDB() — both of which already have access to context.taskId(). Let me know what you think
|
|
||
| /** | ||
| * Initializes the ColumnFamily. | ||
| * @return the position of the store based on the data in the ColumnFamily. If no offset position is found, an empty position is returned. |
There was a problem hiding this comment.
We should change this to be @throws ProcessorStateException
| position.merge(existingPositionOrEmpty); | ||
| } | ||
| } catch (final StreamsException fatal) { | ||
| } catch (final ProcessorStateException fatal) { |
There was a problem hiding this comment.
Since we are changing what's caught here we should change the throws declaration below on line 1006
| } catch (final ProcessorStateException fatal) { | ||
| final String fatalMessage = "State store " + name + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores"; | ||
| throw new ProcessorStateException(fatalMessage, fatal); | ||
| throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage, fatal)); |
There was a problem hiding this comment.
Do we want to check here for EOS enabled before throwing a TaskCorruptedException ? Or do we only reach this part when EOS is enabled
There was a problem hiding this comment.
Exactly.
We don't need to check again for EOS enabled because we only reach this part when EOS is enabled.
Actually, no, that's not true.
openRocksDB on line 251 could also throw a ProcessorStateException if there is an error when opening the store.
| } catch (final RocksDBException e) { | ||
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| final String fatalMessage = "Error opening store " + name; | ||
| throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage)); |
There was a problem hiding this comment.
I get your point here, but I'm wondering if we should check if EOS is enabled to throw the TaskCorruptedException. OK the ProcessorStateException is fatal and kills the app but what if it's transient error and the app is using AT_LEAST_ONCE and this was the pre-existing behavior. - but you're both using this in practice so let me know what you think
|
I've been able to confirm that with this PR all tests pass in the ColumnFamilyOffsetRecoveryIntegrationTest otherwise all of the EOS variants fail. I'll push a PR for the integration after this one is merged. |
@bbejeck Yes, that works too, and as you said, it is safe to put it as an instance variable because a RocksDBStore is unique per task. protected void openSegmentDB(final KeyValueSegment segment, final StateStoreContext context) {
segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}This approach requires looking for all of the openDB calls and making sure we always set the taskId before invoking this method. IMO, That opens the door for unexpected NullPointerExceptions. |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @eduwercamacaro for the follow-up! I've made another pass
| } catch (final RocksDBException e) { | ||
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| final String fatalMessage = "Error opening store " + name; | ||
| throw new TaskCorruptedException(Set.of(taskId), new ProcessorStateException(fatalMessage)); |
There was a problem hiding this comment.
True, but with RocksDBException is not from our code so we don't know what the error could be, I think that we should shutdown at that point.
| position.merge(existingPositionOrEmpty); | ||
| } | ||
| } catch (final StreamsException fatal) { | ||
| } catch (final ProcessorStateException fatal) { |
There was a problem hiding this comment.
Since we are treating this as non-fatal by throwing the TaskCorruptedException maybe we should change the name here. But adding back the StreamsException block that one can be named fatal
| // For segmented stores, the overall position is composed of multiple underlying stores, so merge this store's position into it. | ||
| position.merge(existingPositionOrEmpty); | ||
| } | ||
| } catch (final StreamsException fatal) { |
There was a problem hiding this comment.
Thinking some more about this original StreamsException which was fatal I'm thinking we should add that back after catching the ProcessorStateException and keep the fatal behavior - much for similar reasons.
There was a problem hiding this comment.
@eduwercamacaro -- I agree with Bill on this. We should not remove this catch block, but only insert the new catch (final ProcessorStateException) before this existing catch-block
There was a problem hiding this comment.
Thanks for the feedback, I added back the fatal StreamsException block
@eduwercamacaro I believe Bill's suggestion is to set That eliminates the need to change the |
@nicktelford Yep. I get that. However, openDB is being called not only from the |
| position.merge(existingPositionOrEmpty); | ||
| } | ||
| } catch (final StreamsException fatal) { | ||
| } catch (final ProcessorStateException fatal) { |
There was a problem hiding this comment.
ProcessorStateException is not fatal -- we re-throw it as TaskCorruptedException and recover from it.
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @eduwercamacaro! LGTM.
Will merge after the build completes
|
Kicked off another system test with latests commits |
|
Merged #21946 into trunk |
Throw a TaskCorruptedException when the initialization phase encounters an invalid state and EOS is enabled. ProcesorStateManager already handles this exception by wiping the state store out. Also, added a new refactor to the testing class for segment-based stores, which makes it easier to test for corruption in these stores. Reviewers: Nick Telfford <nick.telford@gmail.com>, Bill Bejeck <bbejeck@apache.org>, Matthias Sax <mjsax@apache.org>
|
cherry-picked to 4.3 |
…1946) Throw a TaskCorruptedException when the initialization phase encounters an invalid state and EOS is enabled. ProcesorStateManager already handles this exception by wiping the state store out. Also, added a new refactor to the testing class for segment-based stores, which makes it easier to test for corruption in these stores. Reviewers: Nick Telfford <nick.telford@gmail.com>, Bill Bejeck <bbejeck@apache.org>, Matthias Sax <mjsax@apache.org>


Throw a TaskCorruptedException when the initialization phase encounters
an invalid state and EOS is enabled. ProcesorStateManager already
handles this exception by wiping the state store out.
Also, added a new refactor to the testing class for segment-based
stores, which makes it easier to test for corruption in these stores.
Reviewers: Nick Telfford nick.telford@gmail.com, Bill Bejeck
bbejeck@apache.org, Matthias Sax mjsax@apache.org