Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 45 additions & 41 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,54 @@ type RPCConfig struct {
}

type Config struct {
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"`
ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"`
ZeetClusterId string `env:"ZEET_CLUSTER_ID"`
CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"`
CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"`
CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"`
CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"`
CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"`
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"`
ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"`
ZeetClusterId string `env:"ZEET_CLUSTER_ID"`
CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"`
CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"`
CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"`
CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"`
CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"`
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
// PollerLag subtracts this many blocks from the RPC head only when COMMITTER_IS_LIVE
// is true, so the highest published block stays roughly this far behind the true tip.
// Ignored for non-live catch-up (effective head stays rpcLatest; CommitterLagByBlocks unchanged).
PollerLag uint64 `env:"POLLER_LAG" envDefault:"0"`
// CommitterStartBlock, when set (>0), forces the committer to start publishing
// from this block number regardless of what ClickHouse says is already committed.
// This can cause duplicate publishing if ClickHouse already contains higher blocks.
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"`
StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"`
BackfillStartBlock uint64 `env:"BACKFILL_START_BLOCK"`
BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"`
RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"`
RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"`
RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"`
RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"`
ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
RedisUsername string `env:"REDIS_USERNAME"`
RedisPassword string `env:"REDIS_PASSWORD"`
RedisDB int `env:"REDIS_DB" envDefault:"0"`
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"`
StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"`
BackfillStartBlock uint64 `env:"BACKFILL_START_BLOCK"`
BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"`
RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"`
RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"`
RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"`
RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"`
ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
RedisUsername string `env:"REDIS_USERNAME"`
RedisPassword string `env:"REDIS_PASSWORD"`
RedisDB int `env:"REDIS_DB" envDefault:"0"`
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
}

var Cfg Config
Expand Down
23 changes: 17 additions & 6 deletions internal/committer/poollatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@ func pollLatest() error {
time.Sleep(250 * time.Millisecond)
continue
}
// Update latest block number metric
metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(latestBlock.Uint64()))
rpcLatest := latestBlock.Uint64()
effectiveLatest := rpcLatest
if config.Cfg.CommitterIsLive {
if config.Cfg.PollerLag < rpcLatest {
effectiveLatest = rpcLatest - config.Cfg.PollerLag
} else {
effectiveLatest = 0
}
}

// Update latest block number metric (RPC head, not poller-lag-adjusted)
metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(rpcLatest))

if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= latestBlock.Uint64() {
if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= effectiveLatest {
time.Sleep(250 * time.Millisecond)
continue
}

// will panic if any block is invalid
blockDataArray := libblockdata.GetValidBlockDataInBatch(latestBlock.Uint64(), nextBlockNumber)
blockDataArray := libblockdata.GetValidBlockDataInBatch(effectiveLatest, nextBlockNumber)

// Validate that all blocks are sequential and nothing is missing
expectedBlockNumber := nextBlockNumber
Expand Down Expand Up @@ -80,9 +90,10 @@ func pollLatest() error {
metrics.CommitterIsLive.WithLabelValues(indexerName, chainIdStr).Set(1)
}

if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized {
if !config.Cfg.CommitterIsLive && int64(effectiveLatest)-int64(nextBlockNumber) < 20 && !hasRightsized {
log.Debug().
Uint64("latest_block", latestBlock.Uint64()).
Uint64("rpc_latest_block", rpcLatest).
Uint64("effective_latest_block", effectiveLatest).
Uint64("next_commit_block", nextBlockNumber).
Msg("Latest block is close to next commit block. Resizing s3 committer")
libs.RightsizeS3Committer()
Expand Down
Loading