diff --git a/configs/config.go b/configs/config.go index 99e1269..cef29de 100644 --- a/configs/config.go +++ b/configs/config.go @@ -61,6 +61,7 @@ type Config struct { CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"` CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` + CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"` StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` @@ -94,6 +95,9 @@ func LoadConfig(cfgFile string) error { panic(err) } + // Set default values for viper-managed configs + viper.SetDefault("rpc.blockReceipts.enabled", true) + if cfgFile != "" { viper.SetConfigFile(cfgFile) if err := viper.ReadInConfig(); err != nil { diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index 9380044..0575241 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -29,7 +29,7 @@ func pollLatest() error { // Update latest block number metric metrics.CommitterLatestBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(latestBlock.Uint64())) - if nextBlockNumber >= latestBlock.Uint64() { + if nextBlockNumber+config.Cfg.CommitterLagByBlocks >= latestBlock.Uint64() { time.Sleep(250 * time.Millisecond) continue } diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index 6910d9f..e8c8acf 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -71,7 +71,16 @@ func Initialize() (IRPCClient, error) { return nil, fmt.Errorf("RPC_URL environment variable is not set") } log.Debug().Msg("Initializing RPC") - rpcClient, dialErr := gethRpc.Dial(rpcUrl) + + ctx := context.Background() + + // Initialize RPC client with custom header to disable insight RPC tracking + rpcClient, dialErr := gethRpc.DialOptions( + ctx, + rpcUrl, + gethRpc.WithHeader("x-disable-insight-rpc", "true"), + ) + if dialErr != nil { return nil, dialErr } @@ -172,19 +181,18 @@ func (rpc *Client) checkGetBlockByNumberSupport() error { } func (rpc *Client) checkGetBlockReceiptsSupport() error { - if config.Cfg.RPC.BlockReceipts.Enabled { - var getBlockReceiptsResult interface{} - receiptsErr := rpc.RPCClient.Call(&getBlockReceiptsResult, "eth_getBlockReceipts", "latest") - if receiptsErr != nil { - log.Warn().Err(receiptsErr).Msg("eth_getBlockReceipts method not supported") - return fmt.Errorf("eth_getBlockReceipts method not supported: %v", receiptsErr) - } else { - rpc.supportsBlockReceipts = true - log.Debug().Msg("eth_getBlockReceipts method supported") - } - } else { + // Always probe to see if the method is supported + var getBlockReceiptsResult interface{} + receiptsErr := rpc.RPCClient.Call(&getBlockReceiptsResult, "eth_getBlockReceipts", "latest") + + if receiptsErr != nil { + // Method not supported by RPC endpoint rpc.supportsBlockReceipts = false - log.Debug().Msg("eth_getBlockReceipts method disabled") + log.Warn().Err(receiptsErr).Msg("eth_getBlockReceipts method not supported by RPC endpoint") + } else { + // Method supported and enabled + rpc.supportsBlockReceipts = true + log.Info().Msg("eth_getBlockReceipts method enabled") } return nil }