
Commit 5bc68cd

Add async insert settings to processors (#67)
- Add configurable asyncInsert and waitForAsyncInsert settings to each processor's ClickHouse writes to reduce part creation pressure
- Use *bool pointers to distinguish omitted config (defaults to true) from explicit false values (see the sketch below)
- Remove unused chunkSize and progressLogThreshold config options
- Simplify structlog ProcessTransaction by removing the OnInput streaming pattern in favor of a simple batch insert
- Remove DefaultChunkSize and DefaultProgressLogThreshold from tracker
- Update tests to remove chunk-related assertions
1 parent f64e8cb commit 5bc68cd
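A quick illustration of the *bool convention: most Go YAML decoders leave a pointer field nil when the key is omitted, which is what lets the processors default to true while still honouring an explicit false. The sketch below assumes gopkg.in/yaml.v3 and uses an illustrative struct, not the actual config types (those appear in the config.go diffs further down).

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// opts mirrors the new processor config fields; the type name is illustrative.
type opts struct {
	AsyncInsert        *bool `yaml:"asyncInsert"`
	WaitForAsyncInsert *bool `yaml:"waitForAsyncInsert"`
}

func main() {
	// Key omitted: the pointer stays nil, so the processor falls back to true.
	var omitted opts
	if err := yaml.Unmarshal([]byte("{}"), &omitted); err != nil {
		panic(err)
	}
	fmt.Println(omitted.AsyncInsert == nil) // true -> default of true applies

	// Key set explicitly: the pointer is non-nil and carries the false value.
	var disabled opts
	if err := yaml.Unmarshal([]byte("asyncInsert: false"), &disabled); err != nil {
		panic(err)
	}
	fmt.Println(*disabled.AsyncInsert) // false -> async inserts disabled
}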

File tree

10 files changed, +200 -255 lines changed


example_config.yaml

Lines changed: 16 additions & 11 deletions
@@ -50,18 +50,21 @@ processors:
     addr: "localhost:9000"
     database: "default"
     table: "canonical_execution_transaction_structlog"
-    # debug: false # Enable debug logging for ClickHouse queries
-    # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
-    # Channel-based batching configuration for memory-efficient processing
-    # bigTransactionThreshold: 500000 # Transactions with more structlogs are considered "big" (default: 500000)
-    # chunkSize: 10000 # Number of structlogs per batch (default: 10000)
-    # channelBufferSize: 2 # Number of chunks to buffer in channel (default: 2)
-    # progressLogThreshold: 100000 # Log progress every N structlogs for large transactions (default: 100000)
+    # debug: false # Enable debug logging for ClickHouse queries
+    # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
+    # asyncInsert: true # Enable async inserts to reduce part creation (default: true)
+    # waitForAsyncInsert: true # Wait for async insert to complete for durability (default: true)
 
-    # Small transaction batching configuration
-    # batchInsertThreshold: 50000 # Transactions with fewer structlogs than this will be batched (default: 50000)
-    # batchFlushInterval: 5s # Maximum time to wait before flushing a batch (default: 5s)
-    # batchMaxSize: 100000 # Maximum number of structlogs to accumulate in a batch (default: 100000)
+  # Aggregated structlog processor (call frame level aggregation)
+  transactionStructlogAgg:
+    enabled: false
+    addr: "localhost:9000"
+    database: "default"
+    table: "canonical_execution_transaction_structlog_agg"
+    # debug: false # Enable debug logging for ClickHouse queries
+    # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
+    # asyncInsert: true # Enable async inserts to reduce part creation (default: true)
+    # waitForAsyncInsert: true # Wait for async insert to complete for durability (default: true)
 
   # Simple transaction processor (lightweight - no debug traces)
   transactionSimple:
@@ -71,6 +74,8 @@ processors:
     table: "execution_transaction"
     # debug: false # Enable debug logging for ClickHouse queries
     # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
+    # asyncInsert: true # Enable async inserts to reduce part creation (default: true)
+    # waitForAsyncInsert: true # Wait for async insert to complete for durability (default: true)
 
 # Application settings
 shutdownTimeout: 6m

pkg/processor/tracker/processor.go

Lines changed: 0 additions & 7 deletions
@@ -65,13 +65,6 @@ const (
 	// incomplete block and the current block before blocking new block processing.
 	DefaultMaxPendingBlockRange = 2
 
-	// DefaultChunkSize is the default number of rows per ClickHouse insert batch.
-	DefaultChunkSize = 10000
-
-	// DefaultProgressLogThreshold is the default threshold for logging progress
-	// on large transactions (structlog processor).
-	DefaultProgressLogThreshold = 100000
-
 	// DefaultClickHouseTimeout is the default timeout for ClickHouse operations.
 	DefaultClickHouseTimeout = 30 * time.Second
 

pkg/processor/transaction/simple/config.go

Lines changed: 4 additions & 0 deletions
@@ -12,6 +12,10 @@ type Config struct {
 	Enabled bool `yaml:"enabled"`
 	Table string `yaml:"table"`
 
+	// Async insert settings for ClickHouse (pointers to distinguish omitted from explicit false)
+	AsyncInsert *bool `yaml:"asyncInsert"` // Enable async inserts to reduce part creation. Default: true
+	WaitForAsyncInsert *bool `yaml:"waitForAsyncInsert"` // Wait for async insert to complete. Default: true
+
 	// Block completion tracking
 	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
 }

pkg/processor/transaction/simple/handlers.go

Lines changed: 32 additions & 2 deletions
@@ -320,6 +320,35 @@ func calculateEffectiveGasPrice(block execution.Block, tx execution.Transaction)
 	return effectiveGasPrice
 }
 
+// getInsertSettings returns ClickHouse settings for async inserts.
+func (p *Processor) getInsertSettings() []ch.Setting {
+	// Default to true if not explicitly configured (nil pointer)
+	asyncInsert := true
+	if p.config.AsyncInsert != nil {
+		asyncInsert = *p.config.AsyncInsert
+	}
+
+	waitForAsync := true
+	if p.config.WaitForAsyncInsert != nil {
+		waitForAsync = *p.config.WaitForAsyncInsert
+	}
+
+	asyncVal := "0"
+	if asyncInsert {
+		asyncVal = "1"
+	}
+
+	waitVal := "0"
+	if waitForAsync {
+		waitVal = "1"
+	}
+
+	return []ch.Setting{
+		{Key: "async_insert", Value: asyncVal},
+		{Key: "wait_for_async_insert", Value: waitVal},
+	}
+}
+
 // insertTransactions inserts transactions into ClickHouse using columnar protocol.
 func (p *Processor) insertTransactions(ctx context.Context, transactions []Transaction) error {
 	if len(transactions) == 0 {
@@ -338,8 +367,9 @@ func (p *Processor) insertTransactions(ctx context.Context, transactions []Trans
 	input := cols.Input()
 
 	if err := p.clickhouse.Do(insertCtx, ch.Query{
-		Body:  input.Into(p.config.Table),
-		Input: input,
+		Body:     input.Into(p.config.Table),
+		Input:    input,
+		Settings: p.getInsertSettings(),
 	}); err != nil {
 		common.ClickHouseInsertsRows.WithLabelValues(
 			p.network.Name,

pkg/processor/transaction/structlog/config.go

Lines changed: 3 additions & 3 deletions
@@ -12,9 +12,9 @@ type Config struct {
 	Enabled bool `yaml:"enabled"`
 	Table string `yaml:"table"`
 
-	// Streaming settings
-	ChunkSize int `yaml:"chunkSize"` // Default: 10,000 rows per OnInput iteration
-	ProgressLogThreshold int `yaml:"progressLogThreshold"` // Default: 100,000 - log progress for large txs
+	// Async insert settings for ClickHouse (pointers to distinguish omitted from explicit false)
+	AsyncInsert *bool `yaml:"asyncInsert"` // Enable async inserts to reduce part creation. Default: true
+	WaitForAsyncInsert *bool `yaml:"waitForAsyncInsert"` // Wait for async insert to complete. Default: true
 
 	// Block completion tracking
 	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2

pkg/processor/transaction/structlog/processor.go

Lines changed: 32 additions & 2 deletions
@@ -196,6 +196,35 @@ func (p *Processor) getProcessBackwardsQueue() string {
 	return tracker.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix)
 }
 
+// getInsertSettings returns ClickHouse settings for async inserts.
+func (p *Processor) getInsertSettings() []ch.Setting {
+	// Default to true if not explicitly configured (nil pointer)
+	asyncInsert := true
+	if p.config.AsyncInsert != nil {
+		asyncInsert = *p.config.AsyncInsert
+	}
+
+	waitForAsync := true
+	if p.config.WaitForAsyncInsert != nil {
+		waitForAsync = *p.config.WaitForAsyncInsert
+	}
+
+	asyncVal := "0"
+	if asyncInsert {
+		asyncVal = "1"
+	}
+
+	waitVal := "0"
+	if waitForAsync {
+		waitVal = "1"
+	}
+
+	return []ch.Setting{
+		{Key: "async_insert", Value: asyncVal},
+		{Key: "wait_for_async_insert", Value: waitVal},
+	}
+}
+
 // insertStructlogs inserts structlogs into ClickHouse using columnar protocol.
 func (p *Processor) insertStructlogs(ctx context.Context, structlogs []Structlog) error {
 	if len(structlogs) == 0 {
@@ -236,8 +265,9 @@ func (p *Processor) insertStructlogs(ctx context.Context, structlogs []Structlog
 	input := cols.Input()
 
 	if err := p.clickhouse.Do(insertCtx, ch.Query{
-		Body:  input.Into(p.config.Table),
-		Input: input,
+		Body:     input.Into(p.config.Table),
+		Input:    input,
+		Settings: p.getInsertSettings(),
 	}); err != nil {
 		return fmt.Errorf("failed to insert structlogs: %w", err)
 	}
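The getInsertSettings added here is identical to the one added to the simple processor above, and example_config.yaml documents the same two knobs for the aggregated structlog processor as well. If that duplication becomes a nuisance, the resolution logic could live in one small shared helper. This is a sketch only, with hypothetical package and function names, assuming ch is the github.com/ClickHouse/ch-go client these processors appear to use:

// Package insertsettings is a hypothetical shared package; this commit keeps a
// copy of the logic in each processor instead.
package insertsettings

import ch "github.com/ClickHouse/ch-go"

// ForAsyncInsert resolves optional *bool config fields into ClickHouse async
// insert settings. A nil pointer means "not configured" and defaults to true.
func ForAsyncInsert(asyncInsert, waitForAsyncInsert *bool) []ch.Setting {
	toVal := func(p *bool) string {
		if p == nil || *p { // nil -> default true; explicit true -> enabled
			return "1"
		}
		return "0"
	}

	return []ch.Setting{
		{Key: "async_insert", Value: toVal(asyncInsert)},
		{Key: "wait_for_async_insert", Value: toVal(waitForAsyncInsert)},
	}
}

Each processor's getInsertSettings would then reduce to a one-liner returning ForAsyncInsert(p.config.AsyncInsert, p.config.WaitForAsyncInsert).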

pkg/processor/transaction/structlog/processor_test.go

Lines changed: 2 additions & 114 deletions
@@ -22,7 +22,6 @@ func TestProcessor_Creation(t *testing.T) {
 		Config: clickhouse.Config{
 			Addr: "localhost:9000",
 		},
-		ChunkSize: 10000,
 	}
 
 	// Test config validation
@@ -44,7 +43,6 @@ func TestProcessor_ConfigValidation(t *testing.T) {
 				Config: clickhouse.Config{
 					Addr: "localhost:9000",
 				},
-				ChunkSize: 10000,
 			},
 			expectError: false,
 		},
@@ -58,9 +56,8 @@ func TestProcessor_ConfigValidation(t *testing.T) {
 		{
 			name: "missing addr",
 			config: transaction_structlog.Config{
-				Enabled:   true,
-				Table:     "test_table",
-				ChunkSize: 10000,
+				Enabled: true,
+				Table:   "test_table",
 			},
 			expectError: true,
 		},
@@ -71,7 +68,6 @@ func TestProcessor_ConfigValidation(t *testing.T) {
 				Config: clickhouse.Config{
 					Addr: "localhost:9000",
 				},
-				ChunkSize: 10000,
 			},
 			expectError: true,
 		},
@@ -104,7 +100,6 @@ func TestProcessor_ConcurrentConfigValidation(t *testing.T) {
 			Config: clickhouse.Config{
 				Addr: "localhost:9000",
 			},
-			ChunkSize: 10000,
 		}
 		results <- cfg.Validate()
 	}()
@@ -252,33 +247,6 @@ func TestMemoryManagement(t *testing.T) {
 
 	assert.Equal(t, rowCount, cols.Rows(), "Should have correct row count")
 
-	// Test that chunking calculations work properly
-	const chunkSize = 100
-
-	expectedChunks := (rowCount + chunkSize - 1) / chunkSize
-
-	// Verify chunking logic
-	actualChunks := 0
-
-	for i := 0; i < rowCount; i += chunkSize {
-		actualChunks++
-
-		end := i + chunkSize
-		if end > rowCount {
-			end = rowCount
-		}
-
-		// Verify chunk size constraints
-		chunkLen := end - i
-		if chunkLen <= 0 || chunkLen > chunkSize {
-			t.Errorf("Invalid chunk size: %d (expected 1-%d)", chunkLen, chunkSize)
-		}
-	}
-
-	if actualChunks != expectedChunks {
-		t.Errorf("Expected %d chunks, got %d", expectedChunks, actualChunks)
-	}
-
 	// Reset columns to free memory
 	cols.Reset()
 	assert.Equal(t, 0, cols.Rows(), "Reset should clear all rows")
@@ -302,86 +270,6 @@ func TestMemoryManagement(t *testing.T) {
 	}
 }
 
-func TestChunkProcessing(t *testing.T) {
-	tests := []struct {
-		name           string
-		inputSize      int
-		expectedChunks int
-		chunkSize      int
-	}{
-		{
-			name:           "small input",
-			inputSize:      50,
-			expectedChunks: 1,
-			chunkSize:      100,
-		},
-		{
-			name:           "exact chunk size",
-			inputSize:      100,
-			expectedChunks: 1,
-			chunkSize:      100,
-		},
-		{
-			name:           "multiple chunks",
-			inputSize:      250,
-			expectedChunks: 3,
-			chunkSize:      100,
-		},
-		{
-			name:           "large input",
-			inputSize:      1500,
-			expectedChunks: 15,
-			chunkSize:      100,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			// Test chunking logic using Columns
-			cols := transaction_structlog.NewColumns()
-			now := time.Now()
-
-			// Fill columns with test data
-			for i := 0; i < tt.inputSize; i++ {
-				cols.Append(
-					now, uint64(i), "0xtest", uint32(0), uint64(21000), false, nil,
-					uint32(i), "PUSH1", uint64(20000), uint64(3), uint64(3), uint64(3), uint64(1),
-					nil, nil, nil, nil, uint32(0), []uint32{}, "test",
-				)
-			}
-
-			assert.Equal(t, tt.inputSize, cols.Rows(), "Should have correct row count")
-
-			// Calculate expected chunks
-			expectedChunks := (tt.inputSize + tt.chunkSize - 1) / tt.chunkSize
-
-			if expectedChunks != tt.expectedChunks {
-				t.Errorf("Expected %d chunks for %d items, got %d", tt.expectedChunks, tt.inputSize, expectedChunks)
-			}
-
-			// Test that the chunking logic would work correctly
-			chunkCount := 0
-
-			for i := 0; i < tt.inputSize; i += tt.chunkSize {
-				chunkCount++
-
-				end := i + tt.chunkSize
-				if end > tt.inputSize {
-					end = tt.inputSize
-				}
-				// Verify chunk boundaries
-				if end <= i {
-					t.Errorf("Invalid chunk boundaries: start=%d, end=%d", i, end)
-				}
-			}
-
-			if chunkCount != tt.expectedChunks {
-				t.Errorf("Chunking produced %d chunks, expected %d", chunkCount, tt.expectedChunks)
-			}
-		})
-	}
-}
-
 func TestColumnsAppendAndReset(t *testing.T) {
 	cols := transaction_structlog.NewColumns()
 	now := time.Now()
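The chunking tests removed above are not replaced by coverage for the new settings. If the resolution logic were factored out as in the helper sketched earlier, a table-driven test becomes trivial; again a sketch with hypothetical names, reusing the testify assertions the existing tests already rely on:

package insertsettings

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func boolPtr(v bool) *bool { return &v }

func TestForAsyncInsert(t *testing.T) {
	tests := []struct {
		name                string
		async, wait         *bool
		wantAsync, wantWait string
	}{
		{name: "omitted defaults to enabled", async: nil, wait: nil, wantAsync: "1", wantWait: "1"},
		{name: "explicit false disables both", async: boolPtr(false), wait: boolPtr(false), wantAsync: "0", wantWait: "0"},
		{name: "fire-and-forget", async: boolPtr(true), wait: boolPtr(false), wantAsync: "1", wantWait: "0"},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			settings := ForAsyncInsert(tt.async, tt.wait)
			assert.Equal(t, tt.wantAsync, settings[0].Value) // async_insert
			assert.Equal(t, tt.wantWait, settings[1].Value)  // wait_for_async_insert
		})
	}
}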
