diff --git a/examples/audiotrack/main.go b/examples/audiotrack/main.go index b0994c7c..ef0e5485 100644 --- a/examples/audiotrack/main.go +++ b/examples/audiotrack/main.go @@ -126,8 +126,6 @@ func handlePublish(room *lksdk.Room) { if err != nil { logger.Errorw("error writing sample", err) } - // temp: some delay before writing next sample - time.Sleep(15 * time.Millisecond) } } } diff --git a/pkg/lockless_circular_buffer/circular_buffer.go b/pkg/lockless_circular_buffer/circular_buffer.go new file mode 100644 index 00000000..460e10a4 --- /dev/null +++ b/pkg/lockless_circular_buffer/circular_buffer.go @@ -0,0 +1,331 @@ +package lockless_circular_buffer + +import ( + "runtime" + "time" + "unsafe" + + "go.uber.org/atomic" +) + +type CircularBuffer[T any] struct { + buffer []T + head *atomic.Uint32 + tail *atomic.Uint32 + mask uint32 + size uint32 + + // todo(anunaym14): arch-aware padding to avoid false sharing + // _padding [x]byte +} + +func NewCircularBuffer[T any](capacity uint32) *CircularBuffer[T] { + // Ensure capacity is a power of 2 and at least 1 + // todo(anunaym14): cleanup + if capacity == 0 { + capacity = 1 + } else if (capacity & (capacity - 1)) != 0 { + capacity-- + capacity |= capacity >> 1 + capacity |= capacity >> 2 + capacity |= capacity >> 4 + capacity |= capacity >> 8 + capacity |= capacity >> 16 + capacity++ + } + + return &CircularBuffer[T]{ + buffer: make([]T, capacity), + head: atomic.NewUint32(0), + tail: atomic.NewUint32(0), + mask: capacity - 1, + size: capacity, + } +} + +func (cb *CircularBuffer[T]) Push(item T) { + backoffCounter := 0 + backoffMax := 32 + + for { + tail := cb.tail.Load() + head := cb.head.Load() + + nextTail := (tail + 1) & cb.mask + if nextTail == head { + if backoffCounter < backoffMax { + backoffCounter++ + continue + } + runtime.Gosched() + continue + } + + if cb.tail.CompareAndSwap(tail, nextTail) { + cb.buffer[tail] = item + return + } + + if backoffCounter > 0 { + backoffCounter-- + } + } +} + +func (cb *CircularBuffer[T]) TryPush(item T) bool { + const maxAttempts = 5 + + for i := 0; i < maxAttempts; i++ { + tail := cb.tail.Load() + head := cb.head.Load() + + nextTail := (tail + 1) & cb.mask + if nextTail == head { + return false + } + + if cb.tail.CompareAndSwap(tail, nextTail) { + cb.buffer[tail] = item + return true + } + + if i > 1 { + runtime.Gosched() + } + } + return false +} + +func (cb *CircularBuffer[T]) PushTimeout(item T, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + tail := cb.tail.Load() + head := cb.head.Load() + + nextTail := (tail + 1) & cb.mask + if nextTail == head { + runtime.Gosched() + continue + } + + if cb.tail.CompareAndSwap(tail, nextTail) { + cb.buffer[tail] = item + return true + } + } + + return false +} + +func (cb *CircularBuffer[T]) Pop() (T, bool) { + var zero T + const maxAttempts = 5 + + for i := 0; i < maxAttempts; i++ { + head := cb.head.Load() + tail := cb.tail.Load() + + if head == tail { + return zero, false + } + + nextHead := (head + 1) & cb.mask + if cb.head.CompareAndSwap(head, nextHead) { + item := cb.buffer[head] + return item, true + } + + if i > 1 { + runtime.Gosched() + } + } + return zero, false +} + +func (cb *CircularBuffer[T]) PushBatch(items []T) int { + if len(items) == 0 { + return 0 + } + + head := cb.head.Load() + tail := cb.tail.Load() + + var availableSpace uint32 + if head <= tail { + availableSpace = cb.size - (tail - head) - 1 + } else { + availableSpace = head - tail - 1 + } + + batchSize := uint32(len(items)) + if batchSize > availableSpace { + batchSize = availableSpace + } + + if batchSize == 0 { + return 0 + } + + pushed := uint32(0) + for pushed < batchSize { + tail = cb.tail.Load() + head = cb.head.Load() + + if head <= tail { + availableSpace = cb.size - (tail - head) - 1 + } else { + availableSpace = head - tail - 1 + } + + if availableSpace == 0 { + break + } + + currentBatchSize := batchSize - pushed + if currentBatchSize > availableSpace { + currentBatchSize = availableSpace + } + + newTail := (tail + currentBatchSize) & cb.mask + + if cb.tail.CompareAndSwap(tail, newTail) { + for i := uint32(0); i < currentBatchSize; i++ { + slotIndex := (tail + i) & cb.mask + cb.buffer[slotIndex] = items[pushed+i] + } + pushed += currentBatchSize + } + } + + return int(pushed) +} + +func (cb *CircularBuffer[T]) PushBatchBlocking(items []T) { + remaining := items + for len(remaining) > 0 { + pushed := cb.PushBatch(remaining) + if pushed == 0 { + runtime.Gosched() + continue + } + remaining = remaining[pushed:] + } +} + +func (cb *CircularBuffer[T]) PopBatch(maxItems int) (int, []T) { + if maxItems <= 0 { + return 0, nil + } + + head := cb.head.Load() + tail := cb.tail.Load() + + var availableItems uint32 + if tail >= head { + availableItems = tail - head + } else { + availableItems = cb.size - (head - tail) + } + + batchSize := uint32(maxItems) + if batchSize > availableItems { + batchSize = availableItems + } + + if batchSize == 0 { + return 0, nil + } + + result := make([]T, 0, batchSize) + popped := uint32(0) + + for popped < batchSize { + head = cb.head.Load() + tail = cb.tail.Load() + + if tail >= head { + availableItems = tail - head + } else { + availableItems = cb.size - (head - tail) + } + + if availableItems == 0 { + break + } + + currentBatchSize := batchSize - popped + if currentBatchSize > availableItems { + currentBatchSize = availableItems + } + + newHead := (head + currentBatchSize) & cb.mask + + if cb.head.CompareAndSwap(head, newHead) { + for i := uint32(0); i < currentBatchSize; i++ { + slotIndex := (head + i) & cb.mask + result = append(result, cb.buffer[slotIndex]) + } + popped += currentBatchSize + } + } + + return int(popped), result +} + +//go:noinline +func prefetch(addr unsafe.Pointer) { + _ = addr +} + +func (cb *CircularBuffer[T]) AddPreloaded(items []T) int { + if len(items) == 0 { + return 0 + } + + prefetch(unsafe.Pointer(cb.head)) + prefetch(unsafe.Pointer(cb.tail)) + + return cb.PushBatch(items) +} + +func (cb *CircularBuffer[T]) Size() uint32 { + head := cb.head.Load() + tail := cb.tail.Load() + + if head == tail { + return 0 + } + + if tail >= head { + return tail - head + } + + return cb.size - (head - tail) +} + +func (cb *CircularBuffer[T]) Capacity() uint32 { + return cb.size +} + +func (cb *CircularBuffer[T]) IsEmpty() bool { + return cb.head.Load() == cb.tail.Load() +} + +func (cb *CircularBuffer[T]) IsFull() bool { + head := cb.head.Load() + tail := cb.tail.Load() + + return ((tail + 1) & cb.mask) == head +} + +func (cb *CircularBuffer[T]) Clear() { + tail := cb.tail.Load() + for { + head := cb.head.Load() + if head == tail || cb.head.CompareAndSwap(head, tail) { + break + } + runtime.Gosched() + } +} diff --git a/pkg/lockless_circular_buffer/circular_buffer_test.go b/pkg/lockless_circular_buffer/circular_buffer_test.go new file mode 100644 index 00000000..b2a05c79 --- /dev/null +++ b/pkg/lockless_circular_buffer/circular_buffer_test.go @@ -0,0 +1,449 @@ +package lockless_circular_buffer + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCircularBufferBasicOperations(t *testing.T) { + t.Run("empty buffer", func(t *testing.T) { + cb := NewCircularBuffer[int](16) + require.True(t, cb.IsEmpty()) + require.False(t, cb.IsFull()) + require.Equal(t, uint32(0), cb.Size()) + _, ok := cb.Pop() + require.False(t, ok) + }) + + t.Run("push and pop", func(t *testing.T) { + cb := NewCircularBuffer[string](16) + cb.Push("test") + require.False(t, cb.IsEmpty()) + require.Equal(t, uint32(1), cb.Size()) + item, ok := cb.Pop() + require.True(t, ok) + require.Equal(t, "test", item) + require.True(t, cb.IsEmpty()) + require.Equal(t, uint32(0), cb.Size()) + }) + + t.Run("custom type", func(t *testing.T) { + type customType struct { + ID int + Name string + } + + cb := NewCircularBuffer[customType](16) + item := customType{ID: 1, Name: "test"} + cb.Push(item) + popped, ok := cb.Pop() + require.True(t, ok) + require.Equal(t, item, popped) + }) + + t.Run("full buffer", func(t *testing.T) { + cb := NewCircularBuffer[int](4) + + for i := 0; i < 3; i++ { + cb.Push(i) + } + + require.False(t, cb.IsEmpty()) + require.True(t, cb.IsFull()) + + success := cb.TryPush(4) + require.False(t, success) + + item, ok := cb.Pop() + require.True(t, ok) + require.Equal(t, 0, item) + require.False(t, cb.IsFull()) + + success = cb.TryPush(4) + require.True(t, success) + }) + + t.Run("push timeout", func(t *testing.T) { + cb := NewCircularBuffer[int](4) + + for i := 0; i < 3; i++ { + cb.Push(i) + } + + require.True(t, cb.IsFull()) + + success := cb.PushTimeout(4, 10*time.Millisecond) + require.False(t, success) + + _, _ = cb.Pop() + + success = cb.PushTimeout(4, 10*time.Millisecond) + require.True(t, success) + }) +} + +func TestCircularBufferBatchOperations(t *testing.T) { + t.Run("push batch non-blocking", func(t *testing.T) { + cb := NewCircularBuffer[int](8) + items := []int{1, 2, 3, 4, 5} + + pushed := cb.PushBatch(items) + require.Equal(t, 5, pushed) + + pushed = cb.PushBatch([]int{6, 7, 8}) + require.Equal(t, 2, pushed) + + require.True(t, cb.IsFull()) + + pushed = cb.PushBatch([]int{9, 10}) + require.Equal(t, 0, pushed) + + for i := 1; i <= 7; i++ { + item, ok := cb.Pop() + require.True(t, ok) + require.Equal(t, i, item) + } + + require.True(t, cb.IsEmpty()) + }) + + t.Run("push batch blocking", func(t *testing.T) { + cb := NewCircularBuffer[int](4) + items := []int{1, 2, 3} + + cb.PushBatchBlocking(items) + + require.Equal(t, uint32(3), cb.Size()) + + for i := 1; i <= 3; i++ { + item, ok := cb.Pop() + require.True(t, ok) + require.Equal(t, i, item) + } + }) + + t.Run("pop batch", func(t *testing.T) { + cb := NewCircularBuffer[int](16) + + for i := 1; i <= 10; i++ { + cb.Push(i) + } + + count, items := cb.PopBatch(5) + require.Equal(t, 5, count) + require.Equal(t, 5, len(items)) + for i := 0; i < 5; i++ { + require.Equal(t, i+1, items[i]) + } + + count, items = cb.PopBatch(10) + require.Equal(t, 5, count) + require.Equal(t, 5, len(items)) + for i := 0; i < 5; i++ { + require.Equal(t, i+6, items[i]) + } + + require.True(t, cb.IsEmpty()) + + count, items = cb.PopBatch(5) + require.Equal(t, 0, count) + require.Equal(t, 0, len(items)) + }) +} + +func TestCircularBufferConcurrentOperations(t *testing.T) { + t.Run("concurrent push and pop", func(t *testing.T) { + cb := NewCircularBuffer[int](1024) + const itemCount = 1000 + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + go func(offset int) { + defer wg.Done() + for j := 0; j < itemCount/4; j++ { + value := offset*itemCount/4 + j + cb.Push(value) + } + }(i) + } + + results := make(map[int]struct{}) + var mu sync.Mutex + + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + count, items := cb.PopBatch(10) + if count > 0 { + mu.Lock() + for _, item := range items { + results[item] = struct{}{} + } + mu.Unlock() + } else { + if len(results) >= itemCount { + break + } + time.Sleep(1 * time.Millisecond) + } + } + }() + } + + wg.Wait() + + require.Equal(t, itemCount, len(results)) + for i := 0; i < itemCount; i++ { + _, ok := results[i] + require.True(t, ok, "Item %d was not found in results", i) + } + }) + + t.Run("concurrent batch operations", func(t *testing.T) { + cb := NewCircularBuffer[int](1024) + const batchCount = 20 + const batchSize = 50 + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + go func(offset int) { + defer wg.Done() + for j := 0; j < batchCount/4; j++ { + batch := make([]int, batchSize) + for k := 0; k < batchSize; k++ { + batch[k] = offset*(batchCount/4)*batchSize + j*batchSize + k + } + cb.PushBatchBlocking(batch) + } + }(i) + } + + results := make(map[int]struct{}) + var mu sync.Mutex + totalItems := batchCount * batchSize + + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + count, items := cb.PopBatch(20) + if count > 0 { + mu.Lock() + for _, item := range items { + results[item] = struct{}{} + } + mu.Unlock() + } else { + mu.Lock() + resultCount := len(results) + mu.Unlock() + if resultCount >= totalItems { + break + } + time.Sleep(1 * time.Millisecond) + } + } + }() + } + + wg.Wait() + + require.Equal(t, totalItems, len(results)) + for i := 0; i < totalItems; i++ { + _, ok := results[i] + require.True(t, ok, "Item %d was not found in results", i) + } + }) +} + +func BenchmarkCircularBuffer(b *testing.B) { + b.Run("single push-pop", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + cb.Push(i) + _, _ = cb.Pop() + } + }) + + b.Run("batch push-pop-small", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + batchSize := 10 + items := make([]int, batchSize) + for i := 0; i < batchSize; i++ { + items[i] = i + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pushed := cb.PushBatch(items) + _, popped := cb.PopBatch(pushed) + if len(popped) != pushed { + b.Fatalf("pushed %d items but popped %d", pushed, len(popped)) + } + } + }) + + b.Run("batch push-pop-large", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + batchSize := 100 + items := make([]int, batchSize) + for i := 0; i < batchSize; i++ { + items[i] = i + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pushed := cb.PushBatch(items) + _, popped := cb.PopBatch(pushed) + if len(popped) != pushed { + b.Fatalf("pushed %d items but popped %d", pushed, len(popped)) + } + } + }) + + b.Run("concurrent-small", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + const numGoroutines = 4 + const itemsPerGoroutine = 100 + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + + for !cb.IsEmpty() { + cb.Pop() + } + + var wg sync.WaitGroup + + for j := 0; j < numGoroutines; j++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for k := 0; k < itemsPerGoroutine; k++ { + cb.Push(id*itemsPerGoroutine + k) + } + }(j) + } + + for j := 0; j < numGoroutines; j++ { + wg.Add(1) + go func() { + defer wg.Done() + remainingItems := itemsPerGoroutine + for remainingItems > 0 { + if item, ok := cb.Pop(); ok { + _ = item + remainingItems-- + } + } + }() + } + + b.StartTimer() + wg.Wait() + } + }) + + b.Run("concurrent-batch", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + const numGoroutines = 4 + const batchesPerGoroutine = 20 + const batchSize = 10 + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + + for !cb.IsEmpty() { + cb.Pop() + } + + var wg sync.WaitGroup + + items := make([]int, batchSize) + for j := 0; j < batchSize; j++ { + items[j] = j + } + + for j := 0; j < numGoroutines; j++ { + wg.Add(1) + go func() { + defer wg.Done() + for k := 0; k < batchesPerGoroutine; k++ { + cb.PushBatch(items) + } + }() + } + + for j := 0; j < numGoroutines; j++ { + wg.Add(1) + go func() { + defer wg.Done() + remainingBatches := batchesPerGoroutine + for remainingBatches > 0 { + count, _ := cb.PopBatch(batchSize) + if count > 0 { + remainingBatches-- + } + } + }() + } + + b.StartTimer() + wg.Wait() + } + }) + + b.Run("pop-multi", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + batchSize := 10 + + for i := 0; i < 1000; i++ { + cb.Push(i) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + count, _ := cb.PopBatch(batchSize) + if count == 0 { + b.StopTimer() + for j := 0; j < 1000; j++ { + cb.Push(j) + } + b.StartTimer() + _, _ = cb.PopBatch(batchSize) + } + } + }) + + b.Run("preloaded-push", func(b *testing.B) { + cb := NewCircularBuffer[int](1024) + batchSize := 100 + items := make([]int, batchSize) + for i := 0; i < batchSize; i++ { + items[i] = i + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pushed := cb.AddPreloaded(items) + _, popped := cb.PopBatch(pushed) + if len(popped) != pushed { + b.Fatalf("pushed %d items but popped %d", pushed, len(popped)) + } + } + }) +} diff --git a/pkg/media/audiotrack.go b/pkg/media/audiotrack.go index e82eda1d..69033d44 100644 --- a/pkg/media/audiotrack.go +++ b/pkg/media/audiotrack.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/gammazero/deque" "github.com/google/uuid" "github.com/pion/webrtc/v4" "go.uber.org/atomic" @@ -16,12 +15,15 @@ import ( "github.com/livekit/media-sdk/opus" "github.com/livekit/media-sdk/rtp" protoLogger "github.com/livekit/protocol/logger" + buffer "github.com/livekit/server-sdk-go/v2/pkg/lockless_circular_buffer" ) const ( DefaultOpusSampleRate = 48000 DefaultOpusSampleDuration = 20 * time.Millisecond defaultPCMSampleDuration = 10000 * time.Microsecond + // todo(anunaym14): make this configurable + defaultBufferCapacity = 32768 ) type PCMLocalTrackParams struct { @@ -49,13 +51,10 @@ type PCMLocalTrack struct { chunksPerSample int writeSilenceOnNoData bool - // int16 to support a LE/BE PCM16 chunk that has a high byte and low byte - // TODO(anunaym14): switch out deque for a ring buffer - chunkBuffer *deque.Deque[int16] - - mu sync.Mutex - cond *sync.Cond + chunkBuffer *buffer.CircularBuffer[int16] + writeBufMu sync.Mutex + writeBufCond *sync.Cond emptyBufMu sync.Mutex emptyBufCond *sync.Cond @@ -113,58 +112,48 @@ func NewPCMLocalTrack(sourceSampleRate int, sourceChannels int, logger protoLogg sourceSampleRate: sourceSampleRate, frameDuration: defaultPCMSampleDuration, sourceChannels: sourceChannels, - chunkBuffer: new(deque.Deque[int16]), + chunkBuffer: buffer.NewCircularBuffer[int16](defaultBufferCapacity), chunksPerSample: (sourceSampleRate * sourceChannels * int(defaultPCMSampleDuration/time.Nanosecond)) / 1e9, writeSilenceOnNoData: params.WriteSilenceOnNoData, } - t.cond = sync.NewCond(&t.mu) + t.writeBufCond = sync.NewCond(&t.writeBufMu) t.emptyBufCond = sync.NewCond(&t.emptyBufMu) go t.processSamples() return t, nil } -func (t *PCMLocalTrack) pushChunksToBuffer(sample media.PCM16Sample) { - for _, chunk := range sample { - t.chunkBuffer.PushBack(chunk) - } -} - func (t *PCMLocalTrack) waitUntilBufferHasChunks(count int) bool { - var didWait bool + t.writeBufMu.Lock() + defer t.writeBufMu.Unlock() - for t.chunkBuffer.Len() < count && !t.closed.Load() { + didWait := false + for int(t.chunkBuffer.Size()) < count && !t.closed.Load() { t.emptyBufMu.Lock() t.emptyBufCond.Broadcast() t.emptyBufMu.Unlock() - t.cond.Wait() + didWait = true + t.writeBufCond.Wait() } return didWait } func (t *PCMLocalTrack) getChunksFromBuffer() (media.PCM16Sample, bool) { - chunks := make(media.PCM16Sample, t.chunksPerSample) - var didWait = false + if !t.writeSilenceOnNoData { didWait = t.waitUntilBufferHasChunks(t.chunksPerSample) } - if t.closed.Load() && t.chunkBuffer.Len() == 0 { + if t.closed.Load() && t.chunkBuffer.IsEmpty() { return nil, false } - for i := 0; i < t.chunksPerSample; i++ { - if t.chunkBuffer.Len() == 0 { - // this will zero-init at index i, which will be a silent chunk. - // if writeSilenceOnNoData is false, this condition will never be true. - continue - } else { - chunks[i] = t.chunkBuffer.PopFront() - } - } + _, items := t.chunkBuffer.PopBatch(t.chunksPerSample) + chunks := make(media.PCM16Sample, t.chunksPerSample) + copy(chunks, items) return chunks, didWait } @@ -174,10 +163,13 @@ func (t *PCMLocalTrack) WriteSample(sample media.PCM16Sample) error { return errors.New("track is closed") } - t.mu.Lock() - t.pushChunksToBuffer(sample) - t.cond.Broadcast() - t.mu.Unlock() + t.chunkBuffer.PushBatchBlocking(sample) + + // Signal waiting goroutines that new samples are available + t.writeBufMu.Lock() + t.writeBufCond.Broadcast() + t.writeBufMu.Unlock() + return nil } @@ -186,11 +178,10 @@ func (t *PCMLocalTrack) processSamples() { defer ticker.Stop() for { - if t.closed.Load() && t.chunkBuffer.Len() == 0 { + if t.closed.Load() && int(t.chunkBuffer.Size()) > t.chunksPerSample { break } - t.mu.Lock() sample, didWait := t.getChunksFromBuffer() if sample != nil { // sample is only nil when the track is closed, so we don't need to @@ -200,34 +191,40 @@ func (t *PCMLocalTrack) processSamples() { ticker.Reset(t.frameDuration) } } - t.mu.Unlock() <-ticker.C } + + t.resampledPCMWriter.Close() + t.pcmWriter.Close() + t.opusWriter.Close() } func (t *PCMLocalTrack) WaitForPlayout() { t.emptyBufMu.Lock() defer t.emptyBufMu.Unlock() - for t.chunkBuffer.Len() > t.chunksPerSample { + for t.chunkBuffer.Size() > uint32(t.chunksPerSample) { t.emptyBufCond.Wait() } } func (t *PCMLocalTrack) ClearQueue() { - t.mu.Lock() - defer t.mu.Unlock() t.chunkBuffer.Clear() + + t.emptyBufMu.Lock() + t.emptyBufCond.Broadcast() + t.emptyBufMu.Unlock() } func (t *PCMLocalTrack) Close() { if t.closed.CompareAndSwap(false, true) { - t.mu.Lock() - defer t.mu.Unlock() - t.cond.Broadcast() - t.resampledPCMWriter.Close() - t.pcmWriter.Close() - t.opusWriter.Close() + t.writeBufMu.Lock() + t.writeBufCond.Broadcast() + t.writeBufMu.Unlock() + + t.emptyBufMu.Lock() + t.emptyBufCond.Broadcast() + t.emptyBufMu.Unlock() } }