diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 8e1649e..cb833fe 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -34,7 +34,7 @@ jobs: - name: Run golang tests with coverage run: | mkdir ./coverage - go test -v -race -cover -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./... + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./... - name: Archive code coverage results uses: actions/upload-artifact@v7 @@ -50,17 +50,30 @@ jobs: timeout-minutes: 60 steps: + - name: Check out code + uses: actions/checkout@v6 + - name: Download code coverage results uses: actions/download-artifact@v8 - - run: | - cd pq-ubuntu-latest + with: + name: pq-ubuntu-latest + path: coverage + + - name: Prepare coverage report + run: | echo 'mode: atomic' > summary.txt - tail -q -n +2 *.out >> summary.txt - sed -i '2,${/roadrunner/!d}' summary.txt + tail -q -n +2 coverage/*.out >> summary.txt + awk ' + NR == 1 { print; next } + /^github\.com\/roadrunner-server\/priority_queue\// { + sub(/^github\.com\/roadrunner-server\/priority_queue\//, "", $0) + print + } + ' summary.txt > summary.filtered.txt + mv summary.filtered.txt summary.txt - - name: upload to codecov - uses: codecov/codecov-action@v5 # Docs: + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v5 with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage/summary.txt + files: summary.txt fail_ci_if_error: false diff --git a/binary_heap.go b/binary_heap.go index 58d723d..83cb65d 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -2,7 +2,6 @@ package priorityqueue import ( "sync" - "sync/atomic" ) // Item represents a binary heap item @@ -21,7 +20,6 @@ type BinHeap[T Item] struct { exists map[string]struct{} st *stack // find a way to use a pointer to the raw data - len uint64 maxLen uint64 cond sync.Cond } @@ -31,14 +29,13 @@ func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { items: make([]T, 0, 1000), exists: make(map[string]struct{}, 1000), st: newStack(), - len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, } } func (bh *BinHeap[T]) fixUp() { - k := bh.len - 1 + k := uint64(len(bh.items)) - 1 p := (k - 1) >> 1 // k-1 / 2 for k > 0 { @@ -84,11 +81,8 @@ func (bh *BinHeap[T]) Exists(id string) bool { bh.cond.L.Lock() defer bh.cond.L.Unlock() - if _, ok := bh.exists[id]; ok { - return true - } - - return false + _, ok := bh.exists[id] + return ok } // Remove removes all elements with the provided ID and returns the slice with them @@ -107,18 +101,29 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } } + oldLen := len(bh.items) + ids := bh.st.Indices() - adjusment := 0 + adjustment := 0 for i := range ids { - start := ids[i][0] - adjusment - end := ids[i][1] - adjusment + start := ids[i][0] - adjustment + end := ids[i][1] - adjustment bh.items = append(bh.items[:start], bh.items[end+1:]...) - adjusment += end - start + 1 + adjustment += end - start + 1 + } + + // Zero freed tail slots to allow GC of removed items + clear(bh.items[len(bh.items):oldLen]) + + // re-heapify after compaction (Floyd's algorithm) + n := len(bh.items) + for i := n/2 - 1; i >= 0; i-- { + bh.fixDown(i, n-1) } - atomic.StoreUint64(&bh.len, uint64(len(bh.items))) bh.st.clear() + bh.cond.Broadcast() return out } @@ -128,7 +133,7 @@ func (bh *BinHeap[T]) PeekPriority() int64 { bh.cond.L.Lock() defer bh.cond.L.Unlock() - if bh.Len() > 0 { + if len(bh.items) > 0 { return bh.items[0].Priority() } @@ -136,63 +141,53 @@ func (bh *BinHeap[T]) PeekPriority() int64 { } func (bh *BinHeap[T]) Len() uint64 { - return atomic.LoadUint64(&bh.len) + bh.cond.L.Lock() + defer bh.cond.L.Unlock() + return uint64(len(bh.items)) } func (bh *BinHeap[T]) Insert(item T) { bh.cond.L.Lock() - // check the binary heap len before insertion - if bh.Len() > bh.maxLen { - // unlock the mutex to proceed to get-max - bh.cond.L.Unlock() - - // signal waiting goroutines - for bh.Len() > 0 { - // signal waiting goroutines - bh.cond.Signal() - } - // lock mutex to proceed inserting into the empty slice - bh.cond.L.Lock() + for uint64(len(bh.items)) >= bh.maxLen { + bh.cond.Wait() } bh.items = append(bh.items, item) - // add len to the slice - atomic.AddUint64(&bh.len, 1) - // fix binary heap up bh.fixUp() // add item bh.exists[item.ID()] = struct{}{} - bh.cond.L.Unlock() - // signal the goroutine on wait - bh.cond.Signal() + bh.cond.Broadcast() + bh.cond.L.Unlock() } func (bh *BinHeap[T]) ExtractMin() T { bh.cond.L.Lock() // if len == 0, wait for the signal - for bh.Len() == 0 { + for len(bh.items) == 0 { bh.cond.Wait() } - bh.swap(0, bh.len-1) - - item := (bh.items)[int(bh.len)-1] //nolint:gosec - bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec - bh.fixDown(0, int(bh.len-2)) //nolint:gosec + n := uint64(len(bh.items)) + bh.swap(0, n-1) - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) + item := bh.items[n-1] + var zero T + bh.items[n-1] = zero + bh.items = bh.items[:n-1] + bh.fixDown(0, int(n)-2) //nolint:gosec // remove item delete(bh.exists, item.ID()) + // signal blocked producers waiting for space + bh.cond.Broadcast() bh.cond.L.Unlock() return item } diff --git a/binary_heap_test.go b/binary_heap_test.go index c821066..2b42e51 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -121,8 +121,8 @@ func TestBinHeap_MaxLen(t *testing.T) { } func TestNewPriorityQueue(t *testing.T) { - insertsPerSec := uint64(0) - getPerSec := uint64(0) + var insertsPerSec atomic.Uint64 + var getPerSec atomic.Uint64 stopCh := make(chan struct{}, 1) pq := NewBinHeap[Item](1000) @@ -144,10 +144,10 @@ func TestNewPriorityQueue(t *testing.T) { for { select { case <-tt.C: - fmt.Printf("Insert per second: %d\n", atomic.LoadUint64(&insertsPerSec)) - atomic.StoreUint64(&insertsPerSec, 0) - fmt.Printf("ExtractMin per second: %d\n", atomic.LoadUint64(&getPerSec)) - atomic.StoreUint64(&getPerSec, 0) + fmt.Printf("Insert per second: %d\n", insertsPerSec.Load()) + insertsPerSec.Store(0) + fmt.Printf("ExtractMin per second: %d\n", getPerSec.Load()) + getPerSec.Store(0) case <-stopCh: tt.Stop() return @@ -162,7 +162,7 @@ func TestNewPriorityQueue(t *testing.T) { return default: pq.ExtractMin() - atomic.AddUint64(&getPerSec, 1) + getPerSec.Add(1) } } }() @@ -174,7 +174,7 @@ func TestNewPriorityQueue(t *testing.T) { return default: pq.Insert(NewTest(rand.Int63(), uuid.NewString(), uuid.NewString())) //nolint:gosec - atomic.AddUint64(&insertsPerSec, 1) + insertsPerSec.Add(1) } } }() @@ -276,27 +276,45 @@ func TestItemPeekConcurrent(t *testing.T) { bh.Insert(a[i]) } - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - defer wg.Done() + var wg sync.WaitGroup + wg.Go(func() { for range 1000 { tmp := bh.PeekPriority() _ = tmp } - }() + }) - go func() { - defer wg.Done() + wg.Go(func() { for range 11 { - min := bh.ExtractMin() - _ = min + m := bh.ExtractMin() + _ = m } - }() + }) wg.Wait() } +func TestBinHeap_RemoveHeapProperty(t *testing.T) { + // Regression test: Remove must restore the heap property after compaction. + // Insert priorities [1(A), 3(B), 2(B)] → heap: [1, 3, 2] + // Remove group "A" → compacts to [3, 2] which violates min-heap + // Without re-heapify, ExtractMin would return 3 instead of 2. + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "A", "id1")) + bh.Insert(NewTest(3, "B", "id2")) + bh.Insert(NewTest(2, "B", "id3")) + + removed := bh.Remove("A") + require.Len(t, removed, 1) + require.Equal(t, "id1", removed[0].ID()) + + first := bh.ExtractMin() + assert.Equal(t, int64(2), first.Priority(), "expected min priority 2, got %d", first.Priority()) + + second := bh.ExtractMin() + assert.Equal(t, int64(3), second.Priority(), "expected priority 3, got %d", second.Priority()) +} + func TestBinHeap_Remove(t *testing.T) { a := []Item{ NewTest(2, "1", "101"), @@ -328,7 +346,7 @@ func TestBinHeap_Remove(t *testing.T) { out := bh.Remove("1") if len(out) != 6 { - t.Fatal("should be 5") + t.Fatalf("expected 6, got %d", len(out)) } for i := range out { @@ -377,15 +395,548 @@ func TestExists(t *testing.T) { assert.False(t, bh.Exists(id)) } -func BenchmarkGeneral(b *testing.B) { +func TestBinHeap_RemoveHeapPropertyLarge(t *testing.T) { + bh := NewBinHeap[Item](200) + + // Insert 100 items across 5 groups with interleaved priorities so + // the target group's items are scattered at root, mid, and leaf heap levels. + for i := range 100 { + groupID := fmt.Sprintf("g%d", i%5) + priority := int64(i + 1) // 1..100, round-robin across groups + id := fmt.Sprintf("item-%d", i) + bh.Insert(NewTest(priority, groupID, id)) + } + + require.Equal(t, uint64(100), bh.Len()) + + // Remove group "g2" (priorities 3,8,13,18,...,98 — 20 items) + removed := bh.Remove("g2") + require.Len(t, removed, 20) + for _, item := range removed { + require.Equal(t, "g2", item.GroupID()) + } + require.Equal(t, uint64(80), bh.Len()) + + // Extract all remaining items and verify strictly non-decreasing order + var prev int64 + for i := range 80 { + item := bh.ExtractMin() + require.GreaterOrEqual(t, item.Priority(), prev, + "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) + require.NotEqual(t, "g2", item.GroupID()) + prev = item.Priority() + } + + require.Equal(t, uint64(0), bh.Len()) +} + +func TestBinHeap_RemoveMultipleGroups(t *testing.T) { bh := NewBinHeap[Item](100) - id := uuid.NewString() - id2 := uuid.NewString() + // 4 groups with known priorities + bh.Insert(NewTest(10, "A", "a1")) + bh.Insert(NewTest(30, "A", "a2")) + bh.Insert(NewTest(5, "B", "b1")) + bh.Insert(NewTest(25, "B", "b2")) + bh.Insert(NewTest(15, "C", "c1")) + bh.Insert(NewTest(35, "C", "c2")) + bh.Insert(NewTest(20, "D", "d1")) + bh.Insert(NewTest(40, "D", "d2")) + + // Remove group A, verify min is B's 5 + removedA := bh.Remove("A") + require.Len(t, removedA, 2) + require.Equal(t, int64(5), bh.PeekPriority()) + + // Remove group B, verify min is now C's 15 + removedB := bh.Remove("B") + require.Len(t, removedB, 2) + require.Equal(t, int64(15), bh.PeekPriority()) + + // Extract remaining items (C and D) and verify order + expected := []int64{15, 20, 35, 40} + for _, exp := range expected { + item := bh.ExtractMin() + require.Equal(t, exp, item.Priority()) + } +} + +func TestBinHeap_RemoveEdgeCases(t *testing.T) { + t.Run("remove all items", func(t *testing.T) { + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "only", "id1")) + bh.Insert(NewTest(2, "only", "id2")) + bh.Insert(NewTest(3, "only", "id3")) + + removed := bh.Remove("only") + require.Len(t, removed, 3) + require.Equal(t, uint64(0), bh.Len()) + + // Insert new items and verify they work after full removal + bh.Insert(NewTest(42, "new", "id4")) + require.Equal(t, uint64(1), bh.Len()) + item := bh.ExtractMin() + require.Equal(t, int64(42), item.Priority()) + }) + + t.Run("remove non-existent group", func(t *testing.T) { + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "exists", "id1")) + bh.Insert(NewTest(2, "exists", "id2")) + + removed := bh.Remove("ghost") + require.Empty(t, removed) + require.Equal(t, uint64(2), bh.Len()) + + // Verify heap still works correctly + item := bh.ExtractMin() + require.Equal(t, int64(1), item.Priority()) + }) + + t.Run("remove from empty heap", func(t *testing.T) { + bh := NewBinHeap[Item](10) + removed := bh.Remove("anything") + require.Empty(t, removed) + require.Equal(t, uint64(0), bh.Len()) + }) +} + +func TestBinHeap_BoundedInsertBackpressure(t *testing.T) { + bh := NewBinHeap[Item](5) + + // Fill to capacity + for i := range 5 { + bh.Insert(NewTest(int64(i+1), "g1", fmt.Sprintf("item-%d", i))) + } + require.Equal(t, uint64(5), bh.Len()) + + // Launch goroutine to insert one more (should block at capacity) + inserted := make(chan struct{}) + go func() { + bh.Insert(NewTest(10, "g1", "blocked-item")) + close(inserted) + }() + + // Give goroutine time to start and block on the full heap + time.Sleep(100 * time.Millisecond) + require.Equal(t, uint64(5), bh.Len(), "producer should be blocked, heap still at capacity") + + // Extract one item to free space and signal the blocked producer + item := bh.ExtractMin() + require.Equal(t, int64(1), item.Priority()) + + // Wait for insert goroutine to complete + select { + case <-inserted: + // success — producer unblocked + case <-time.After(2 * time.Second): + t.Fatal("insert goroutine did not unblock after ExtractMin") + } + + require.Equal(t, uint64(5), bh.Len(), "should be 5: was 5, extracted 1, inserted 1") +} + +func TestBinHeap_RemoveUnblocksInsert(t *testing.T) { + bh := NewBinHeap[Item](5) + + // Fill to capacity with one removable group + for i := range 5 { + bh.Insert(NewTest(int64(i+1), "removeMe", fmt.Sprintf("item-%d", i))) + } + require.Equal(t, uint64(5), bh.Len()) + + // Launch goroutine to insert one more (should block at capacity) + inserted := make(chan struct{}) + go func() { + bh.Insert(NewTest(99, "keep", "new-item")) + close(inserted) + }() + + // Give goroutine time to start and block + time.Sleep(100 * time.Millisecond) + require.Equal(t, uint64(5), bh.Len(), "producer should be blocked, heap still at capacity") + + // Remove group to free space — this should unblock the producer + removed := bh.Remove("removeMe") + require.Len(t, removed, 5) + + select { + case <-inserted: + // success — producer unblocked by Remove + case <-time.After(2 * time.Second): + t.Fatal("insert goroutine did not unblock after Remove freed space") + } + + require.Equal(t, uint64(1), bh.Len()) + item := bh.ExtractMin() + require.Equal(t, int64(99), item.Priority()) +} + +func TestBinHeap_ConcurrentInsertRemoveExtract(t *testing.T) { + // Large capacity to avoid Insert back-pressure during stress test; + // back-pressure is tested separately in TestBinHeap_BoundedInsertBackpressure. + bh := NewBinHeap[Item](^uint64(0)) + + var done atomic.Bool + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + + // 3 producer goroutines inserting items with random priorities across 10 groups + for p := range 3 { + producerWg.Go(func() { + for i := 0; !done.Load(); i++ { + groupID := fmt.Sprintf("g%d", i%10) + itemID := fmt.Sprintf("p%d-i%d", p, i) + bh.Insert(NewTest(rand.Int63n(1000), groupID, itemID)) //nolint:gosec + } + }) + } + + // 2 consumer goroutines calling ExtractMin + for range 2 { + consumerWg.Go(func() { + for !done.Load() { + _ = bh.ExtractMin() + } + }) + } + + // 1 remover goroutine periodically removing a random group + producerWg.Go(func() { + for !done.Load() { + bh.Remove(fmt.Sprintf("g%d", rand.Intn(10))) //nolint:gosec + time.Sleep(10 * time.Millisecond) + } + }) + + // Run for 2 seconds + time.Sleep(2 * time.Second) + done.Store(true) + + // Wait for producers and remover to finish + producerWg.Wait() + + // Unblock consumers that may be stuck waiting on an empty heap + consumerDone := make(chan struct{}) + go func() { + consumerWg.Wait() + close(consumerDone) + }() + + for { + select { + case <-consumerDone: + // All consumers exited — verify heap is in a consistent state + _ = bh.Len() + return + default: + bh.Insert(NewTest(0, "sentinel", fmt.Sprintf("s-%d", time.Now().UnixNano()))) + time.Sleep(10 * time.Millisecond) + } + } +} + +func TestBinHeap_LargeScaleOrdering(t *testing.T) { + const n = 10_000 + bh := NewBinHeap[Item](uint64(n) + 1) + + for i := range n { + bh.Insert(NewTest(rand.Int63n(1000), "g", fmt.Sprintf("item-%d", i))) //nolint:gosec + } + + var prev int64 + for i := range n { + item := bh.ExtractMin() + require.GreaterOrEqual(t, item.Priority(), prev, + "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) + prev = item.Priority() + } +} + +// TestBinHeap_BroadcastPreventsDeadlock. +// +// With Signal(), the following deadlock is possible (maxLen=1): +// 1. Queue has 1 item. P1, P2 call Insert → both Wait() (queue full). +// 2. C1 calls ExtractMin → extracts the item, Signal() wakes P1. +// 3. C2 calls ExtractMin → acquires lock before P1, sees empty → Wait(). +// 4. P1 wakes, inserts item, Signal() → wakes P2 (wrong type!), not C2. +// 5. P2 rechecks: queue full → Wait(). +// 6. Deadlock: P2 and C2 both waiting, queue has 1 item, no wakeup coming. +// +// With Broadcast(), step 4 wakes both P2 and C2, so C2 rechecks, finds 1 item, +// and successfully extracts — no deadlock. +func TestBinHeap_BroadcastPreventsDeadlock(t *testing.T) { + t.Run("maxLen=1 tight contention", func(t *testing.T) { + const numProducers = 4 + const numConsumers = 4 + const itemsPerProducer = 500 + + bh := NewBinHeap[Item](1) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + // Launch producers: each inserts itemsPerProducer items then exits. + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + // Launch consumers: each extracts until totalItems have been consumed. + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + // Watchdog: if the test doesn't finish within the timeout, it's a deadlock. + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success — no deadlock + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("maxLen=2 multiple rounds", func(t *testing.T) { + // A slightly larger capacity still triggers the bug with Signal() + // because producers and consumers can both be queued on the same cond. + const numProducers = 6 + const numConsumers = 6 + const itemsPerProducer = 300 + + bh := NewBinHeap[Item](2) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i%50), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("asymmetric producers/consumers", func(t *testing.T) { + // Many producers, few consumers — maximizes the chance that Signal() + // wakes a producer instead of the sole consumer. + const numProducers = 8 + const numConsumers = 1 + const itemsPerProducer = 200 + + bh := NewBinHeap[Item](1) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("repeated stress cycles", func(t *testing.T) { + // Run multiple short cycles to increase the probability of hitting + // the specific interleaving that causes a missed wakeup. + for cycle := range 20 { + bh := NewBinHeap[Item](1) + const numGoroutines = 4 + const itemsEach = 100 + + var wg sync.WaitGroup + var consumed atomic.Int64 + totalItems := int64(numGoroutines * itemsEach) + + for g := range numGoroutines { + wg.Go(func() { + for i := range itemsEach { + bh.Insert(NewTest(int64(i), "g", fmt.Sprintf("c%d-g%d-i%d", cycle, g, i))) + } + }) + } + + for range numGoroutines { + wg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // cycle passed + case <-time.After(5 * time.Second): + t.Fatalf("DEADLOCK on cycle %d: did not complete within 5s", cycle) + } + } + }) +} + +func BenchmarkInsert(b *testing.B) { + bh := NewBinHeap[Item](1 << 30) + b.ReportAllocs() + i := 0 + for b.Loop() { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + i++ + } +} + +func BenchmarkExtractMin(b *testing.B) { + bh := NewBinHeap[Item](uint64(max(b.N, 0)) + 1) + for i := range b.N { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + } + b.ReportAllocs() + b.ResetTimer() + for range b.N { + bh.ExtractMin() + } +} + +func BenchmarkInsertExtractMin(b *testing.B) { + bh := NewBinHeap[Item](2000) + // Pre-fill with 1000 items + for i := range 1000 { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec + } b.ReportAllocs() + b.ResetTimer() + i := 0 + for b.Loop() { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + bh.ExtractMin() + i++ + } +} + +func BenchmarkRemove(b *testing.B) { + const numGroups = 100 + const itemsPerGroup = 10 + bh := NewBinHeap[Item](numGroups*itemsPerGroup + 100) + + // Fill with 1000 items across 100 groups (10 items each) + groups := make([][]Item, numGroups) + for g := range numGroups { + groups[g] = make([]Item, 0, itemsPerGroup) + for i := range itemsPerGroup { + item := NewTest(rand.Int63n(10000), fmt.Sprintf("g%d", g), fmt.Sprintf("g%d-i%d", g, i)) //nolint:gosec + bh.Insert(item) + groups[g] = append(groups[g], item) + } + } + b.ReportAllocs() + b.ResetTimer() + i := 0 for b.Loop() { - bh.Insert(NewTest(2, id, id2)) - bh.Remove(id) + groupIdx := i % numGroups + groupID := fmt.Sprintf("g%d", groupIdx) + bh.Remove(groupID) + // Restore items for next iteration + b.StopTimer() + for _, item := range groups[groupIdx] { + bh.Insert(item) + } + b.StartTimer() + i++ + } +} + +func BenchmarkConcurrentInsertExtract(b *testing.B) { + bh := NewBinHeap[Item](10000) + // Pre-fill so ExtractMin rarely blocks + for i := range 5000 { + bh.Insert(NewTest(rand.Int63n(10000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec } + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + if i%2 == 0 { + bh.Insert(NewTest(rand.Int63n(10000), "bench", fmt.Sprintf("p-%d-%d", i, rand.Int63()))) //nolint:gosec + } else { + bh.ExtractMin() + } + i++ + } + }) } diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..d57cc68 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,12 @@ +coverage: + status: + project: + default: + target: auto + threshold: 50% + informational: true + patch: + default: + target: auto + threshold: 50% + informational: true diff --git a/monotonic_stack.go b/monotonic_stack.go index 3094b53..8759a7a 100644 --- a/monotonic_stack.go +++ b/monotonic_stack.go @@ -45,5 +45,5 @@ func (st *stack) Indices() [][2]int { } func (st *stack) clear() { - st.idx = make([][2]int, 0, 10) + st.idx = st.idx[:0] }