diff --git a/configs/config.go b/configs/config.go index 2c05606..2987132 100644 --- a/configs/config.go +++ b/configs/config.go @@ -42,50 +42,54 @@ type RPCConfig struct { } type Config struct { - RPC RPCConfig `mapstructure:"rpc"` - Log LogConfig `mapstructure:"log"` - ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"` - ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"` - ZeetClusterId string `env:"ZEET_CLUSTER_ID"` - CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` - CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` - CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"` - CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` - CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` - CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"` - CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` - CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` - CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` - CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"` - CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"` - CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"` - CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` - CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` - CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"` + RPC RPCConfig `mapstructure:"rpc"` + Log LogConfig `mapstructure:"log"` + ZeetProjectName string `env:"ZEET_PROJECT_NAME" envDefault:"insight-indexer"` + ZeetDeploymentId string `env:"ZEET_DEPLOYMENT_ID"` + ZeetClusterId string `env:"ZEET_CLUSTER_ID"` + CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"` + CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"` + CommitterClickhousePort 
int `env:"COMMITTER_CLICKHOUSE_PORT"` + CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"` + CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"` + CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"` + CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"` + CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"` + CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"` + CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"` + CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"` + CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"` + CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` + CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` + CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"` + // PollerLag subtracts this many blocks from the RPC head only when COMMITTER_IS_LIVE + // is true, so the highest published block stays roughly this far behind the true tip. + // Ignored for non-live catch-up (effective head stays rpcLatest); CommitterLagByBlocks is never modified and still applies independently in both modes. + PollerLag uint64 `env:"POLLER_LAG" envDefault:"0"` // CommitterStartBlock, when set (>0), forces the committer to start publishing // from this block number regardless of what ClickHouse says is already committed. // This can cause duplicate publishing if ClickHouse already contains higher blocks. 
- CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"` - StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` - StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` - StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` - StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` - StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` - BackfillStartBlock uint64 `env:"BACKFILL_START_BLOCK"` - BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"` - RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"` - RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"` - RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"` - RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"` - ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"` - InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"` - InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"` - RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"` - RedisUsername string `env:"REDIS_USERNAME"` - RedisPassword string `env:"REDIS_PASSWORD"` - RedisDB int `env:"REDIS_DB" envDefault:"0"` - ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"` - EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"` + CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"` + StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` + StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` + StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` + StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"` + StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"` + BackfillStartBlock uint64 
`env:"BACKFILL_START_BLOCK"` + BackfillEndBlock uint64 `env:"BACKFILL_END_BLOCK"` + RPCNumParallelCalls uint64 `env:"RPC_NUM_PARALLEL_CALLS" envDefault:"20"` + RPCBatchSize uint64 `env:"RPC_BATCH_SIZE" envDefault:"10"` + RPCBatchMaxMemoryUsageMB uint64 `env:"RPC_BATCH_MAX_MEMORY_USAGE_MB" envDefault:"32"` + RPCDisableBlockReceipts bool `env:"RPC_DISABLE_BLOCK_RECEIPTS" envDefault:"false"` + ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"` + InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"` + InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"` + RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"` + RedisUsername string `env:"REDIS_USERNAME"` + RedisPassword string `env:"REDIS_PASSWORD"` + RedisDB int `env:"REDIS_DB" envDefault:"0"` + ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"` + EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"` } var Cfg Config diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index 0575241..c0c42e0 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -26,16 +26,26 @@ func pollLatest() error { time.Sleep(250 * time.Millisecond) continue } - // Update latest block number metric - metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(latestBlock.Uint64())) + rpcLatest := latestBlock.Uint64() + effectiveLatest := rpcLatest + if config.Cfg.CommitterIsLive { + if config.Cfg.PollerLag < rpcLatest { + effectiveLatest = rpcLatest - config.Cfg.PollerLag + } else { + effectiveLatest = 0 + } + } + + // Update latest block number metric (RPC head, not poller-lag-adjusted) + metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(rpcLatest)) - if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= latestBlock.Uint64() { + if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= effectiveLatest { 
time.Sleep(250 * time.Millisecond) continue } // will panic if any block is invalid - blockDataArray := libblockdata.GetValidBlockDataInBatch(latestBlock.Uint64(), nextBlockNumber) + blockDataArray := libblockdata.GetValidBlockDataInBatch(effectiveLatest, nextBlockNumber) // Validate that all blocks are sequential and nothing is missing expectedBlockNumber := nextBlockNumber @@ -80,9 +90,10 @@ func pollLatest() error { metrics.CommitterIsLive.WithLabelValues(indexerName, chainIdStr).Set(1) } - if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized { + if !config.Cfg.CommitterIsLive && int64(effectiveLatest)-int64(nextBlockNumber) < 20 && !hasRightsized { log.Debug(). - Uint64("latest_block", latestBlock.Uint64()). + Uint64("rpc_latest_block", rpcLatest). + Uint64("effective_latest_block", effectiveLatest). Uint64("next_commit_block", nextBlockNumber). Msg("Latest block is close to next commit block. Resizing s3 committer") libs.RightsizeS3Committer()