From 0d7bbd5a0b557d12e74ff8746184490eb33ab248 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 3 Oct 2025 22:42:15 +0200 Subject: [PATCH 1/5] fix: allow using in select Signed-off-by: Valery Piashchynski --- binary_heap.go | 48 +++++++++++++++++++++++++++++---------------- binary_heap_test.go | 20 +++++++++++-------- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/binary_heap.go b/binary_heap.go index 21f1e3f..1214789 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -28,17 +28,22 @@ type BinHeap[T Item] struct { len uint64 maxLen uint64 cond sync.Cond + minCh chan T } func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { - return &BinHeap[T]{ + bh := &BinHeap[T]{ items: make([]T, 0, 1000), exists: make(map[string]struct{}, 1000), st: newStack(), len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, + minCh: make(chan T, 5), } + go bh.extractMin() + + return bh } func (bh *BinHeap[T]) fixUp() { @@ -177,26 +182,35 @@ func (bh *BinHeap[T]) Insert(item T) { bh.cond.Signal() } -func (bh *BinHeap[T]) ExtractMin() T { - bh.cond.L.Lock() +// ExtractMinCh returns a channel to extract the minimum item +// We need this function to be able to use select statement and avoid blocking +// because ExtractMin is a blocking operation (on bh.cond.Wait()) +func (bh *BinHeap[T]) ExtractMinCh() chan T { + return bh.minCh +} - // if len == 0, wait for the signal - for bh.Len() == 0 { - bh.cond.Wait() - } +func (bh *BinHeap[T]) extractMin() { + for { + bh.cond.L.Lock() - bh.swap(0, bh.len-1) + // if len == 0, wait for the signal + for bh.Len() == 0 { + bh.cond.Wait() + } - item := (bh.items)[int(bh.len)-1] //nolint:gosec - bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec - bh.fixDown(0, int(bh.len-2)) //nolint:gosec + bh.swap(0, bh.len-1) - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) + item := (bh.items)[int(bh.len)-1] //nolint:gosec + bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec + bh.fixDown(0, int(bh.len-2)) //nolint:gosec - // remove item - delete(bh.exists, item.ID()) + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) - bh.cond.L.Unlock() - return item + // remove item + delete(bh.exists, item.ID()) + bh.cond.L.Unlock() + // send item to the channel + bh.minCh <- item + } } diff --git a/binary_heap_test.go b/binary_heap_test.go index c821066..0abdbf5 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -76,8 +76,9 @@ func TestBinHeap_Init(t *testing.T) { res := make([]int64, 0, 12) + itemCh := bh.ExtractMinCh() for range 11 { - item := bh.ExtractMin() + item := <-itemCh item.Priority() res = append(res, item.Priority()) } @@ -105,8 +106,9 @@ func TestBinHeap_MaxLen(t *testing.T) { go func() { res := make([]Item, 0, 12) + itemCh := bh.ExtractMinCh() for range 11 { - item := bh.ExtractMin() + item := <-itemCh res = append(res, item) } require.Equal(t, 11, len(res)) @@ -156,13 +158,14 @@ func TestNewPriorityQueue(t *testing.T) { }() go func() { + itemCh := pq.ExtractMinCh() for { select { case <-stopCh: return - default: - pq.ExtractMin() + case item := <-itemCh: atomic.AddUint64(&getPerSec, 1) + _ = item } } }() @@ -212,7 +215,8 @@ func TestNewItemWithTimeout(t *testing.T) { } tn := time.Now() - item := bh.ExtractMin() + itemCh := bh.ExtractMinCh() + item := <-itemCh assert.Equal(t, int64(5), item.Priority()) assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) } @@ -246,7 +250,7 @@ func TestItemPeek(t *testing.T) { assert.Equal(t, int64(5), tmp) tn := time.Now() - item := bh.ExtractMin() + item := <-bh.ExtractMinCh() assert.Equal(t, int64(5), item.Priority()) assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) } @@ -289,7 +293,7 @@ func TestItemPeekConcurrent(t *testing.T) { go func() { defer wg.Done() for range 11 { - min := bh.ExtractMin() + min := <-bh.ExtractMinCh() _ = min } }() @@ -340,7 +344,7 @@ func TestBinHeap_Remove(t *testing.T) { res := make([]Item, 0, 12) for range 5 { - item := bh.ExtractMin() + item := <-bh.ExtractMinCh() res = append(res, item) } From e5347402de151544ea51188b8c9a1e4679993ac8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 3 Oct 2025 22:49:40 +0200 Subject: [PATCH 2/5] Update binary_heap_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- binary_heap_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binary_heap_test.go b/binary_heap_test.go index 0abdbf5..680ac1f 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -165,7 +165,7 @@ func TestNewPriorityQueue(t *testing.T) { return case item := <-itemCh: atomic.AddUint64(&getPerSec, 1) - _ = item + require.NotNil(t, item) } } }() From 84d79704637708cd795f6d764cb8191b5ae5549b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 4 Oct 2025 12:29:50 +0200 Subject: [PATCH 3/5] chore: update tests and add Stop function Signed-off-by: Valery Piashchynski --- .../.github/pull_request_template.md | 24 ++++++++++++ binary_heap.go | 37 +++++++++++++------ binary_heap_test.go | 9 ++++- 3 files changed, 57 insertions(+), 13 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/.github/pull_request_template.md diff --git a/.github/ISSUE_TEMPLATE/.github/pull_request_template.md b/.github/ISSUE_TEMPLATE/.github/pull_request_template.md new file mode 100644 index 0000000..6fa7b35 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/.github/pull_request_template.md @@ -0,0 +1,24 @@ +# Reason for This PR + +`[Author TODO: add issue # or explain reasoning.]` + +## Description of Changes + +`[Author TODO: add description of changes.]` + +## License Acceptance + +By submitting this pull request, I confirm that my contribution is made under +the terms of the MIT license. + +## PR Checklist + +`[Author TODO: Meet these criteria.]` +`[Reviewer TODO: Verify that these criteria are met. Request changes if not]` + +- [ ] All commits in this PR are signed (`git commit -s`). +- [ ] The reason for this PR is clearly provided (issue no. or explanation). +- [ ] The description of changes is clear and encompassing. +- [ ] Any required documentation changes (code and docs) are included in this PR. +- [ ] Any user-facing changes are mentioned in `CHANGELOG.md`. +- [ ] All added/changed functionality is tested. \ No newline at end of file diff --git a/binary_heap.go b/binary_heap.go index 1214789..6b8ecae 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -5,6 +5,7 @@ binary heap (min-heap) algorithm used as a core for the priority queue package priorityqueue import ( + "context" "sync" "sync/atomic" ) @@ -25,10 +26,12 @@ type BinHeap[T Item] struct { exists map[string]struct{} st *stack // find a way to use a pointer to the raw data - len uint64 - maxLen uint64 - cond sync.Cond - minCh chan T + len uint64 + maxLen uint64 + cond sync.Cond + minCh chan T + stopCf context.CancelFunc + stopCtx context.Context } func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { @@ -39,8 +42,11 @@ func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, - minCh: make(chan T, 5), + minCh: make(chan T), } + + bh.stopCtx, bh.stopCf = context.WithCancel(context.Background()) + go bh.extractMin() return bh @@ -100,6 +106,10 @@ func (bh *BinHeap[T]) Exists(id string) bool { return false } +func (bh *BinHeap[T]) Stop() { + bh.stopCf() +} + // Remove removes all elements with the provided ID and returns the slice with them func (bh *BinHeap[T]) Remove(groupID string) []T { bh.cond.L.Lock() @@ -117,13 +127,13 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } ids := bh.st.Indices() - adjusment := 0 + adjustment := 0 for i := range ids { - start := ids[i][0] - adjusment - end := ids[i][1] - adjusment + start := ids[i][0] - adjustment + end := ids[i][1] - adjustment bh.items = append(bh.items[:start], bh.items[end+1:]...) - adjusment += end - start + 1 + adjustment += end - start + 1 } atomic.StoreUint64(&bh.len, uint64(len(bh.items))) @@ -185,7 +195,7 @@ func (bh *BinHeap[T]) Insert(item T) { // ExtractMinCh returns a channel to extract the minimum item // We need this function to be able to use select statement and avoid blocking // because ExtractMin is a blocking operation (on bh.cond.Wait()) -func (bh *BinHeap[T]) ExtractMinCh() chan T { +func (bh *BinHeap[T]) ExtractMinCh() <-chan T { return bh.minCh } @@ -211,6 +221,11 @@ func (bh *BinHeap[T]) extractMin() { delete(bh.exists, item.ID()) bh.cond.L.Unlock() // send item to the channel - bh.minCh <- item + select { + case bh.minCh <- item: + case <-bh.stopCtx.Done(): + close(bh.minCh) + return + } } } diff --git a/binary_heap_test.go b/binary_heap_test.go index 680ac1f..9fad7bb 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -163,7 +163,10 @@ func TestNewPriorityQueue(t *testing.T) { select { case <-stopCh: return - case item := <-itemCh: + case item, cl := <-itemCh: + if !cl { + return + } atomic.AddUint64(&getPerSec, 1) require.NotNil(t, item) } @@ -183,6 +186,8 @@ func TestNewPriorityQueue(t *testing.T) { }() time.Sleep(time.Second * 5) + + pq.Stop() stopCh <- struct{}{} stopCh <- struct{}{} stopCh <- struct{}{} @@ -237,7 +242,7 @@ func TestItemPeek(t *testing.T) { } /* - first item should be extracted not less than 5 seconds after we call ExtractMin + the first item should be extracted not less than 5 seconds after we call ExtractMin 5 seconds is a minimum timeout for our items */ bh := NewBinHeap[Item](100) From a2979949b3b81625ce90fb9be1626a4675baf308 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 4 Oct 2025 12:31:51 +0200 Subject: [PATCH 4/5] chore: add windows platform Signed-off-by: Valery Piashchynski --- .github/workflows/linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 0cca852..43b07cc 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -11,7 +11,7 @@ jobs: fail-fast: false matrix: go: [stable] - os: ["ubuntu-latest", "macos-latest"] + os: ["ubuntu-latest", "macos-latest", "windows-latest"] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v6 # action page: From e6e4348147da9c06816b00ed9517191768568142 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 4 Oct 2025 12:34:12 +0200 Subject: [PATCH 5/5] chore: correct pq test Signed-off-by: Valery Piashchynski --- binary_heap_test.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/binary_heap_test.go b/binary_heap_test.go index 9fad7bb..a970ea6 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -188,10 +188,16 @@ func TestNewPriorityQueue(t *testing.T) { time.Sleep(time.Second * 5) pq.Stop() - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} + + for range 4 { + select { + case stopCh <- struct{}{}: + default: + // it might happed that we already exited from one of the goroutines + // by the signal from the PQ.Stop() method + continue + } + } } func TestNewItemWithTimeout(t *testing.T) {