diff --git a/.github/ISSUE_TEMPLATE/.github/pull_request_template.md b/.github/ISSUE_TEMPLATE/.github/pull_request_template.md new file mode 100644 index 0000000..6fa7b35 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/.github/pull_request_template.md @@ -0,0 +1,24 @@ +# Reason for This PR + +`[Author TODO: add issue # or explain reasoning.]` + +## Description of Changes + +`[Author TODO: add description of changes.]` + +## License Acceptance + +By submitting this pull request, I confirm that my contribution is made under +the terms of the MIT license. + +## PR Checklist + +`[Author TODO: Meet these criteria.]` +`[Reviewer TODO: Verify that these criteria are met. Request changes if not]` + +- [ ] All commits in this PR are signed (`git commit -s`). +- [ ] The reason for this PR is clearly provided (issue no. or explanation). +- [ ] The description of changes is clear and encompassing. +- [ ] Any required documentation changes (code and docs) are included in this PR. +- [ ] Any user-facing changes are mentioned in `CHANGELOG.md`. +- [ ] All added/changed functionality is tested. \ No newline at end of file diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 0cca852..43b07cc 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -11,7 +11,7 @@ jobs: fail-fast: false matrix: go: [stable] - os: ["ubuntu-latest", "macos-latest"] + os: ["ubuntu-latest", "macos-latest", "windows-latest"] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v6 # action page: diff --git a/binary_heap.go b/binary_heap.go index 21f1e3f..6b8ecae 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -5,6 +5,7 @@ binary heap (min-heap) algorithm used as a core for the priority queue package priorityqueue import ( + "context" "sync" "sync/atomic" ) @@ -25,20 +26,30 @@ type BinHeap[T Item] struct { exists map[string]struct{} st *stack // find a way to use a pointer to the raw data - len uint64 - maxLen uint64 - cond sync.Cond + len uint64 + maxLen uint64 + cond sync.Cond + minCh chan T + stopCf context.CancelFunc + stopCtx context.Context } func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { - return &BinHeap[T]{ + bh := &BinHeap[T]{ items: make([]T, 0, 1000), exists: make(map[string]struct{}, 1000), st: newStack(), len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, + minCh: make(chan T), } + + bh.stopCtx, bh.stopCf = context.WithCancel(context.Background()) + + go bh.extractMin() + + return bh } func (bh *BinHeap[T]) fixUp() { @@ -95,6 +106,10 @@ func (bh *BinHeap[T]) Exists(id string) bool { return false } +func (bh *BinHeap[T]) Stop() { + bh.stopCf() +} + // Remove removes all elements with the provided ID and returns the slice with them func (bh *BinHeap[T]) Remove(groupID string) []T { bh.cond.L.Lock() @@ -112,13 +127,13 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } ids := bh.st.Indices() - adjusment := 0 + adjustment := 0 for i := range ids { - start := ids[i][0] - adjusment - end := ids[i][1] - adjusment + start := ids[i][0] - adjustment + end := ids[i][1] - adjustment bh.items = append(bh.items[:start], bh.items[end+1:]...) - adjusment += end - start + 1 + adjustment += end - start + 1 } atomic.StoreUint64(&bh.len, uint64(len(bh.items))) @@ -177,26 +192,40 @@ func (bh *BinHeap[T]) Insert(item T) { bh.cond.Signal() } -func (bh *BinHeap[T]) ExtractMin() T { - bh.cond.L.Lock() +// ExtractMinCh returns a channel to extract the minimum item +// We need this function to be able to use select statement and avoid blocking +// because ExtractMin is a blocking operation (on bh.cond.Wait()) +func (bh *BinHeap[T]) ExtractMinCh() <-chan T { + return bh.minCh +} - // if len == 0, wait for the signal - for bh.Len() == 0 { - bh.cond.Wait() - } +func (bh *BinHeap[T]) extractMin() { + for { + bh.cond.L.Lock() - bh.swap(0, bh.len-1) + // if len == 0, wait for the signal + for bh.Len() == 0 { + bh.cond.Wait() + } - item := (bh.items)[int(bh.len)-1] //nolint:gosec - bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec - bh.fixDown(0, int(bh.len-2)) //nolint:gosec + bh.swap(0, bh.len-1) - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) + item := (bh.items)[int(bh.len)-1] //nolint:gosec + bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec + bh.fixDown(0, int(bh.len-2)) //nolint:gosec - // remove item - delete(bh.exists, item.ID()) + // reduce len + atomic.AddUint64(&bh.len, ^uint64(0)) - bh.cond.L.Unlock() - return item + // remove item + delete(bh.exists, item.ID()) + bh.cond.L.Unlock() + // send item to the channel + select { + case bh.minCh <- item: + case <-bh.stopCtx.Done(): + close(bh.minCh) + return + } + } } diff --git a/binary_heap_test.go b/binary_heap_test.go index c821066..a970ea6 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -76,8 +76,9 @@ func TestBinHeap_Init(t *testing.T) { res := make([]int64, 0, 12) + itemCh := bh.ExtractMinCh() for range 11 { - item := bh.ExtractMin() + item := <-itemCh item.Priority() res = append(res, item.Priority()) } @@ -105,8 +106,9 @@ func TestBinHeap_MaxLen(t *testing.T) { go func() { res := make([]Item, 0, 12) + itemCh := bh.ExtractMinCh() for range 11 { - item := bh.ExtractMin() + item := <-itemCh res = append(res, item) } require.Equal(t, 11, len(res)) @@ -156,13 +158,17 @@ func TestNewPriorityQueue(t *testing.T) { }() go func() { + itemCh := pq.ExtractMinCh() for { select { case <-stopCh: return - default: - pq.ExtractMin() + case item, cl := <-itemCh: + if !cl { + return + } atomic.AddUint64(&getPerSec, 1) + require.NotNil(t, item) } } }() @@ -180,10 +186,18 @@ func TestNewPriorityQueue(t *testing.T) { }() time.Sleep(time.Second * 5) - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} - stopCh <- struct{}{} + + pq.Stop() + + for range 4 { + select { + case stopCh <- struct{}{}: + default: + // it might happed that we already exited from one of the goroutines + // by the signal from the PQ.Stop() method + continue + } + } } func TestNewItemWithTimeout(t *testing.T) { @@ -212,7 +226,8 @@ func TestNewItemWithTimeout(t *testing.T) { } tn := time.Now() - item := bh.ExtractMin() + itemCh := bh.ExtractMinCh() + item := <-itemCh assert.Equal(t, int64(5), item.Priority()) assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) } @@ -233,7 +248,7 @@ func TestItemPeek(t *testing.T) { } /* - first item should be extracted not less than 5 seconds after we call ExtractMin + the first item should be extracted not less than 5 seconds after we call ExtractMin 5 seconds is a minimum timeout for our items */ bh := NewBinHeap[Item](100) @@ -246,7 +261,7 @@ func TestItemPeek(t *testing.T) { assert.Equal(t, int64(5), tmp) tn := time.Now() - item := bh.ExtractMin() + item := <-bh.ExtractMinCh() assert.Equal(t, int64(5), item.Priority()) assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) } @@ -289,7 +304,7 @@ func TestItemPeekConcurrent(t *testing.T) { go func() { defer wg.Done() for range 11 { - min := bh.ExtractMin() + min := <-bh.ExtractMinCh() _ = min } }() @@ -340,7 +355,7 @@ func TestBinHeap_Remove(t *testing.T) { res := make([]Item, 0, 12) for range 5 { - item := bh.ExtractMin() + item := <-bh.ExtractMinCh() res = append(res, item) }