Skip to content
31 changes: 22 additions & 9 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: Run golang tests with coverage
run: |
mkdir ./coverage
go test -v -race -cover -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./...
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./...

- name: Archive code coverage results
uses: actions/upload-artifact@v7
Expand All @@ -50,17 +50,30 @@ jobs:

timeout-minutes: 60
steps:
- name: Check out code
uses: actions/checkout@v6

- name: Download code coverage results
uses: actions/download-artifact@v8
- run: |
cd pq-ubuntu-latest
with:
name: pq-ubuntu-latest
path: coverage

- name: Prepare coverage report
run: |
echo 'mode: atomic' > summary.txt
tail -q -n +2 *.out >> summary.txt
sed -i '2,${/roadrunner/!d}' summary.txt
tail -q -n +2 coverage/*.out >> summary.txt
awk '
NR == 1 { print; next }
/^github\.com\/roadrunner-server\/priority_queue\// {
sub(/^github\.com\/roadrunner-server\/priority_queue\//, "", $0)
print
}
' summary.txt > summary.filtered.txt
mv summary.filtered.txt summary.txt

- name: upload to codecov
uses: codecov/codecov-action@v5 # Docs: <https://github.com/codecov/codecov-action>
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage/summary.txt
files: summary.txt
fail_ci_if_error: false
79 changes: 37 additions & 42 deletions binary_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package priorityqueue

import (
"sync"
"sync/atomic"
)

// Item represents a binary heap item
Expand All @@ -21,7 +20,6 @@ type BinHeap[T Item] struct {
exists map[string]struct{}
st *stack
// find a way to use a pointer to the raw data
len uint64
maxLen uint64
cond sync.Cond
}
Expand All @@ -31,14 +29,13 @@ func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] {
items: make([]T, 0, 1000),
exists: make(map[string]struct{}, 1000),
st: newStack(),
len: 0,
maxLen: maxLen,
cond: sync.Cond{L: &sync.Mutex{}},
}
}

func (bh *BinHeap[T]) fixUp() {
k := bh.len - 1
k := uint64(len(bh.items)) - 1
p := (k - 1) >> 1 // k-1 / 2

for k > 0 {
Expand Down Expand Up @@ -84,11 +81,8 @@ func (bh *BinHeap[T]) Exists(id string) bool {
bh.cond.L.Lock()
defer bh.cond.L.Unlock()

if _, ok := bh.exists[id]; ok {
return true
}

return false
_, ok := bh.exists[id]
return ok
}

// Remove removes all elements with the provided ID and returns the slice with them
Expand All @@ -107,18 +101,29 @@ func (bh *BinHeap[T]) Remove(groupID string) []T {
}
}

oldLen := len(bh.items)

ids := bh.st.Indices()
adjusment := 0
adjustment := 0
for i := range ids {
start := ids[i][0] - adjusment
end := ids[i][1] - adjusment
start := ids[i][0] - adjustment
end := ids[i][1] - adjustment

bh.items = append(bh.items[:start], bh.items[end+1:]...)
adjusment += end - start + 1
adjustment += end - start + 1
}
Comment thread
rustatian marked this conversation as resolved.

// Zero freed tail slots to allow GC of removed items
clear(bh.items[len(bh.items):oldLen])

// re-heapify after compaction (Floyd's algorithm)
n := len(bh.items)
for i := n/2 - 1; i >= 0; i-- {
bh.fixDown(i, n-1)
}
Comment thread
rustatian marked this conversation as resolved.
Comment thread
rustatian marked this conversation as resolved.

atomic.StoreUint64(&bh.len, uint64(len(bh.items)))
bh.st.clear()
bh.cond.Broadcast()

return out
Comment thread
rustatian marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Expand All @@ -128,71 +133,61 @@ func (bh *BinHeap[T]) PeekPriority() int64 {
bh.cond.L.Lock()
defer bh.cond.L.Unlock()

if bh.Len() > 0 {
if len(bh.items) > 0 {
return bh.items[0].Priority()
}

return -1
}

func (bh *BinHeap[T]) Len() uint64 {
return atomic.LoadUint64(&bh.len)
bh.cond.L.Lock()
defer bh.cond.L.Unlock()
return uint64(len(bh.items))
}

func (bh *BinHeap[T]) Insert(item T) {
bh.cond.L.Lock()

// check the binary heap len before insertion
if bh.Len() > bh.maxLen {
// unlock the mutex to proceed to get-max
bh.cond.L.Unlock()

// signal waiting goroutines
for bh.Len() > 0 {
// signal waiting goroutines
bh.cond.Signal()
}
// lock mutex to proceed inserting into the empty slice
bh.cond.L.Lock()
for uint64(len(bh.items)) >= bh.maxLen {
bh.cond.Wait()
}

bh.items = append(bh.items, item)

// add len to the slice
atomic.AddUint64(&bh.len, 1)

// fix binary heap up
bh.fixUp()

// add item
bh.exists[item.ID()] = struct{}{}

bh.cond.L.Unlock()

// signal the goroutine on wait
bh.cond.Signal()
bh.cond.Broadcast()
bh.cond.L.Unlock()
}

func (bh *BinHeap[T]) ExtractMin() T {
bh.cond.L.Lock()

// if len == 0, wait for the signal
for bh.Len() == 0 {
for len(bh.items) == 0 {
bh.cond.Wait()
}

bh.swap(0, bh.len-1)

item := (bh.items)[int(bh.len)-1] //nolint:gosec
bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec
bh.fixDown(0, int(bh.len-2)) //nolint:gosec
n := uint64(len(bh.items))
bh.swap(0, n-1)

// reduce len
atomic.AddUint64(&bh.len, ^uint64(0))
item := bh.items[n-1]
var zero T
bh.items[n-1] = zero
bh.items = bh.items[:n-1]
bh.fixDown(0, int(n)-2) //nolint:gosec

// remove item
delete(bh.exists, item.ID())

// signal blocked producers waiting for space
bh.cond.Broadcast()
bh.cond.L.Unlock()
return item
}
Loading
Loading