-
Notifications
You must be signed in to change notification settings - Fork 862
Refactor changelog to generic WAL #2671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2671 +/- ##
==========================================
+ Coverage 43.68% 43.76% +0.08%
==========================================
Files 1913 1913
Lines 159450 159492 +42
==========================================
+ Hits 69650 69798 +148
+ Misses 83393 83277 -116
- Partials 6407 6417 +10
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
masih
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this Yiming! Before a full review can I ask you to fix all the ignored errors across the PR please?
I also recommend avoiding the unnecessary package nesting, or the word "generic" in the package name. A typical package structure in Go is typically flatter and less hierarchical than languages like Java. I recommend avoiding common package naming like types because there are already many such packages across the codebase and working with the package name conflicts would require alias, which can lead to inconsistent alias naming and ultimately higher cognitive cycles when reading the code.
Great suggestion! Will look into the naming here |
| t.UpdateCommitInfo() | ||
| replayElapsed := time.Since(startTime).Seconds() | ||
| t.logger.Info(fmt.Sprintf("Total replayed %d entries in %.1fs (%.1f entries/sec).\n", | ||
| replayCount, replayElapsed, float64(replayCount)/replayElapsed)) |
Check notice
Code scanning / CodeQL
Floating point arithmetic Note
* main: Fix integration tests to run on release branch and clean up rules (#2696)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main blockers:
- Potential resource leakage.
- Concurrent use safety of WAL
Other than those I left a bunch of feedback across the code
Thanks for working on this @yzang2019 ! 🍻 This is nearly there
sei-db/wal/wal_test.go
Outdated
|
|
||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| os.WriteFile(filepath.Join(dir, "00000000000000000001"), tc.logs, 0o600) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert no error?
sei-db/wal/wal_test.go
Outdated
|
|
||
| func TestTruncateAfter(t *testing.T) { | ||
| changelog := prepareTestData(t) | ||
| defer changelog.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert no error, i.e. graceful closure? Ditto for the rest of the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed all
| } | ||
|
|
||
| func TestLogPath(t *testing.T) { | ||
| path := LogPath("/some/dir") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "util" LogPath is only used once. It adds an additional 3 lines to the codebase and makes the reader jump into another file to follow what the code does. This is another example of over-refactoring that hardly adds any benefit at all I'm afraid.
| go func() { | ||
| defer walLog.wg.Done() | ||
| for entry := range ch { | ||
| bz, err := walLog.marshal(entry) | ||
| if err != nil { | ||
| walLog.recordAsyncWriteErr(err) | ||
| return | ||
| } | ||
| nextOffset, err := walLog.NextOffset() | ||
| if err != nil { | ||
| walLog.recordAsyncWriteErr(err) | ||
| return | ||
| } | ||
| err = walLog.log.Write(nextOffset, bz) | ||
| if err != nil { | ||
| walLog.recordAsyncWriteErr(err) | ||
| return | ||
| } | ||
|
|
||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer walLog.wg.Done() | ||
| ticker := time.NewTicker(pruneInterval) | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-walLog.closeCh: | ||
| return | ||
| case <-ticker.C: | ||
| lastIndex, err := walLog.log.LastIndex() | ||
| if err != nil { | ||
| walLog.logger.Error("failed to get last index for pruning", "err", err) | ||
| continue | ||
| } | ||
| firstIndex, err := walLog.log.FirstIndex() | ||
| if err != nil { | ||
| walLog.logger.Error("failed to get first index for pruning", "err", err) | ||
| continue | ||
| } | ||
| if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex { | ||
| prunePos := lastIndex - keepRecent | ||
| if err := walLog.TruncateBefore(prunePos); err != nil { | ||
| walLog.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
|
LGTM. not a blocker, but some thoughts on adding test coverage: Concurrency testing:
Isolation testing:
|
Make sense for concurrency point, for isolation I dont think we need to test that much, since different WAL in different folder should be fully isolated, so I dont think there's any concern around that |
* main: fix: lthash worker loop break; remove unreachable digest.Read fallback (#2698)
masih
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No obvious blockers that I can see.
I see you have chosen to ignore the note on the resource leak. Please remember to continuously check for usage of those constructors, because not explicitly cleaning up things would leave the correctness to chance - the correctness can rot by accident.
| t.Fatalf("Open: %v", err) | ||
| } | ||
| defer func() { _ = db.Close() }() | ||
| t.Cleanup(func() { require.NoError(t, db.Close()) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A small note on t.Cleanup vs defer: t.Cleanup is executed after the test belonging to t ends. Whereas, defer is executed when the function returns.
We do not always want one or the other. For example, in a test helper function we usually want t.Cleanup if it creates resources for the tests that are done after function returns.
In this specific case and the remaining changes here it makes no difference in using one or the other. But I thought I point out the difference so that we are careful not to always use either of those.
| streamHandler, err := changelog.NewStream(logger, utils.GetChangelogPath(opts.Dir), changelog.Config{ | ||
| // MemIAVL owns changelog lifecycle: always open the WAL here. | ||
| // Even in read-only mode we may need WAL replay to reconstruct non-snapshot versions. | ||
| streamHandler, err := wal.NewChangelogWAL(logger, utils.GetChangelogPath(opts.Dir), wal.Config{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far probably for the current implementation?
There is one way to make sure resource leaks cannot happen no matter how the constructor is used: On partial instantiation, the constructor should clean up after itself.
| tree.WaitToCompleteAsyncWrite() | ||
| } | ||
|
|
||
| if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, but what async process?
Looking at the code the entire operation is executed sequentially. We just happen to pass "process" as a function. Right?
Oh actually I didn't ignore that, but the new change doesn't throw any error i WAL open function because we are not checking lastOffset any more, so it won't have that issue. Haven't really checked other places such as MemIAVL open function though |
Describe your changes and provide context
Purpose of this PR:
Testing performed to validate your change
Added and modified unit test