Skip to content

Commit 0095171

Browse files
Savidclaude
andcommitted
refactor: change batch collector to use blocking channel writes
Replace non-blocking channel writes with blocking behavior in BatchCollector. This ensures tasks wait for available channel space rather than returning errors when the channel is full, providing better backpressure handling. - Remove ErrChannelFull error type - Update SubmitBatch to accept context and block on channel writes - Update tests to verify blocking behavior with timeouts - Simplify error handling by removing channel full fallback logic 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e3c1cbe commit 0095171

3 files changed

Lines changed: 20 additions & 20 deletions

File tree

pkg/processor/transaction/structlog/batch_collector.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package structlog
22

33
import (
44
"context"
5-
"fmt"
65
"runtime"
76
"sync"
87
"time"
@@ -76,23 +75,22 @@ func (bc *BatchCollector) Stop(ctx context.Context) error {
7675
}
7776

7877
// SubmitBatch submits a task batch for processing
79-
func (bc *BatchCollector) SubmitBatch(taskBatch TaskBatch) error {
78+
func (bc *BatchCollector) SubmitBatch(ctx context.Context, taskBatch TaskBatch) error {
8079
// Check shutdown first
8180
select {
8281
case <-bc.shutdown:
8382
return context.Canceled
8483
default:
8584
}
8685

87-
// Then try to submit
86+
// Block and wait for space in the channel
8887
select {
8988
case bc.taskChannel <- taskBatch:
9089
return nil
9190
case <-bc.shutdown:
9291
return context.Canceled
93-
default:
94-
// Channel is full, caller should fallback to direct insert
95-
return ErrChannelFull
92+
case <-ctx.Done():
93+
return ctx.Err()
9694
}
9795
}
9896

@@ -344,6 +342,3 @@ func (bc *BatchCollector) flushRemaining() {
344342
bc.flushBatch(ctx)
345343
}
346344
}
347-
348-
// Custom error for when channel is full
349-
var ErrChannelFull = fmt.Errorf("batch collector channel is full")

pkg/processor/transaction/structlog/batch_collector_test.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ func TestBatchCollector_ProcessLargeTask(t *testing.T) {
131131
assert.True(t, len(largeTask.Rows) > bc.maxBatchSize)
132132
}
133133

134-
func TestBatchCollector_ChannelFull(t *testing.T) {
135-
// Test behavior when channel is full
134+
func TestBatchCollector_ChannelBlocking(t *testing.T) {
135+
// Test blocking behavior when channel is full
136136
config := BatchConfig{
137137
Enabled: true,
138138
MaxRows: 1000,
@@ -155,18 +155,23 @@ func TestBatchCollector_ChannelFull(t *testing.T) {
155155
ResponseChan: make(chan error, 1),
156156
TaskID: "test-" + string(rune(i)),
157157
}
158-
err := bc.SubmitBatch(task)
158+
err := bc.SubmitBatch(context.Background(), task)
159159
assert.NoError(t, err)
160160
}
161161

162-
// Next submit should fail with channel full
162+
// Next submit should block, test with timeout
163163
task := TaskBatch{
164164
Rows: createTestStructlogs(100),
165165
ResponseChan: make(chan error, 1),
166-
TaskID: "test-full",
166+
TaskID: "test-blocking",
167167
}
168-
err := bc.SubmitBatch(task)
169-
assert.Equal(t, ErrChannelFull, err)
168+
169+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
170+
defer cancel()
171+
172+
err := bc.SubmitBatch(ctx, task)
173+
assert.Error(t, err)
174+
assert.Equal(t, context.DeadlineExceeded, err)
170175
}
171176

172177
func TestBatchCollector_StartStop(t *testing.T) {
@@ -196,14 +201,14 @@ func TestBatchCollector_StartStop(t *testing.T) {
196201
}
197202

198203
// Can submit before shutdown
199-
err := bc.SubmitBatch(task)
204+
err := bc.SubmitBatch(context.Background(), task)
200205
assert.NoError(t, err)
201206

202207
// Close shutdown channel
203208
close(bc.shutdown)
204209

205210
// Should not accept new tasks after shutdown
206-
err = bc.SubmitBatch(task)
211+
err = bc.SubmitBatch(context.Background(), task)
207212
assert.Equal(t, context.Canceled, err)
208213
}
209214

@@ -352,7 +357,7 @@ func TestBatchCollector_ConcurrentAccess(t *testing.T) {
352357
TaskID: "concurrent-" + string(rune(id)),
353358
}
354359

355-
if err := bc.SubmitBatch(task); err != nil && err != ErrChannelFull {
360+
if err := bc.SubmitBatch(context.Background(), task); err != nil {
356361
errors <- err
357362
}
358363
done <- true

pkg/processor/transaction/structlog/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func (p *Processor) sendToBatchCollector(ctx context.Context, structlogs []Struc
250250
}()
251251

252252
// Submit to batch collector
253-
if err := p.batchCollector.SubmitBatch(taskBatch); err != nil {
253+
if err := p.batchCollector.SubmitBatch(ctx, taskBatch); err != nil {
254254
// Let Asynq handle retries - don't fallback to direct insert
255255
p.log.WithFields(logrus.Fields{
256256
"task_id": taskBatch.TaskID,

0 commit comments

Comments
 (0)