diff --git a/crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto b/crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto index d2e5467..4cd7c68 100644 --- a/crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto +++ b/crates/bridges/evm/erigon-bridge/proto/customized-erigon/blockdata.proto @@ -30,6 +30,7 @@ message BlockRangeRequest { uint64 from_block = 1; // Starting block number (inclusive) uint64 to_block = 2; // Ending block number (inclusive, 0 = latest) uint32 batch_size = 3; // Records per batch (default: 1000) + bool enable_traces = 4; // Enable callTracer traces during ExecuteBlocks (default: false, 2-5x slower) } // A batch of RLP-encoded block headers @@ -79,11 +80,12 @@ message ReceiptBatch { bool is_last = 4; // True if this is the last batch } -// RLP-encoded receipt data (receipts contain logs) +// RLP-encoded receipt data with optional trace data message ReceiptData { uint64 block_number = 1; bytes block_hash = 2; // 32-byte block hash uint32 tx_index = 3; // Transaction index in block bytes rlp_receipt = 4; // RLP-encoded receipt (contains all logs for this tx) bytes tx_hash = 5; // 32-byte transaction hash + bytes trace_data = 6; // JSON-encoded trace (callTracer format), empty if tracing disabled } \ No newline at end of file diff --git a/crates/bridges/evm/erigon-bridge/src/blockdata_client.rs b/crates/bridges/evm/erigon-bridge/src/blockdata_client.rs index 536fffe..fb4b0b5 100644 --- a/crates/bridges/evm/erigon-bridge/src/blockdata_client.rs +++ b/crates/bridges/evm/erigon-bridge/src/blockdata_client.rs @@ -52,7 +52,7 @@ impl BlockDataClient { info!("Note: Custom Erigon must be running with BlockDataBackend service enabled"); let uri = if !endpoint.starts_with("http://") { - format!("http://{}", endpoint) + format!("http://{endpoint}") } else { endpoint.clone() }; @@ -88,6 +88,7 @@ impl BlockDataClient { from_block, to_block, batch_size: if batch_size > 0 { batch_size } else { 1000 }, + 
enable_traces: false, }; debug!( @@ -110,6 +111,7 @@ impl BlockDataClient { from_block, to_block, batch_size: if batch_size > 0 { batch_size } else { 1000 }, + enable_traces: false, }; debug!( @@ -128,16 +130,18 @@ impl BlockDataClient { from_block: u64, to_block: u64, batch_size: u32, + enable_traces: bool, ) -> Result, ErigonBridgeError> { let request = BlockRangeRequest { from_block, to_block, batch_size: if batch_size > 0 { batch_size } else { 1000 }, + enable_traces, }; debug!( - "Starting receipt stream via block execution (blocks {}-{}, batch_size: {})", - from_block, to_block, batch_size + "Starting receipt stream via block execution (blocks {}-{}, batch_size: {}, traces: {})", + from_block, to_block, batch_size, enable_traces ); let response = self.client.execute_blocks(request).await?; diff --git a/crates/bridges/evm/erigon-bridge/src/bridge.rs b/crates/bridges/evm/erigon-bridge/src/bridge.rs index 3f42bb9..9675554 100644 --- a/crates/bridges/evm/erigon-bridge/src/bridge.rs +++ b/crates/bridges/evm/erigon-bridge/src/bridge.rs @@ -160,8 +160,7 @@ impl ErigonFlightBridge { serde_json::from_str::(first).map_err( |e| { Box::new(TonicStatus::invalid_argument(format!( - "Invalid descriptor: {}", - e + "Invalid descriptor: {e}" ))) }, ) @@ -266,8 +265,7 @@ impl ErigonFlightBridge { Ok(handle) => handle, Err(e) => { let err_msg = format!( - "Failed to get client from pool for segment {}-{}: {}", - seg_start, seg_end, e + "Failed to get client from pool for segment {seg_start}-{seg_end}: {e}" ); error!("{}", err_msg); return futures::stream::once(async move { @@ -306,8 +304,7 @@ impl ErigonFlightBridge { return futures::stream::once(async move { Err(arrow_flight::error::FlightError::ExternalError(Box::new( std::io::Error::other(format!( - "Failed to access client: {}", - e + "Failed to access client: {e}" )), ))) }) @@ -344,8 +341,7 @@ impl ErigonFlightBridge { client_handle.mark_error(); arrow_flight::error::FlightError::ExternalError(Box::new( 
std::io::Error::other(format!( - "Segment {}-{} failed: {}", - seg_start, seg_end, e + "Segment {seg_start}-{seg_end} failed: {e}" )), )) }) @@ -398,8 +394,7 @@ impl ErigonFlightBridge { Ok(handle) => handle, Err(e) => { let err_msg = format!( - "Failed to get client from pool for segment {}-{}: {}", - seg_start, seg_end, e + "Failed to get client from pool for segment {seg_start}-{seg_end}: {e}" ); error!("{}", err_msg); return futures::stream::once(async move { @@ -438,8 +433,7 @@ impl ErigonFlightBridge { return futures::stream::once(async move { Err(arrow_flight::error::FlightError::ExternalError(Box::new( std::io::Error::other(format!( - "Failed to access client: {}", - e + "Failed to access client: {e}" )), ))) }) @@ -476,8 +470,7 @@ impl ErigonFlightBridge { client_handle.mark_error(); arrow_flight::error::FlightError::ExternalError(Box::new( std::io::Error::other(format!( - "Segment {}-{} failed: {}", - seg_start, seg_end, e + "Segment {seg_start}-{seg_end} failed: {e}" )), )) }) @@ -496,6 +489,7 @@ impl ErigonFlightBridge { start: u64, end: u64, validate: bool, + enable_traces: bool, ) -> Result< Pin< Box< @@ -521,13 +515,12 @@ impl ErigonFlightBridge { client .get_latest_block() .await - .map_err(|e| Status::internal(format!("Failed to query chain head: {}", e)))? + .map_err(|e| Status::internal(format!("Failed to query chain head: {e}")))? 
}; if end > chain_head { return Err(Status::invalid_argument(format!( - "Requested end block {} exceeds current chain head {}", - end, chain_head + "Requested end block {end} exceeds current chain head {chain_head}" ))); } @@ -552,9 +545,11 @@ impl ErigonFlightBridge { if stream_type == StreamType::Logs { let pool = self.blockdata_pool.clone(); + let mut config = self.segment_config.clone(); + config.enable_traces = enable_traces; let stream = self.process_logs_with_segments( pool, - self.segment_config.clone(), + config, self.validator.clone(), start, end, @@ -576,7 +571,7 @@ impl ErigonFlightBridge { Err(e) => { error!("Failed to get client from pool for blocks stream: {}", e); yield Err(arrow_flight::error::FlightError::ExternalError( - Box::new(std::io::Error::other(format!("Pool error: {}", e))) + Box::new(std::io::Error::other(format!("Pool error: {e}"))) )); return; } @@ -599,7 +594,7 @@ impl ErigonFlightBridge { error!("Failed to access client for blocks stream: {}", e); client_handle.mark_error(); yield Err(arrow_flight::error::FlightError::ExternalError( - Box::new(std::io::Error::other(format!("Client error: {}", e))) + Box::new(std::io::Error::other(format!("Client error: {e}"))) )); return; } @@ -726,7 +721,7 @@ impl FlightBridge for ErigonFlightBridge { let response = HandshakeResponse { protocol_version: 1, payload: serde_json::to_vec(&self.bridge_info().await) - .map_err(|e| Status::internal(format!("Failed to serialize bridge info: {}", e)))? + .map_err(|e| Status::internal(format!("Failed to serialize bridge info: {e}")))? 
.into(), }; @@ -822,7 +817,7 @@ impl FlightBridge for ErigonFlightBridge { .and_then(|s| { serde_json::from_str::(&s) .map_err(|e| { - Status::invalid_argument(format!("Invalid descriptor in ticket: {}", e)) + Status::invalid_argument(format!("Invalid descriptor in ticket: {e}")) }) })?; let stream_type = blockchain_desc.stream_type; @@ -845,7 +840,7 @@ impl FlightBridge for ErigonFlightBridge { let flight_stream = encoder.map(|result| { result.map_err(|e| { error!("Error encoding flight data: {}", e); - Status::internal(format!("Encoding error: {}", e)) + Status::internal(format!("Encoding error: {e}")) }) }); @@ -883,6 +878,7 @@ impl FlightBridge for ErigonFlightBridge { start, end, should_validate_ingestion, + blockchain_desc.enable_traces, ) .await?, ) @@ -934,7 +930,7 @@ impl FlightBridge for ErigonFlightBridge { Ok(m) => m, Err(e) => { error!("Failed to encode batch metadata: {}", e); - yield Err(Status::internal(format!("Metadata encoding error: {}", e))); + yield Err(Status::internal(format!("Metadata encoding error: {e}"))); continue; } }; @@ -959,7 +955,7 @@ impl FlightBridge for ErigonFlightBridge { } Err(e) => { error!("Error encoding batch to flight data: {}", e); - yield Err(Status::internal(format!("Batch encoding error: {}", e))); + yield Err(Status::internal(format!("Batch encoding error: {e}"))); } } } @@ -989,7 +985,7 @@ impl FlightBridge for ErigonFlightBridge { metrics_for_stream.error(error_type, data_type); - yield Err(Status::internal(format!("Stream error: {}", e))); + yield Err(Status::internal(format!("Stream error: {e}"))); } } } @@ -1011,7 +1007,7 @@ impl FlightBridge for ErigonFlightBridge { .next() .await .ok_or_else(|| Status::invalid_argument("Empty stream"))? 
- .map_err(|e| Status::internal(format!("Stream error: {}", e)))?; + .map_err(|e| Status::internal(format!("Stream error: {e}")))?; let stream_type = if let Some(desc) = first.flight_descriptor { Self::parse_descriptor(&desc) @@ -1034,7 +1030,7 @@ impl FlightBridge for ErigonFlightBridge { let flight_stream = encoder.map(|result| { result.map_err(|e| { error!("Error encoding flight data: {}", e); - Status::internal(format!("Encoding error: {}", e)) + Status::internal(format!("Encoding error: {e}")) }) }); @@ -1067,7 +1063,7 @@ impl FlightBridge for ErigonFlightBridge { let flight_stream = encoder.map(|result| { result.map_err(|e| { error!("Error encoding flight data: {}", e); - Status::internal(format!("Encoding error: {}", e)) + Status::internal(format!("Encoding error: {e}")) }) }); diff --git a/crates/bridges/evm/erigon-bridge/src/client.rs b/crates/bridges/evm/erigon-bridge/src/client.rs index 04a09b7..3351a32 100644 --- a/crates/bridges/evm/erigon-bridge/src/client.rs +++ b/crates/bridges/evm/erigon-bridge/src/client.rs @@ -70,7 +70,7 @@ impl ErigonClient { endpoint ); - Channel::from_shared(format!("http://{}", endpoint))? + Channel::from_shared(format!("http://{endpoint}"))? 
.connect() .await .map_err(|e| { diff --git a/crates/bridges/evm/erigon-bridge/src/error.rs b/crates/bridges/evm/erigon-bridge/src/error.rs index 6a3093e..26cb4e1 100644 --- a/crates/bridges/evm/erigon-bridge/src/error.rs +++ b/crates/bridges/evm/erigon-bridge/src/error.rs @@ -73,10 +73,10 @@ impl From for Status { ErigonBridgeError::ConversionError(msg) => Status::internal(msg), ErigonBridgeError::ValidationError(msg) => Status::failed_precondition(msg), ErigonBridgeError::ConnectionFailed(msg) => { - Status::unavailable(format!("Connection failed: {}", msg)) + Status::unavailable(format!("Connection failed: {msg}")) } ErigonBridgeError::StreamProtocol(stream_err) => { - Status::aborted(format!("Stream protocol violation: {}", stream_err)) + Status::aborted(format!("Stream protocol violation: {stream_err}")) } } } diff --git a/crates/bridges/evm/erigon-bridge/src/generated/custom.rs b/crates/bridges/evm/erigon-bridge/src/generated/custom.rs index b0e3959..5173cd4 100644 --- a/crates/bridges/evm/erigon-bridge/src/generated/custom.rs +++ b/crates/bridges/evm/erigon-bridge/src/generated/custom.rs @@ -312,6 +312,9 @@ pub struct BlockRangeRequest { /// Records per batch (default: 1000) #[prost(uint32, tag = "3")] pub batch_size: u32, + /// Enable callTracer traces during ExecuteBlocks (default: false, 2-5x slower) + #[prost(bool, tag = "4")] + pub enable_traces: bool, } /// A batch of RLP-encoded block headers #[allow(dead_code)] @@ -407,7 +410,7 @@ pub struct ReceiptBatch { #[prost(bool, tag = "4")] pub is_last: bool, } -/// RLP-encoded receipt data (receipts contain logs) +/// RLP-encoded receipt data with optional trace data #[allow(dead_code)] #[allow(clippy::enum_variant_names)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -426,6 +429,9 @@ pub struct ReceiptData { /// 32-byte transaction hash #[prost(bytes = "vec", tag = "5")] pub tx_hash: ::prost::alloc::vec::Vec, + /// JSON-encoded trace (callTracer format), empty if tracing disabled + #[prost(bytes = "vec", 
tag = "6")] + pub trace_data: ::prost::alloc::vec::Vec, } /// Generated client implementations. pub mod block_data_backend_client { diff --git a/crates/bridges/evm/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs index 8728d6d..688f77c 100644 --- a/crates/bridges/evm/erigon-bridge/src/main.rs +++ b/crates/bridges/evm/erigon-bridge/src/main.rs @@ -147,6 +147,7 @@ async fn main() -> Result<()> { max_concurrent_executions: args.max_concurrent_executions.unwrap_or_else(num_cpus::get), global_max_execute_blocks: args.global_max_execute_blocks, execute_blocks_semaphore: None, // Will be set by bridge + enable_traces: false, // Will be set per-request from Flight descriptor }; info!("Segment configuration:"); @@ -187,14 +188,14 @@ async fn main() -> Result<()> { Err(e) => ( axum::http::StatusCode::INTERNAL_SERVER_ERROR, [("content-type", "text/plain")], - format!("Error gathering metrics: {}", e), + format!("Error gathering metrics: {e}"), ), } } let app = Router::new().route("/metrics", get(metrics_handler)); - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", metrics_port)) + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{metrics_port}")) .await .unwrap(); info!( diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index ed326ad..5adb7b2 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -48,6 +48,9 @@ pub struct SegmentConfig { /// Global semaphore for limiting ExecuteBlocks calls across all workers /// Not included in Default implementation - must be set by bridge pub execute_blocks_semaphore: Option>, + + /// Enable transaction traces (callTracer) during log/receipt execution (default: false) + pub enable_traces: bool, } impl std::fmt::Debug for SegmentConfig { @@ -63,6 +66,7 @@ impl std::fmt::Debug for SegmentConfig { "execute_blocks_semaphore", 
&self.execute_blocks_semaphore.is_some(), ) + .field("enable_traces", &self.enable_traces) .finish() } } @@ -77,6 +81,7 @@ impl Default for SegmentConfig { max_concurrent_executions: num_cpus::get(), global_max_execute_blocks: 64, execute_blocks_semaphore: None, + enable_traces: false, } } } @@ -545,6 +550,7 @@ impl SegmentWorker { &mut client, &metrics, semaphore, + self.config.enable_traces, ).await; (work_idx, start, end, result) @@ -799,8 +805,7 @@ impl SegmentWorker { block_num, segment_start, segment_end, e ); return Err(ErigonBridgeError::ValidationError(format!( - "Block {} validation failed: {}", - block_num, e + "Block {block_num} validation failed: {e}" ))); } } @@ -840,6 +845,7 @@ impl SegmentWorker { client: &mut BlockDataClient, metrics: &BridgeMetrics, execute_blocks_semaphore: Option>, + enable_traces: bool, ) -> Result, Header)>, ErigonBridgeError> { let call_start = std::time::Instant::now(); let segment_id = from_block / 500_000; @@ -885,7 +891,7 @@ impl SegmentWorker { let request_start = Instant::now(); let mut receipt_stream = client - .execute_blocks(from_block, to_block, 100) + .execute_blocks(from_block, to_block, 100, enable_traces) .await .map_err(|e| { // Cancel the monitor task since the call failed @@ -1041,8 +1047,7 @@ impl SegmentWorker { block_num, segment_start, segment_end, e ); return Err(ErigonBridgeError::ValidationError(format!( - "Block {} receipt validation failed: {}", - block_num, e + "Block {block_num} receipt validation failed: {e}" ))); } } @@ -1109,7 +1114,7 @@ impl SegmentWorker { .take(80) .collect::(); - format!("unknown:{}", pattern) + format!("unknown:{pattern}") } } diff --git a/crates/bridges/evm/erigon-bridge/src/trie_client.rs b/crates/bridges/evm/erigon-bridge/src/trie_client.rs index 14e7612..f3e41a7 100644 --- a/crates/bridges/evm/erigon-bridge/src/trie_client.rs +++ b/crates/bridges/evm/erigon-bridge/src/trie_client.rs @@ -42,7 +42,7 @@ impl TrieClient { info!("Note: Custom Erigon must be running with 
TrieBackend service enabled"); let uri = if !endpoint.starts_with("http://") { - format!("http://{}", endpoint) + format!("http://{endpoint}") } else { endpoint.clone() }; diff --git a/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs b/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs index 1709d83..84fb0d6 100644 --- a/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs +++ b/crates/bridges/evm/jsonrpc-bridge/src/bridge.rs @@ -109,14 +109,8 @@ impl JsonRpcFlightBridge { descriptor: &FlightDescriptor, ) -> std::result::Result> { if let Some(first) = descriptor.path.first() { - serde_json::from_str::(first).map_err( - |e| { - Box::new(Status::invalid_argument(format!( - "Invalid descriptor: {}", - e - ))) - }, - ) + serde_json::from_str::(first) + .map_err(|e| Box::new(Status::invalid_argument(format!("Invalid descriptor: {e}")))) } else { Err(Box::new(Status::invalid_argument("Empty descriptor path"))) } @@ -178,7 +172,7 @@ impl JsonRpcFlightBridge { } Err(e) => { error!("Failed to fetch block #{}: {}", block_num, e); - yield Err(Status::internal(format!("Failed to fetch block {}: {}", block_num, e))); + yield Err(Status::internal(format!("Failed to fetch block {block_num}: {e}"))); continue; } }; @@ -192,7 +186,7 @@ impl JsonRpcFlightBridge { }, Err(e) => { error!("Failed to convert block header #{}: {}", block_num, e); - yield Err(Status::internal(format!("Conversion error: {}", e))); + yield Err(Status::internal(format!("Conversion error: {e}"))); } } } @@ -216,14 +210,14 @@ impl JsonRpcFlightBridge { }, Err(e) => { error!("Transaction validation failed for block #{}: {}", block_num, e); - yield Err(Status::internal(format!("Validation error for block {}: {}", block_num, e))); + yield Err(Status::internal(format!("Validation error for block {block_num}: {e}"))); continue; } } }, Err(e) => { error!("Failed to extract transaction records for validation: {}", e); - yield Err(Status::internal(format!("Failed to extract records for validation: {}", e))); + yield 
Err(Status::internal(format!("Failed to extract records for validation: {e}"))); continue; } } @@ -242,7 +236,7 @@ impl JsonRpcFlightBridge { }, Err(e) => { error!("Failed to convert transactions for block #{}: {}", block_num, e); - yield Err(Status::internal(format!("Conversion error: {}", e))); + yield Err(Status::internal(format!("Conversion error: {e}"))); } } } @@ -258,7 +252,7 @@ impl JsonRpcFlightBridge { Ok(batch) => record_batches.push(batch), Err(e) => { error!("Failed to convert logs for block #{}: {}", block_num, e); - yield Err(Status::internal(format!("Conversion error: {}", e))); + yield Err(Status::internal(format!("Conversion error: {e}"))); } } } @@ -267,7 +261,7 @@ impl JsonRpcFlightBridge { } Err(e) => { error!("Failed to fetch logs for block #{}: {}", block_num, e); - yield Err(Status::internal(format!("Failed to fetch logs: {}", e))); + yield Err(Status::internal(format!("Failed to fetch logs: {e}"))); } } } @@ -289,7 +283,7 @@ impl JsonRpcFlightBridge { } Err(e) => { error!("Failed to concatenate batches: {}", e); - yield Err(Status::internal(format!("Failed to concatenate batches: {}", e))); + yield Err(Status::internal(format!("Failed to concatenate batches: {e}"))); } } } @@ -332,7 +326,7 @@ impl FlightBridge for JsonRpcFlightBridge { let response = HandshakeResponse { protocol_version: 1, payload: serde_json::to_vec(&self.bridge_info()) - .map_err(|e| Status::internal(format!("Failed to serialize bridge info: {}", e)))? + .map_err(|e| Status::internal(format!("Failed to serialize bridge info: {e}")))? 
.into(), }; @@ -424,7 +418,7 @@ impl FlightBridge for JsonRpcFlightBridge { .and_then(|s| { serde_json::from_str::(&s) .map_err(|e| { - Status::invalid_argument(format!("Invalid descriptor in ticket: {}", e)) + Status::invalid_argument(format!("Invalid descriptor in ticket: {e}")) }) })?; @@ -498,7 +492,7 @@ impl FlightBridge for JsonRpcFlightBridge { let flight_stream = encoder.map(|result| { result.map_err(|e| { error!("Error encoding flight data: {}", e); - Status::internal(format!("Encoding error: {}", e)) + Status::internal(format!("Encoding error: {e}")) }) }); @@ -520,7 +514,7 @@ impl FlightBridge for JsonRpcFlightBridge { .next() .await .ok_or_else(|| Status::invalid_argument("Empty stream"))? - .map_err(|e| Status::internal(format!("Stream error: {}", e)))?; + .map_err(|e| Status::internal(format!("Stream error: {e}")))?; let stream_type = if let Some(desc) = first.flight_descriptor { Self::parse_descriptor(&desc) @@ -563,7 +557,7 @@ impl FlightBridge for JsonRpcFlightBridge { let flight_stream = encoder.map(|result| { result.map_err(|e| { error!("Error encoding flight data: {}", e); - Status::internal(format!("Encoding error: {}", e)) + Status::internal(format!("Encoding error: {e}")) }) }); @@ -670,7 +664,7 @@ impl FlightService for JsonRpcFlightBridge { let schema_result = schema_as_ipc .try_into() .map_err(|e: arrow::error::ArrowError| { - Status::internal(format!("Failed to encode schema: {}", e)) + Status::internal(format!("Failed to encode schema: {e}")) })?; Ok(Response::new(schema_result)) diff --git a/crates/evm-index/examples/index_transactions.rs b/crates/evm-index/examples/index_transactions.rs index 7dcb86d..81870c9 100644 --- a/crates/evm-index/examples/index_transactions.rs +++ b/crates/evm-index/examples/index_transactions.rs @@ -190,9 +190,9 @@ fn main() -> Result<(), Box> { let tx_by_to_count = storage.get_all(CF_TX_BY_TO).len(); println!("\nIndex statistics:"); - println!(" tx_by_hash entries: {}", tx_by_hash_count); - println!(" 
tx_by_from entries: {}", tx_by_from_count); - println!(" tx_by_to entries: {}", tx_by_to_count); + println!(" tx_by_hash entries: {tx_by_hash_count}"); + println!(" tx_by_from entries: {tx_by_from_count}"); + println!(" tx_by_to entries: {tx_by_to_count}"); Ok(()) } diff --git a/crates/evm-index/examples/index_with_rocksdb.rs b/crates/evm-index/examples/index_with_rocksdb.rs index 41b3010..e6984d3 100644 --- a/crates/evm-index/examples/index_with_rocksdb.rs +++ b/crates/evm-index/examples/index_with_rocksdb.rs @@ -46,7 +46,7 @@ fn main() -> Result<()> { println!("Creating RocksDB storage with column families:"); for cf in &column_families { - println!(" - {}", cf); + println!(" - {cf}"); } println!(); @@ -88,11 +88,8 @@ fn main() -> Result<()> { rocksdb_path.display() ); println!("You can query them by reopening the database and using:"); - println!(" storage.get({:?}, <32-byte-hash>)", CF_TX_BY_HASH); - println!( - " storage.prefix_iterator({:?}, <20-byte-address>)", - CF_TX_BY_FROM - ); + println!(" storage.get({CF_TX_BY_HASH:?}, <32-byte-hash>)"); + println!(" storage.prefix_iterator({CF_TX_BY_FROM:?}, <20-byte-address>)"); Ok(()) } diff --git a/crates/parquet-index/src/reader.rs b/crates/parquet-index/src/reader.rs index 6bbcd0b..8d77b22 100644 --- a/crates/parquet-index/src/reader.rs +++ b/crates/parquet-index/src/reader.rs @@ -41,7 +41,7 @@ impl PageReader { let path = Path::from_filesystem_path( file_path .to_str() - .ok_or_else(|| ReaderError::InvalidPath(format!("{:?}", file_path)))?, + .ok_or_else(|| ReaderError::InvalidPath(format!("{file_path:?}")))?, ) .map_err(|e| ReaderError::InvalidPath(e.to_string()))?; let fs = LocalFs {}; diff --git a/crates/parquet-meta/src/main.rs b/crates/parquet-meta/src/main.rs index de2c294..ab59973 100644 --- a/crates/parquet-meta/src/main.rs +++ b/crates/parquet-meta/src/main.rs @@ -130,7 +130,7 @@ fn show_metadata(file_path: PathBuf, verbose: bool) -> Result<()> { println!(); } Err(e) => { - println!("Error parsing 
phaser metadata: {}", e); + println!("Error parsing phaser metadata: {e}"); println!(); } } @@ -189,10 +189,7 @@ fn show_metadata(file_path: PathBuf, verbose: bool) -> Result<()> { max_bytes[6], max_bytes[7], ]); - println!( - " Block range (from statistics): {}-{}", - min_block, max_block - ); + println!(" Block range (from statistics): {min_block}-{max_block}"); } } } @@ -293,10 +290,7 @@ fn set_metadata( anyhow::bail!("Cannot infer data_end from statistics"); }; - println!( - "Inferred data_start={}, data_end={} from statistics", - min_block, max_block - ); + println!("Inferred data_start={min_block}, data_end={max_block} from statistics"); (min_block, max_block) } else { anyhow::bail!("--infer is required (explicit data_start/data_end not yet supported)"); @@ -314,12 +308,9 @@ fn set_metadata( ); println!("Setting phaser metadata on {}", file_path.display()); - println!(" Segment: {}-{}", segment_start, segment_end); - println!( - " Responsibility: {}-{}", - responsibility_start, responsibility_end - ); - println!(" Data range: {}-{}", data_start, data_end); + println!(" Segment: {segment_start}-{segment_end}"); + println!(" Responsibility: {responsibility_start}-{responsibility_end}"); + println!(" Data range: {data_start}-{data_end}"); println!(" Data type: {}", phaser_meta.data_type); println!(); diff --git a/crates/phaser-bridge/src/client.rs b/crates/phaser-bridge/src/client.rs index 029e4c0..73aef48 100644 --- a/crates/phaser-bridge/src/client.rs +++ b/crates/phaser-bridge/src/client.rs @@ -71,7 +71,7 @@ impl FlightBridgeClient { let uri = if endpoint.starts_with("http://") || endpoint.starts_with("https://") { endpoint.clone() } else { - format!("http://{}", endpoint) + format!("http://{endpoint}") }; Channel::from_shared(uri)?.connect().await? 
}; @@ -202,7 +202,7 @@ impl FlightBridgeClient { // Decode schema from FlightData header (based on flight_data_to_batches implementation) let message = root_as_message(&flight_data.data_header[..]) .map_err(|err| arrow_flight::error::FlightError::DecodeError( - format!("Cannot get root as message: {:?}", err) + format!("Cannot get root as message: {err:?}") ))?; let ipc_schema = message @@ -218,7 +218,7 @@ impl FlightBridgeClient { // Extract and decode app_metadata (required) let metadata = crate::BatchMetadata::decode(&flight_data.app_metadata) .map_err(|e| arrow_flight::error::FlightError::DecodeError( - format!("Failed to decode batch metadata: {}", e) + format!("Failed to decode batch metadata: {e}") ))?; // Decode the RecordBatch from FlightData using the schema diff --git a/crates/phaser-bridge/src/descriptors.rs b/crates/phaser-bridge/src/descriptors.rs index e371923..1096581 100644 --- a/crates/phaser-bridge/src/descriptors.rs +++ b/crates/phaser-bridge/src/descriptors.rs @@ -101,6 +101,9 @@ pub struct BlockchainDescriptor { /// Stream preferences for transfer settings #[serde(default, skip_serializing_if = "Option::is_none")] pub preferences: Option, + /// Enable transaction traces (callTracer) for Logs stream (default: false, 2-5x slower) + #[serde(default)] + pub enable_traces: bool, } impl BlockchainDescriptor { @@ -114,6 +117,7 @@ impl BlockchainDescriptor { include_reorgs: false, validation: ValidationStage::None, preferences: None, + enable_traces: false, } } @@ -127,6 +131,7 @@ impl BlockchainDescriptor { include_reorgs: false, validation: ValidationStage::None, preferences: None, + enable_traces: false, } } @@ -160,6 +165,12 @@ impl BlockchainDescriptor { self } + /// Enable transaction traces (callTracer) for Logs stream + pub fn with_traces(mut self, enable: bool) -> Self { + self.enable_traces = enable; + self + } + /// Get stream preferences or return default pub fn get_preferences(&self) -> StreamPreferences { 
self.preferences.clone().unwrap_or_default() diff --git a/crates/phaser-bridge/src/lib.rs b/crates/phaser-bridge/src/lib.rs index 73bcb28..7fe04b6 100644 --- a/crates/phaser-bridge/src/lib.rs +++ b/crates/phaser-bridge/src/lib.rs @@ -88,7 +88,7 @@ impl BatchMetadata { } bincode::deserialize(metadata) - .map_err(|e| format!("Failed to decode BatchMetadata from app_metadata: {}", e).into()) + .map_err(|e| format!("Failed to decode BatchMetadata from app_metadata: {e}").into()) } } diff --git a/crates/phaser-integration-test/src/benchmark.rs b/crates/phaser-integration-test/src/benchmark.rs index cbd5282..bf4e3e6 100644 --- a/crates/phaser-integration-test/src/benchmark.rs +++ b/crates/phaser-integration-test/src/benchmark.rs @@ -140,7 +140,7 @@ impl BenchmarkRunner { let std_dev = Duration::from_secs_f64(variance.sqrt()); Ok(StageResult { - name: format!("full_pipeline_{}tx", target_tx_count), + name: format!("full_pipeline_{target_tx_count}tx"), iterations: timings.len(), mean, median, diff --git a/crates/phaser-integration-test/src/loader.rs b/crates/phaser-integration-test/src/loader.rs index 3482233..9f7eea2 100644 --- a/crates/phaser-integration-test/src/loader.rs +++ b/crates/phaser-integration-test/src/loader.rs @@ -158,7 +158,7 @@ impl DataLoader { } fn load_parquet(&self, prefix: &str, segment: u64) -> Result, anyhow::Error> { - let filename = format!("{}_{}.parquet", prefix, segment); + let filename = format!("{prefix}_{segment}.parquet"); let path = self.data_dir.join(filename); let file = File::open(&path)?; diff --git a/crates/phaser-integration-test/src/validator.rs b/crates/phaser-integration-test/src/validator.rs index b9509c8..dbcebd9 100644 --- a/crates/phaser-integration-test/src/validator.rs +++ b/crates/phaser-integration-test/src/validator.rs @@ -148,6 +148,6 @@ mod humantime_serde { where S: Serializer, { - serializer.serialize_str(&format!("{:?}", duration)) + serializer.serialize_str(&format!("{duration:?}")) } } diff --git 
a/crates/phaser-metrics/src/lib.rs b/crates/phaser-metrics/src/lib.rs index 912b4c0..fca51f8 100644 --- a/crates/phaser-metrics/src/lib.rs +++ b/crates/phaser-metrics/src/lib.rs @@ -105,9 +105,9 @@ pub fn gather_metrics() -> Result { encoder .encode(&metric_families, &mut buffer) - .map_err(|e| format!("Failed to encode metrics: {}", e))?; + .map_err(|e| format!("Failed to encode metrics: {e}"))?; - String::from_utf8(buffer).map_err(|e| format!("Failed to convert metrics to UTF-8: {}", e)) + String::from_utf8(buffer).map_err(|e| format!("Failed to convert metrics to UTF-8: {e}")) } #[cfg(test)] diff --git a/crates/phaser-parquet-metadata/src/lib.rs b/crates/phaser-parquet-metadata/src/lib.rs index 6fecb45..8b3351d 100644 --- a/crates/phaser-parquet-metadata/src/lib.rs +++ b/crates/phaser-parquet-metadata/src/lib.rs @@ -231,8 +231,7 @@ mod tests { let initial_row_group_count = reader.metadata().num_row_groups(); assert!( initial_row_group_count > 1, - "Expected multiple row groups, got {}", - initial_row_group_count + "Expected multiple row groups, got {initial_row_group_count}" ); // Update metadata @@ -335,8 +334,7 @@ mod tests { err_msg.contains("PAR1") || err_msg.contains("magic") || err_msg.contains("Corrupt footer"), - "Expected error about corrupted footer, got: {}", - err_msg + "Expected error about corrupted footer, got: {err_msg}" ); } } diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 38b3786..21dc5c9 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -167,8 +167,8 @@ async fn main() -> Result<()> { }; println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); - println!("Job ID: {}", job_id); - println!("Status: {}", status_str); + println!("Job ID: {job_id}"); + println!("Status: {status_str}"); println!("Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!("Blocks: {}-{}", job.from_block, job.to_block); @@ -184,7 +184,7 @@ 
async fn main() -> Result<()> { } else if bytes >= 1_000 { format!("{:.1} KB", bytes as f64 / 1_000.0) } else { - format!("{} bytes", bytes) + format!("{bytes} bytes") } }; @@ -322,13 +322,13 @@ async fn main() -> Result<()> { println!("Incomplete Segments: {}", gap.missing_segments); if missing_blocks_count > 0 { - println!(" - {} segments missing blocks", missing_blocks_count); + println!(" - {missing_blocks_count} segments missing blocks"); } if missing_txs_count > 0 { - println!(" - {} segments missing transactions", missing_txs_count); + println!(" - {missing_txs_count} segments missing transactions"); } if missing_logs_count > 0 { - println!(" - {} segments missing logs", missing_logs_count); + println!(" - {missing_logs_count} segments missing logs"); } } @@ -341,7 +341,7 @@ async fn main() -> Result<()> { } else { 0.0 }; - println!("Progress: {:.1}%", percent); + println!("Progress: {percent:.1}%"); if job.current_block > job.from_block { println!("Highest completed: block {}", job.current_block); } @@ -362,7 +362,7 @@ async fn main() -> Result<()> { "FAILED" => SyncStatus::Failed, "CANCELLED" => SyncStatus::Cancelled, _ => { - println!("Invalid status filter: {}", status_str); + println!("Invalid status filter: {status_str}"); println!( "Valid values: PENDING, RUNNING, COMPLETED, FAILED, CANCELLED" ); @@ -395,7 +395,7 @@ async fn main() -> Result<()> { println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); println!("Job ID: {}", job.job_id); - println!("Status: {}", status_str); + println!("Status: {status_str}"); println!("Chain: {} / Bridge: {}", job.chain_id, job.bridge_name); println!("Blocks: {}-{}", job.from_block, job.to_block); @@ -412,7 +412,7 @@ async fn main() -> Result<()> { } else if bytes >= 1_000 { format!("{:.1} KB", bytes as f64 / 1_000.0) } else { - format!("{} bytes", bytes) + format!("{bytes} bytes") } }; @@ -559,19 +559,15 @@ async fn main() -> Result<()> { println!("Incomplete Segments: {}", gap.missing_segments); if 
missing_blocks_count > 0 { - println!( - " - {} segments missing blocks", - missing_blocks_count - ); + println!(" - {missing_blocks_count} segments missing blocks"); } if missing_txs_count > 0 { println!( - " - {} segments missing transactions", - missing_txs_count + " - {missing_txs_count} segments missing transactions" ); } if missing_logs_count > 0 { - println!(" - {} segments missing logs", missing_logs_count); + println!(" - {missing_logs_count} segments missing logs"); } } @@ -585,7 +581,7 @@ async fn main() -> Result<()> { } else { 0.0 }; - println!("Progress: {:.1}%", percent); + println!("Progress: {percent:.1}%"); if job.current_block > job.from_block { println!("Highest completed: block {}", job.current_block); } diff --git a/crates/phaser-query/src/bin/phaser-query.rs b/crates/phaser-query/src/bin/phaser-query.rs index 1a61e90..d9339a5 100644 --- a/crates/phaser-query/src/bin/phaser-query.rs +++ b/crates/phaser-query/src/bin/phaser-query.rs @@ -278,14 +278,14 @@ async fn main() -> Result<()> { Err(e) => ( axum::http::StatusCode::INTERNAL_SERVER_ERROR, [("content-type", "text/plain")], - format!("Error gathering metrics: {}", e), + format!("Error gathering metrics: {e}"), ), } } let app = Router::new().route("/metrics", get(metrics_handler)); - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", metrics_port)) + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{metrics_port}")) .await .unwrap(); info!( diff --git a/crates/phaser-query/src/buffer_manager.rs b/crates/phaser-query/src/buffer_manager.rs index 03b39e4..345eb37 100644 --- a/crates/phaser-query/src/buffer_manager.rs +++ b/crates/phaser-query/src/buffer_manager.rs @@ -144,7 +144,7 @@ impl CfToParquetBuffer { }; // Store file info for the range - let file_info_key = format!("file_{:016x}_{:016x}", min_key, max_key); + let file_info_key = format!("file_{min_key:016x}_{max_key:016x}"); batch.put_cf( index_cf, file_info_key.as_bytes(), @@ -193,13 +193,10 @@ impl 
CfToParquetBuffer { .file_counter .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let filename = format!( - "blocks_{}_{}_{:08}_to_{:08}.parquet", - timestamp, counter, min_key, max_key - ); + let filename = format!("blocks_{timestamp}_{counter}_{min_key:08}_to_{max_key:08}.parquet"); let file_path = self.target_dir.join(&filename); - let temp_path = self.target_dir.join(format!(".{}.tmp", filename)); + let temp_path = self.target_dir.join(format!(".{filename}.tmp")); // Write to temp file first let file = File::create(&temp_path)?; diff --git a/crates/phaser-query/src/erigon_client.rs b/crates/phaser-query/src/erigon_client.rs index 3122fda..1425f93 100644 --- a/crates/phaser-query/src/erigon_client.rs +++ b/crates/phaser-query/src/erigon_client.rs @@ -19,7 +19,7 @@ impl ErigonClient { endpoint ); - let channel = Channel::from_shared(format!("http://{}", endpoint))? + let channel = Channel::from_shared(format!("http://{endpoint}"))? .connect() .await .map_err(|e| { diff --git a/crates/phaser-query/src/lib.rs b/crates/phaser-query/src/lib.rs index 3fdb946..a3b91a5 100644 --- a/crates/phaser-query/src/lib.rs +++ b/crates/phaser-query/src/lib.rs @@ -252,16 +252,14 @@ fn default_max_concurrent_log_segments() -> u32 { impl PhaserConfig { pub fn bridge_data_dir(&self, chain_id: u64, bridge_name: &str) -> PathBuf { - self.data_root - .join(format!("{}", chain_id)) - .join(bridge_name) + self.data_root.join(format!("{chain_id}")).join(bridge_name) } pub fn from_yaml_file(path: &PathBuf) -> Result { let content = std::fs::read_to_string(path) - .with_context(|| format!("Failed to read config file: {:?}", path))?; + .with_context(|| format!("Failed to read config file: {path:?}"))?; let config: PhaserConfig = serde_yaml::from_str(&content) - .with_context(|| format!("Failed to parse YAML config: {:?}", path))?; + .with_context(|| format!("Failed to parse YAML config: {path:?}"))?; Ok(config) } diff --git a/crates/phaser-query/src/streaming_with_writer.rs
b/crates/phaser-query/src/streaming_with_writer.rs index bbcc671..de1be7c 100644 --- a/crates/phaser-query/src/streaming_with_writer.rs +++ b/crates/phaser-query/src/streaming_with_writer.rs @@ -112,7 +112,7 @@ impl StreamingServiceWithWriter { chain_id: u64, bridge_name: String, ) { - let stream_name = format!("{:?}", stream_type).to_lowercase(); + let stream_name = format!("{stream_type:?}").to_lowercase(); let mut first_block_received = false; tokio::spawn(async move { @@ -257,7 +257,7 @@ impl StreamingServiceWithWriter { ); // Subscribe to logs with metadata - let logs_descriptor = BlockchainDescriptor::live(StreamType::Logs); + let logs_descriptor = BlockchainDescriptor::live(StreamType::Logs).with_traces(true); info!("Subscribing to logs from bridge"); let logs_stream = bridge.subscribe_with_metadata(&logs_descriptor).await?; Self::spawn_stream_processor( diff --git a/crates/phaser-query/src/sync/data_scanner.rs b/crates/phaser-query/src/sync/data_scanner.rs index d5530ee..dd47cd1 100644 --- a/crates/phaser-query/src/sync/data_scanner.rs +++ b/crates/phaser-query/src/sync/data_scanner.rs @@ -978,8 +978,7 @@ impl DataScanner { let block_start = start * segment_size; let block_end = (end + 1) * segment_size - 1; ranges.push(format!( - " segments {}-{} (blocks {}-{})", - start, end, block_start, block_end + " segments {start}-{end} (blocks {block_start}-{block_end})" )); } range_start = Some(seg); @@ -991,8 +990,7 @@ impl DataScanner { let block_start = start * segment_size; let block_end = (end + 1) * segment_size - 1; ranges.push(format!( - " segments {}-{} (blocks {}-{})", - start, end, block_start, block_end + " segments {start}-{end} (blocks {block_start}-{block_end})" )); } @@ -1053,7 +1051,7 @@ impl DataScanner { let filename_str = filename.to_string_lossy(); // Look for temp files for this segment: {type}_from_{X}_to_{segment_end}.parquet.tmp - if filename_str.ends_with(&format!("_to_{}.parquet.tmp", segment_end)) { + if 
filename_str.ends_with(&format!("_to_{segment_end}.parquet.tmp")) { // Parse to get the start block if let Some(range) = self.parse_filename(&path)? { // Only clean if this starts at the segment boundary (failed historical sync) @@ -1560,8 +1558,7 @@ mod tests { .unwrap(); assert!( missing.is_empty(), - "Expected no missing segments, but got: {:?}", - missing + "Expected no missing segments, but got: {missing:?}" ); } diff --git a/crates/phaser-query/src/sync/error.rs b/crates/phaser-query/src/sync/error.rs index 39786b5..ad3dc82 100644 --- a/crates/phaser-query/src/sync/error.rs +++ b/crates/phaser-query/src/sync/error.rs @@ -106,7 +106,7 @@ impl fmt::Display for MultipleDataTypeErrors { if i > 0 { write!(f, "; ")?; } - write!(f, "{}: {}", data_type, err)?; + write!(f, "{data_type}: {err}")?; } Ok(()) } @@ -122,7 +122,7 @@ impl std::error::Error for MultipleDataTypeErrors { impl From for SyncError { fn from(multi_err: MultipleDataTypeErrors) -> Self { // Aggregate into a single SyncError with Unknown data type - let message = format!("{}", multi_err); + let message = format!("{multi_err}"); let category = multi_err .errors .first() @@ -169,8 +169,7 @@ impl SyncError { /// Create a protocol error (bridge returned zero batches) pub fn protocol_error(data_type: DataType, from_block: u64, to_block: u64) -> Self { let msg = format!( - "Bridge returned zero batches for {} {}-{}. This indicates a protocol error.", - data_type, from_block, to_block + "Bridge returned zero batches for {data_type} {from_block}-{to_block}. This indicates a protocol error." 
); Self { data_type, @@ -285,7 +284,7 @@ impl SyncError { }; let context_str = context.into(); - let message = format!("{}: {}", context_str, err_str); + let message = format!("{context_str}: {err_str}"); Self { data_type, @@ -327,7 +326,7 @@ impl SyncError { }; let context_str = context.into(); - let message = format!("{}: {}", context_str, err_str); + let message = format!("{context_str}: {err_str}"); Self { data_type, diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 9e78220..9157c30 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -178,7 +178,7 @@ impl SyncServer { } pub async fn start(self, port: u16) -> Result<()> { - let addr = format!("0.0.0.0:{}", port).parse()?; + let addr = format!("0.0.0.0:{port}").parse()?; info!("Starting sync admin gRPC server on {}", addr); tonic::transport::Server::builder() @@ -625,12 +625,12 @@ impl SyncService for SyncServer { // Connect to bridge to check chain tip let mut client = FlightBridgeClient::connect(bridge.endpoint.clone()) .await - .map_err(|e| Status::unavailable(format!("Failed to connect to bridge: {}", e)))?; + .map_err(|e| Status::unavailable(format!("Failed to connect to bridge: {e}")))?; let bridge_info = client .get_info() .await - .map_err(|e| Status::internal(format!("Failed to get bridge info: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to get bridge info: {e}")))?; if bridge_info.current_block > 0 && to_block > bridge_info.current_block { return Err(Status::invalid_argument(format!( @@ -653,7 +653,7 @@ impl SyncService for SyncServer { let mut gap_analysis = scanner .analyze_sync_range(req.from_block, to_block, self.config.segment_size) .await - .map_err(|e| Status::internal(format!("Failed to analyze sync range: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to analyze sync range: {e}")))?; // Filter out segments >= live sync boundary to avoid cleaning active live streaming 
temp files let segments_to_clean: Vec = if let Some(boundary_block) = historical_boundary { @@ -676,7 +676,7 @@ impl SyncService for SyncServer { // Clean only temp files that conflict with segments we're about to sync (excluding live sync segments) let cleaned_count = scanner .clean_conflicting_temp_files(&segments_to_clean, self.config.segment_size) - .map_err(|e| Status::internal(format!("Failed to clean temp files: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to clean temp files: {e}")))?; gap_analysis.cleaned_temp_files = cleaned_count; @@ -776,7 +776,7 @@ impl SyncService for SyncServer { let analysis = scanner .analyze_sync_range(job.from_block, job.to_block, self.config.segment_size) .await - .map_err(|e| Status::internal(format!("Failed to analyze sync progress: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to analyze sync progress: {e}")))?; // Calculate actual progress from disk let complete_segments = analysis.complete_segments.len() as u64; @@ -934,7 +934,7 @@ impl SyncService for SyncServer { let mut gap_analysis = scanner .analyze_sync_range(req.from_block, req.to_block, self.config.segment_size) .await - .map_err(|e| Status::internal(format!("Failed to analyze sync range: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to analyze sync range: {e}")))?; // Get historical boundary from LiveStreamingState to avoid cleaning live streaming temp files let id = crate::ChainBridgeId::new(req.chain_id, &req.bridge_name); @@ -961,7 +961,7 @@ impl SyncService for SyncServer { // Clean only temp files that conflict with segments we're analyzing (excluding live sync segments) let cleaned_count = scanner .clean_conflicting_temp_files(&segments_to_clean, self.config.segment_size) - .map_err(|e| Status::internal(format!("Failed to clean temp files: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to clean temp files: {e}")))?; gap_analysis.cleaned_temp_files = cleaned_count; diff --git 
a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index 811e78e..c1890b4 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -513,7 +513,7 @@ impl SyncWorker { .min(MAX_BACKOFF_SECS); let progress_msg = if let Some(last) = last_written { - format!("last written block: {}", last) + format!("last written block: {last}") } else { "no data received yet".to_string() }; @@ -645,8 +645,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned blocks starting at {} but requested range started at {}", - first, from_block + "Bridge returned blocks starting at {first} but requested range started at {from_block}" ), )); } @@ -656,8 +655,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned blocks ending at {} but requested range ended at {}", - last, to_block + "Bridge returned blocks ending at {last} but requested range ended at {to_block}" ), )); } @@ -823,7 +821,7 @@ impl SyncWorker { .min(MAX_BACKOFF_SECS); let progress_msg = if let Some(last) = last_written { - format!("last written block: {}", last) + format!("last written block: {last}") } else { "no data received yet".to_string() }; @@ -1005,8 +1003,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned transactions starting at block {} which is outside requested range {}-{}", - first, from_block, to_block + "Bridge returned transactions starting at block {first} which is outside requested range {from_block}-{to_block}" ), )); } @@ -1016,8 +1013,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned transactions ending at block {} which is outside requested range {}-{}", - last, from_block, to_block + "Bridge returned transactions ending at block {last} which is outside requested range {from_block}-{to_block}" ), )); } @@ -1212,7 +1208,7 @@ impl SyncWorker { .min(MAX_BACKOFF_SECS); let progress_msg = if let Some(last) = last_written { - format!("last written block: {}", 
last) + format!("last written block: {last}") } else { "no data received yet".to_string() }; @@ -1249,7 +1245,8 @@ impl SyncWorker { }; let descriptor = BlockchainDescriptor::historical(StreamType::Logs, from_block, to_block) .with_validation(self.validation_stage) - .with_preferences(preferences); + .with_preferences(preferences) + .with_traces(true); // Subscribe to the log stream with metadata (returns RecordBatch + responsibility range) let stream = client @@ -1344,8 +1341,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned logs starting at block {} which is outside requested range {}-{}", - first, from_block, to_block + "Bridge returned logs starting at block {first} which is outside requested range {from_block}-{to_block}" ), )); } @@ -1355,8 +1351,7 @@ impl SyncWorker { from_block, to_block, format!( - "Bridge returned logs ending at block {} which is outside requested range {}-{}", - last, from_block, to_block + "Bridge returned logs ending at block {last} which is outside requested range {from_block}-{to_block}" ), )); } diff --git a/crates/validators/evm/src/executor.rs b/crates/validators/evm/src/executor.rs index ae794a6..ace8d01 100644 --- a/crates/validators/evm/src/executor.rs +++ b/crates/validators/evm/src/executor.rs @@ -41,8 +41,7 @@ impl FromStr for ExecutorType { "tokio" => Ok(ExecutorType::Tokio), "core" => Ok(ExecutorType::Core), _ => Err(format!( - "invalid executor type '{}', expected 'tokio' or 'core'", - s + "invalid executor type '{s}', expected 'tokio' or 'core'" )), } } diff --git a/crates/validators/evm/src/executors/core_exec.rs b/crates/validators/evm/src/executors/core_exec.rs index 5648bfb..e21163f 100644 --- a/crates/validators/evm/src/executors/core_exec.rs +++ b/crates/validators/evm/src/executors/core_exec.rs @@ -41,8 +41,7 @@ impl CoreExecutor { } Err(e) => { eprintln!( - "CoreExecutor: NUMA detection failed ({}), using regular thread pool with {} workers", - e, num_workers + "CoreExecutor: NUMA detection 
failed ({e}), using regular thread pool with {num_workers} workers" ); ThreadPoolExecutor::new(num_workers) }