Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/ISSUE_TEMPLATE/.github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Reason for This PR

`[Author TODO: add issue # or explain reasoning.]`

## Description of Changes

`[Author TODO: add description of changes.]`

## License Acceptance

By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.

## PR Checklist

`[Author TODO: Meet these criteria.]`
`[Reviewer TODO: Verify that these criteria are met. Request changes if not]`

- [ ] All commits in this PR are signed (`git commit -s`).
- [ ] The reason for this PR is clearly provided (issue no. or explanation).
- [ ] The description of changes is clear and encompassing.
- [ ] Any required documentation changes (code and docs) are included in this PR.
- [ ] Any user-facing changes are mentioned in `CHANGELOG.md`.
- [ ] All added/changed functionality is tested.
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
fail-fast: false
matrix:
go: [stable]
os: ["ubuntu-latest", "macos-latest"]
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v6 # action page: <https://github.com/actions/setup-go>
Expand Down
77 changes: 53 additions & 24 deletions binary_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ binary heap (min-heap) algorithm used as a core for the priority queue
package priorityqueue

import (
"context"
"sync"
"sync/atomic"
)
Expand All @@ -25,20 +26,30 @@ type BinHeap[T Item] struct {
exists map[string]struct{}
st *stack
// find a way to use a pointer to the raw data
len uint64
maxLen uint64
cond sync.Cond
len uint64
maxLen uint64
cond sync.Cond
minCh chan T
stopCf context.CancelFunc
stopCtx context.Context
}

func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] {
return &BinHeap[T]{
bh := &BinHeap[T]{
items: make([]T, 0, 1000),
exists: make(map[string]struct{}, 1000),
st: newStack(),
len: 0,
maxLen: maxLen,
cond: sync.Cond{L: &sync.Mutex{}},
minCh: make(chan T),
}

bh.stopCtx, bh.stopCf = context.WithCancel(context.Background())

go bh.extractMin()
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goroutine started in the constructor has no mechanism to be stopped, which could lead to goroutine leaks. Consider adding a context or stop channel to allow proper cleanup of this background goroutine.

Copilot uses AI. Check for mistakes.

return bh
}

func (bh *BinHeap[T]) fixUp() {
Expand Down Expand Up @@ -95,6 +106,10 @@ func (bh *BinHeap[T]) Exists(id string) bool {
return false
}

func (bh *BinHeap[T]) Stop() {
bh.stopCf()
}
Comment on lines +109 to +111
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Stop must wake the extractor before channel consumers hang.

If Stop() is called while the heap is empty, extractMin() stays blocked on cond.Wait() because the loop never checks stopCtx and Stop() never signals the condition. As a result the goroutine leaks, minCh is never closed, and any consumer waiting in a select will block forever. Please broadcast the condition inside Stop() and short-circuit the wait loop when the context is canceled.

 func (bh *BinHeap[T]) Stop() {
-	bh.stopCf()
+	bh.stopCf()
+	bh.cond.L.Lock()
+	bh.cond.Broadcast()
+	bh.cond.L.Unlock()
 }
 …
-		for bh.Len() == 0 {
-			bh.cond.Wait()
-		}
+		for bh.Len() == 0 {
+			if bh.stopCtx.Err() != nil {
+				bh.cond.L.Unlock()
+				close(bh.minCh)
+				return
+			}
+			bh.cond.Wait()
+		}

Also applies to: 207-209

🤖 Prompt for AI Agents
In binary_heap.go around lines 109-111 (and similarly 207-209), Stop() currently
only cancels the context but does not wake any goroutine waiting on the
condition, so extractMin() can block on cond.Wait() forever; modify Stop() to
call bh.stopCf() and then bh.cond.Broadcast() (or Signal() if only one waiter)
so any waiting extractor wakes, and update extractMin()'s wait loop to
short-circuit when the stop context is canceled by checking bh.stopCtx.Done() or
bh.stopCtx.Err() after waking (break/return to allow minCh to be closed) instead
of waiting indefinitely; apply the same broadcast + stop-check fix to the other
Stop() occurrence at lines 207-209.


// Remove removes all elements with the provided ID and returns the slice with them
func (bh *BinHeap[T]) Remove(groupID string) []T {
bh.cond.L.Lock()
Expand All @@ -112,13 +127,13 @@ func (bh *BinHeap[T]) Remove(groupID string) []T {
}

ids := bh.st.Indices()
adjusment := 0
adjustment := 0
for i := range ids {
start := ids[i][0] - adjusment
end := ids[i][1] - adjusment
start := ids[i][0] - adjustment
end := ids[i][1] - adjustment

bh.items = append(bh.items[:start], bh.items[end+1:]...)
adjusment += end - start + 1
adjustment += end - start + 1
}

atomic.StoreUint64(&bh.len, uint64(len(bh.items)))
Expand Down Expand Up @@ -177,26 +192,40 @@ func (bh *BinHeap[T]) Insert(item T) {
bh.cond.Signal()
}

func (bh *BinHeap[T]) ExtractMin() T {
bh.cond.L.Lock()
// ExtractMinCh returns a channel to extract the minimum item
// We need this function to be able to use select statement and avoid blocking
// because ExtractMin is a blocking operation (on bh.cond.Wait())
func (bh *BinHeap[T]) ExtractMinCh() <-chan T {
return bh.minCh
}
Comment thread
rustatian marked this conversation as resolved.

// if len == 0, wait for the signal
for bh.Len() == 0 {
bh.cond.Wait()
}
func (bh *BinHeap[T]) extractMin() {
for {
bh.cond.L.Lock()

bh.swap(0, bh.len-1)
// if len == 0, wait for the signal
for bh.Len() == 0 {
bh.cond.Wait()
}

item := (bh.items)[int(bh.len)-1] //nolint:gosec
bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec
bh.fixDown(0, int(bh.len-2)) //nolint:gosec
bh.swap(0, bh.len-1)

// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))
item := (bh.items)[int(bh.len)-1] //nolint:gosec
bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec
bh.fixDown(0, int(bh.len-2)) //nolint:gosec

// remove item
delete(bh.exists, item.ID())
// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))

bh.cond.L.Unlock()
return item
// remove item
delete(bh.exists, item.ID())
bh.cond.L.Unlock()
// send item to the channel
select {
case bh.minCh <- item:
case <-bh.stopCtx.Done():
close(bh.minCh)
return
}
Comment on lines +211 to +229
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t remove heap entries until a receiver is ready.

This loop swaps, truncates, decrements len, and deletes from exists before confirming that anyone is ready to receive from minCh. When no consumer is listening (the common case in tests such as TestBinHeap_Remove and TestExists), the goroutine still drains the heap and the affected item disappears from Remove/Exists queries while the send remains blocked. That’s exactly why the pipeline is failing (TestBinHeap_Remove: should be 5, TestExists: Should be true). Please rework the extraction so the heap state is mutated only after a corresponding receive succeeds (e.g., guard the removal behind a handshake/pending buffer, or otherwise delay the swap/truncate/delete until the channel send completes).

}
Comment on lines +202 to +230
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Stop draining the heap without a waiting receiver.

extractMin keeps popping items and pushing them into minCh even when nobody is receiving. That drains the heap ahead of consumers, so invariants like PeekPriority and Exists observe already-removed elements. The failing pipeline tests (TestItemPeek expecting 5 but seeing 6, and TestExists) demonstrate the breakage. We need to couple the removal with an actual receiver—e.g. block on the send before mutating the heap, or gate the pop behind an explicit request path—so the heap state only advances when a consumer is ready.

🤖 Prompt for AI Agents
In binary_heap.go around lines 192 to 215, extractMin currently removes items
and then sends them to minCh, which lets the heap be drained ahead of receivers;
instead, change the sequence so you peek the minimum item first and perform the
blocking send to bh.minCh while still holding bh.cond.L, and only after the send
succeeds mutate the heap (swap/remove/fixDown/atomic len decrement/delete from
exists) and then unlock; this couples removal with an actual receiver and
prevents advancing heap state before a consumer is ready.

}
41 changes: 28 additions & 13 deletions binary_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ func TestBinHeap_Init(t *testing.T) {

res := make([]int64, 0, 12)

itemCh := bh.ExtractMinCh()
for range 11 {
item := bh.ExtractMin()
item := <-itemCh
item.Priority()
res = append(res, item.Priority())
Comment on lines +79 to 83
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Background extractor is removing items prematurely.

After the switch to ExtractMinCh, the background goroutine is now popping the min element even when nobody is reading from the channel, so tests that only call Exists (see the TestExists failure in CI) lose items immediately. The extractor must block before removing from the heap (e.g., coordinate on demand or buffer the request, not the item) so items remain present until a consumer is ready.

}
Expand Down Expand Up @@ -105,8 +106,9 @@ func TestBinHeap_MaxLen(t *testing.T) {
go func() {
res := make([]Item, 0, 12)

itemCh := bh.ExtractMinCh()
for range 11 {
item := bh.ExtractMin()
item := <-itemCh
res = append(res, item)
}
require.Equal(t, 11, len(res))
Expand Down Expand Up @@ -156,13 +158,17 @@ func TestNewPriorityQueue(t *testing.T) {
}()

go func() {
itemCh := pq.ExtractMinCh()
for {
select {
case <-stopCh:
return
default:
pq.ExtractMin()
case item, cl := <-itemCh:
if !cl {
return
}
atomic.AddUint64(&getPerSec, 1)
require.NotNil(t, item)
}
}
}()
Expand All @@ -180,10 +186,18 @@ func TestNewPriorityQueue(t *testing.T) {
}()

time.Sleep(time.Second * 5)
stopCh <- struct{}{}
stopCh <- struct{}{}
stopCh <- struct{}{}
stopCh <- struct{}{}

pq.Stop()

for range 4 {
select {
case stopCh <- struct{}{}:
default:
// it might happed that we already exited from one of the goroutines
// by the signal from the PQ.Stop() method
continue
}
}
}

func TestNewItemWithTimeout(t *testing.T) {
Expand Down Expand Up @@ -212,7 +226,8 @@ func TestNewItemWithTimeout(t *testing.T) {
}

tn := time.Now()
item := bh.ExtractMin()
itemCh := bh.ExtractMinCh()
item := <-itemCh
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
Comment on lines +229 to 232
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix inverted timing assertion.

The assertion logic is inverted. Line 221 checks that 5 >= elapsed_time, but the comment on line 208 and test intent require that the elapsed time be at least 5 seconds.

Apply this diff:

 	tn := time.Now()
 	itemCh := bh.ExtractMinCh()
 	item := <-itemCh
 	assert.Equal(t, int64(5), item.Priority())
-	assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+	assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
itemCh := bh.ExtractMinCh()
item := <-itemCh
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
tn := time.Now()
itemCh := bh.ExtractMinCh()
item := <-itemCh
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
🤖 Prompt for AI Agents
In binary_heap_test.go around lines 218 to 221, the timing assertion is
inverted: it currently asserts 5 >= elapsed, but the test expects elapsed >= 5
seconds. Replace the assertion so it checks time.Since(tn).Seconds() is greater
than or equal to 5 (e.g., call assert.GreaterOrEqual(t,
time.Since(tn).Seconds(), float64(5))) to ensure the elapsed time is at least 5
seconds.

}
Expand All @@ -233,7 +248,7 @@ func TestItemPeek(t *testing.T) {
}

/*
first item should be extracted not less than 5 seconds after we call ExtractMin
the first item should be extracted not less than 5 seconds after we call ExtractMin
5 seconds is a minimum timeout for our items
*/
bh := NewBinHeap[Item](100)
Expand All @@ -246,7 +261,7 @@ func TestItemPeek(t *testing.T) {
assert.Equal(t, int64(5), tmp)

tn := time.Now()
item := bh.ExtractMin()
item := <-bh.ExtractMinCh()
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
Comment on lines +264 to 266
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix inverted timing assertion.

The assertion has the same inversion issue as TestNewItemWithTimeout. Line 255 checks that 5 >= elapsed_time, but should verify that elapsed time is at least 5 seconds.

Apply this diff:

 	tn := time.Now()
 	item := <-bh.ExtractMinCh()
 	assert.Equal(t, int64(5), item.Priority())
-	assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+	assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
item := <-bh.ExtractMinCh()
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
tn := time.Now()
item := <-bh.ExtractMinCh()
assert.Equal(t, int64(5), item.Priority())
assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
🤖 Prompt for AI Agents
In binary_heap_test.go around lines 253 to 255, the timing assertion is
inverted: it currently asserts 5 >= elapsed but should assert elapsed >= 5;
change the assertion to compare the elapsed seconds as the first value and 5 as
the second (e.g. assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5)))
so the test verifies that at least 5 seconds have passed.

}
Expand Down Expand Up @@ -289,7 +304,7 @@ func TestItemPeekConcurrent(t *testing.T) {
go func() {
defer wg.Done()
for range 11 {
min := bh.ExtractMin()
min := <-bh.ExtractMinCh()
_ = min
}
}()
Expand Down Expand Up @@ -340,7 +355,7 @@ func TestBinHeap_Remove(t *testing.T) {
res := make([]Item, 0, 12)

for range 5 {
item := bh.ExtractMin()
item := <-bh.ExtractMinCh()
res = append(res, item)
}

Expand Down
Loading