-
-
Notifications
You must be signed in to change notification settings - Fork 0
fix: allow using in select #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| # Reason for This PR | ||
|
|
||
| `[Author TODO: add issue # or explain reasoning.]` | ||
|
|
||
| ## Description of Changes | ||
|
|
||
| `[Author TODO: add description of changes.]` | ||
|
|
||
| ## License Acceptance | ||
|
|
||
| By submitting this pull request, I confirm that my contribution is made under | ||
| the terms of the MIT license. | ||
|
|
||
| ## PR Checklist | ||
|
|
||
| `[Author TODO: Meet these criteria.]` | ||
| `[Reviewer TODO: Verify that these criteria are met. Request changes if not]` | ||
|
|
||
| - [ ] All commits in this PR are signed (`git commit -s`). | ||
| - [ ] The reason for this PR is clearly provided (issue no. or explanation). | ||
| - [ ] The description of changes is clear and encompassing. | ||
| - [ ] Any required documentation changes (code and docs) are included in this PR. | ||
| - [ ] Any user-facing changes are mentioned in `CHANGELOG.md`. | ||
| - [ ] All added/changed functionality is tested. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ binary heap (min-heap) algorithm used as a core for the priority queue | |
| package priorityqueue | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
| "sync/atomic" | ||
| ) | ||
|
|
@@ -25,20 +26,30 @@ type BinHeap[T Item] struct { | |
| exists map[string]struct{} | ||
| st *stack | ||
| // find a way to use a pointer to the raw data | ||
| len uint64 | ||
| maxLen uint64 | ||
| cond sync.Cond | ||
| len uint64 | ||
| maxLen uint64 | ||
| cond sync.Cond | ||
| minCh chan T | ||
| stopCf context.CancelFunc | ||
| stopCtx context.Context | ||
| } | ||
|
|
||
| func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { | ||
| return &BinHeap[T]{ | ||
| bh := &BinHeap[T]{ | ||
| items: make([]T, 0, 1000), | ||
| exists: make(map[string]struct{}, 1000), | ||
| st: newStack(), | ||
| len: 0, | ||
| maxLen: maxLen, | ||
| cond: sync.Cond{L: &sync.Mutex{}}, | ||
| minCh: make(chan T), | ||
| } | ||
|
|
||
| bh.stopCtx, bh.stopCf = context.WithCancel(context.Background()) | ||
|
|
||
| go bh.extractMin() | ||
|
|
||
| return bh | ||
| } | ||
|
|
||
| func (bh *BinHeap[T]) fixUp() { | ||
|
|
@@ -95,6 +106,10 @@ func (bh *BinHeap[T]) Exists(id string) bool { | |
| return false | ||
| } | ||
|
|
||
| func (bh *BinHeap[T]) Stop() { | ||
| bh.stopCf() | ||
| } | ||
|
Comment on lines
+109
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop must wake the extractor before channel consumers hang. If func (bh *BinHeap[T]) Stop() {
- bh.stopCf()
+ bh.stopCf()
+ bh.cond.L.Lock()
+ bh.cond.Broadcast()
+ bh.cond.L.Unlock()
}
…
- for bh.Len() == 0 {
- bh.cond.Wait()
- }
+ for bh.Len() == 0 {
+ if bh.stopCtx.Err() != nil {
+ bh.cond.L.Unlock()
+ close(bh.minCh)
+ return
+ }
+ bh.cond.Wait()
+ }Also applies to: 207-209 🤖 Prompt for AI Agents |
||
|
|
||
| // Remove removes all elements with the provided ID and returns the slice with them | ||
| func (bh *BinHeap[T]) Remove(groupID string) []T { | ||
| bh.cond.L.Lock() | ||
|
|
@@ -112,13 +127,13 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { | |
| } | ||
|
|
||
| ids := bh.st.Indices() | ||
| adjusment := 0 | ||
| adjustment := 0 | ||
| for i := range ids { | ||
| start := ids[i][0] - adjusment | ||
| end := ids[i][1] - adjusment | ||
| start := ids[i][0] - adjustment | ||
| end := ids[i][1] - adjustment | ||
|
|
||
| bh.items = append(bh.items[:start], bh.items[end+1:]...) | ||
| adjusment += end - start + 1 | ||
| adjustment += end - start + 1 | ||
| } | ||
|
|
||
| atomic.StoreUint64(&bh.len, uint64(len(bh.items))) | ||
|
|
@@ -177,26 +192,40 @@ func (bh *BinHeap[T]) Insert(item T) { | |
| bh.cond.Signal() | ||
| } | ||
|
|
||
| func (bh *BinHeap[T]) ExtractMin() T { | ||
| bh.cond.L.Lock() | ||
| // ExtractMinCh returns a channel to extract the minimum item | ||
| // We need this function to be able to use select statement and avoid blocking | ||
| // because ExtractMin is a blocking operation (on bh.cond.Wait()) | ||
| func (bh *BinHeap[T]) ExtractMinCh() <-chan T { | ||
| return bh.minCh | ||
| } | ||
|
rustatian marked this conversation as resolved.
|
||
|
|
||
| // if len == 0, wait for the signal | ||
| for bh.Len() == 0 { | ||
| bh.cond.Wait() | ||
| } | ||
| func (bh *BinHeap[T]) extractMin() { | ||
| for { | ||
| bh.cond.L.Lock() | ||
|
|
||
| bh.swap(0, bh.len-1) | ||
| // if len == 0, wait for the signal | ||
| for bh.Len() == 0 { | ||
| bh.cond.Wait() | ||
| } | ||
|
|
||
| item := (bh.items)[int(bh.len)-1] //nolint:gosec | ||
| bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec | ||
| bh.fixDown(0, int(bh.len-2)) //nolint:gosec | ||
| bh.swap(0, bh.len-1) | ||
|
|
||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
| item := (bh.items)[int(bh.len)-1] //nolint:gosec | ||
| bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec | ||
| bh.fixDown(0, int(bh.len-2)) //nolint:gosec | ||
|
|
||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
|
|
||
| bh.cond.L.Unlock() | ||
| return item | ||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| bh.cond.L.Unlock() | ||
| // send item to the channel | ||
| select { | ||
| case bh.minCh <- item: | ||
| case <-bh.stopCtx.Done(): | ||
| close(bh.minCh) | ||
| return | ||
| } | ||
|
Comment on lines
+211
to
+229
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don’t remove heap entries until a receiver is ready. This loop swaps, truncates, decrements |
||
| } | ||
|
Comment on lines
+202
to
+230
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop draining the heap without a waiting receiver.
🤖 Prompt for AI Agents |
||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -76,8 +76,9 @@ func TestBinHeap_Init(t *testing.T) { | |||||||||||||||||||
|
|
||||||||||||||||||||
| res := make([]int64, 0, 12) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| itemCh := bh.ExtractMinCh() | ||||||||||||||||||||
| for range 11 { | ||||||||||||||||||||
| item := bh.ExtractMin() | ||||||||||||||||||||
| item := <-itemCh | ||||||||||||||||||||
| item.Priority() | ||||||||||||||||||||
| res = append(res, item.Priority()) | ||||||||||||||||||||
|
Comment on lines
+79
to
83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Background extractor is removing items prematurely. After the switch to |
||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -105,8 +106,9 @@ func TestBinHeap_MaxLen(t *testing.T) { | |||||||||||||||||||
| go func() { | ||||||||||||||||||||
| res := make([]Item, 0, 12) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| itemCh := bh.ExtractMinCh() | ||||||||||||||||||||
| for range 11 { | ||||||||||||||||||||
| item := bh.ExtractMin() | ||||||||||||||||||||
| item := <-itemCh | ||||||||||||||||||||
| res = append(res, item) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| require.Equal(t, 11, len(res)) | ||||||||||||||||||||
|
|
@@ -156,13 +158,17 @@ func TestNewPriorityQueue(t *testing.T) { | |||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| go func() { | ||||||||||||||||||||
| itemCh := pq.ExtractMinCh() | ||||||||||||||||||||
| for { | ||||||||||||||||||||
| select { | ||||||||||||||||||||
| case <-stopCh: | ||||||||||||||||||||
| return | ||||||||||||||||||||
| default: | ||||||||||||||||||||
| pq.ExtractMin() | ||||||||||||||||||||
| case item, cl := <-itemCh: | ||||||||||||||||||||
| if !cl { | ||||||||||||||||||||
| return | ||||||||||||||||||||
| } | ||||||||||||||||||||
| atomic.AddUint64(&getPerSec, 1) | ||||||||||||||||||||
| require.NotNil(t, item) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
@@ -180,10 +186,18 @@ func TestNewPriorityQueue(t *testing.T) { | |||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| time.Sleep(time.Second * 5) | ||||||||||||||||||||
| stopCh <- struct{}{} | ||||||||||||||||||||
| stopCh <- struct{}{} | ||||||||||||||||||||
| stopCh <- struct{}{} | ||||||||||||||||||||
| stopCh <- struct{}{} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| pq.Stop() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for range 4 { | ||||||||||||||||||||
| select { | ||||||||||||||||||||
| case stopCh <- struct{}{}: | ||||||||||||||||||||
| default: | ||||||||||||||||||||
| // it might happed that we already exited from one of the goroutines | ||||||||||||||||||||
| // by the signal from the PQ.Stop() method | ||||||||||||||||||||
| continue | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func TestNewItemWithTimeout(t *testing.T) { | ||||||||||||||||||||
|
|
@@ -212,7 +226,8 @@ func TestNewItemWithTimeout(t *testing.T) { | |||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| tn := time.Now() | ||||||||||||||||||||
| item := bh.ExtractMin() | ||||||||||||||||||||
| itemCh := bh.ExtractMinCh() | ||||||||||||||||||||
| item := <-itemCh | ||||||||||||||||||||
| assert.Equal(t, int64(5), item.Priority()) | ||||||||||||||||||||
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) | ||||||||||||||||||||
|
Comment on lines
+229
to
232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix inverted timing assertion. The assertion logic is inverted. Line 221 checks that Apply this diff: tn := time.Now()
itemCh := bh.ExtractMinCh()
item := <-itemCh
assert.Equal(t, int64(5), item.Priority())
- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+ assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -233,7 +248,7 @@ func TestItemPeek(t *testing.T) { | |||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /* | ||||||||||||||||||||
| first item should be extracted not less than 5 seconds after we call ExtractMin | ||||||||||||||||||||
| the first item should be extracted not less than 5 seconds after we call ExtractMin | ||||||||||||||||||||
| 5 seconds is a minimum timeout for our items | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| bh := NewBinHeap[Item](100) | ||||||||||||||||||||
|
|
@@ -246,7 +261,7 @@ func TestItemPeek(t *testing.T) { | |||||||||||||||||||
| assert.Equal(t, int64(5), tmp) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| tn := time.Now() | ||||||||||||||||||||
| item := bh.ExtractMin() | ||||||||||||||||||||
| item := <-bh.ExtractMinCh() | ||||||||||||||||||||
| assert.Equal(t, int64(5), item.Priority()) | ||||||||||||||||||||
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) | ||||||||||||||||||||
|
Comment on lines
+264
to
266
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix inverted timing assertion. The assertion has the same inversion issue as Apply this diff: tn := time.Now()
item := <-bh.ExtractMinCh()
assert.Equal(t, int64(5), item.Priority())
- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+ assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -289,7 +304,7 @@ func TestItemPeekConcurrent(t *testing.T) { | |||||||||||||||||||
| go func() { | ||||||||||||||||||||
| defer wg.Done() | ||||||||||||||||||||
| for range 11 { | ||||||||||||||||||||
| min := bh.ExtractMin() | ||||||||||||||||||||
| min := <-bh.ExtractMinCh() | ||||||||||||||||||||
| _ = min | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
@@ -340,7 +355,7 @@ func TestBinHeap_Remove(t *testing.T) { | |||||||||||||||||||
| res := make([]Item, 0, 12) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for range 5 { | ||||||||||||||||||||
| item := bh.ExtractMin() | ||||||||||||||||||||
| item := <-bh.ExtractMinCh() | ||||||||||||||||||||
| res = append(res, item) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goroutine started in the constructor has no mechanism to be stopped, which could lead to goroutine leaks. Consider adding a context or stop channel to allow proper cleanup of this background goroutine.