Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ message BlockRangeRequest {
uint64 from_block = 1; // Starting block number (inclusive)
uint64 to_block = 2; // Ending block number (inclusive, 0 = latest)
uint32 batch_size = 3; // Records per batch (default: 1000)
bool enable_traces = 4; // Enable callTracer traces during ExecuteBlocks (default: false, 2-5x slower)
}

// A batch of RLP-encoded block headers
Expand Down Expand Up @@ -79,11 +80,12 @@ message ReceiptBatch {
bool is_last = 4; // True if this is the last batch
}

// RLP-encoded receipt data (receipts contain logs)
// RLP-encoded receipt data with optional trace data
message ReceiptData {
uint64 block_number = 1;
bytes block_hash = 2; // 32-byte block hash
uint32 tx_index = 3; // Transaction index in block
bytes rlp_receipt = 4; // RLP-encoded receipt (contains all logs for this tx)
bytes tx_hash = 5; // 32-byte transaction hash
bytes trace_data = 6; // JSON-encoded trace (callTracer format), empty if tracing disabled
}
10 changes: 7 additions & 3 deletions crates/bridges/evm/erigon-bridge/src/blockdata_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl BlockDataClient {
info!("Note: Custom Erigon must be running with BlockDataBackend service enabled");

let uri = if !endpoint.starts_with("http://") {
format!("http://{}", endpoint)
format!("http://{endpoint}")
} else {
endpoint.clone()
};
Expand Down Expand Up @@ -88,6 +88,7 @@ impl BlockDataClient {
from_block,
to_block,
batch_size: if batch_size > 0 { batch_size } else { 1000 },
enable_traces: false,
};

debug!(
Expand All @@ -110,6 +111,7 @@ impl BlockDataClient {
from_block,
to_block,
batch_size: if batch_size > 0 { batch_size } else { 1000 },
enable_traces: false,
};

debug!(
Expand All @@ -128,16 +130,18 @@ impl BlockDataClient {
from_block: u64,
to_block: u64,
batch_size: u32,
enable_traces: bool,
) -> Result<Streaming<ReceiptBatch>, ErigonBridgeError> {
let request = BlockRangeRequest {
from_block,
to_block,
batch_size: if batch_size > 0 { batch_size } else { 1000 },
enable_traces,
};

debug!(
"Starting receipt stream via block execution (blocks {}-{}, batch_size: {})",
from_block, to_block, batch_size
"Starting receipt stream via block execution (blocks {}-{}, batch_size: {}, traces: {})",
from_block, to_block, batch_size, enable_traces
);

let response = self.client.execute_blocks(request).await?;
Expand Down
54 changes: 25 additions & 29 deletions crates/bridges/evm/erigon-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ impl ErigonFlightBridge {
serde_json::from_str::<phaser_bridge::descriptors::BlockchainDescriptor>(first).map_err(
|e| {
Box::new(TonicStatus::invalid_argument(format!(
"Invalid descriptor: {}",
e
"Invalid descriptor: {e}"
)))
},
)
Expand Down Expand Up @@ -266,8 +265,7 @@ impl ErigonFlightBridge {
Ok(handle) => handle,
Err(e) => {
let err_msg = format!(
"Failed to get client from pool for segment {}-{}: {}",
seg_start, seg_end, e
"Failed to get client from pool for segment {seg_start}-{seg_end}: {e}"
);
error!("{}", err_msg);
return futures::stream::once(async move {
Expand Down Expand Up @@ -306,8 +304,7 @@ impl ErigonFlightBridge {
return futures::stream::once(async move {
Err(arrow_flight::error::FlightError::ExternalError(Box::new(
std::io::Error::other(format!(
"Failed to access client: {}",
e
"Failed to access client: {e}"
)),
)))
})
Expand Down Expand Up @@ -344,8 +341,7 @@ impl ErigonFlightBridge {
client_handle.mark_error();
arrow_flight::error::FlightError::ExternalError(Box::new(
std::io::Error::other(format!(
"Segment {}-{} failed: {}",
seg_start, seg_end, e
"Segment {seg_start}-{seg_end} failed: {e}"
)),
))
})
Expand Down Expand Up @@ -398,8 +394,7 @@ impl ErigonFlightBridge {
Ok(handle) => handle,
Err(e) => {
let err_msg = format!(
"Failed to get client from pool for segment {}-{}: {}",
seg_start, seg_end, e
"Failed to get client from pool for segment {seg_start}-{seg_end}: {e}"
);
error!("{}", err_msg);
return futures::stream::once(async move {
Expand Down Expand Up @@ -438,8 +433,7 @@ impl ErigonFlightBridge {
return futures::stream::once(async move {
Err(arrow_flight::error::FlightError::ExternalError(Box::new(
std::io::Error::other(format!(
"Failed to access client: {}",
e
"Failed to access client: {e}"
)),
)))
})
Expand Down Expand Up @@ -476,8 +470,7 @@ impl ErigonFlightBridge {
client_handle.mark_error();
arrow_flight::error::FlightError::ExternalError(Box::new(
std::io::Error::other(format!(
"Segment {}-{} failed: {}",
seg_start, seg_end, e
"Segment {seg_start}-{seg_end} failed: {e}"
)),
))
})
Expand All @@ -496,6 +489,7 @@ impl ErigonFlightBridge {
start: u64,
end: u64,
validate: bool,
enable_traces: bool,
) -> Result<
Pin<
Box<
Expand All @@ -521,13 +515,12 @@ impl ErigonFlightBridge {
client
.get_latest_block()
.await
.map_err(|e| Status::internal(format!("Failed to query chain head: {}", e)))?
.map_err(|e| Status::internal(format!("Failed to query chain head: {e}")))?
};

if end > chain_head {
return Err(Status::invalid_argument(format!(
"Requested end block {} exceeds current chain head {}",
end, chain_head
"Requested end block {end} exceeds current chain head {chain_head}"
)));
}

Expand All @@ -552,9 +545,11 @@ impl ErigonFlightBridge {

if stream_type == StreamType::Logs {
let pool = self.blockdata_pool.clone();
let mut config = self.segment_config.clone();
config.enable_traces = enable_traces;
let stream = self.process_logs_with_segments(
pool,
self.segment_config.clone(),
config,
self.validator.clone(),
start,
end,
Expand All @@ -576,7 +571,7 @@ impl ErigonFlightBridge {
Err(e) => {
error!("Failed to get client from pool for blocks stream: {}", e);
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::other(format!("Pool error: {}", e)))
Box::new(std::io::Error::other(format!("Pool error: {e}")))
));
return;
}
Expand All @@ -599,7 +594,7 @@ impl ErigonFlightBridge {
error!("Failed to access client for blocks stream: {}", e);
client_handle.mark_error();
yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::other(format!("Client error: {}", e)))
Box::new(std::io::Error::other(format!("Client error: {e}")))
));
return;
}
Expand Down Expand Up @@ -726,7 +721,7 @@ impl FlightBridge for ErigonFlightBridge {
let response = HandshakeResponse {
protocol_version: 1,
payload: serde_json::to_vec(&self.bridge_info().await)
.map_err(|e| Status::internal(format!("Failed to serialize bridge info: {}", e)))?
.map_err(|e| Status::internal(format!("Failed to serialize bridge info: {e}")))?
.into(),
};

Expand Down Expand Up @@ -822,7 +817,7 @@ impl FlightBridge for ErigonFlightBridge {
.and_then(|s| {
serde_json::from_str::<phaser_bridge::descriptors::BlockchainDescriptor>(&s)
.map_err(|e| {
Status::invalid_argument(format!("Invalid descriptor in ticket: {}", e))
Status::invalid_argument(format!("Invalid descriptor in ticket: {e}"))
})
})?;
let stream_type = blockchain_desc.stream_type;
Expand All @@ -845,7 +840,7 @@ impl FlightBridge for ErigonFlightBridge {
let flight_stream = encoder.map(|result| {
result.map_err(|e| {
error!("Error encoding flight data: {}", e);
Status::internal(format!("Encoding error: {}", e))
Status::internal(format!("Encoding error: {e}"))
})
});

Expand Down Expand Up @@ -883,6 +878,7 @@ impl FlightBridge for ErigonFlightBridge {
start,
end,
should_validate_ingestion,
blockchain_desc.enable_traces,
)
.await?,
)
Expand Down Expand Up @@ -934,7 +930,7 @@ impl FlightBridge for ErigonFlightBridge {
Ok(m) => m,
Err(e) => {
error!("Failed to encode batch metadata: {}", e);
yield Err(Status::internal(format!("Metadata encoding error: {}", e)));
yield Err(Status::internal(format!("Metadata encoding error: {e}")));
continue;
}
};
Expand All @@ -959,7 +955,7 @@ impl FlightBridge for ErigonFlightBridge {
}
Err(e) => {
error!("Error encoding batch to flight data: {}", e);
yield Err(Status::internal(format!("Batch encoding error: {}", e)));
yield Err(Status::internal(format!("Batch encoding error: {e}")));
}
}
}
Expand Down Expand Up @@ -989,7 +985,7 @@ impl FlightBridge for ErigonFlightBridge {

metrics_for_stream.error(error_type, data_type);

yield Err(Status::internal(format!("Stream error: {}", e)));
yield Err(Status::internal(format!("Stream error: {e}")));
}
}
}
Expand All @@ -1011,7 +1007,7 @@ impl FlightBridge for ErigonFlightBridge {
.next()
.await
.ok_or_else(|| Status::invalid_argument("Empty stream"))?
.map_err(|e| Status::internal(format!("Stream error: {}", e)))?;
.map_err(|e| Status::internal(format!("Stream error: {e}")))?;

let stream_type = if let Some(desc) = first.flight_descriptor {
Self::parse_descriptor(&desc)
Expand All @@ -1034,7 +1030,7 @@ impl FlightBridge for ErigonFlightBridge {
let flight_stream = encoder.map(|result| {
result.map_err(|e| {
error!("Error encoding flight data: {}", e);
Status::internal(format!("Encoding error: {}", e))
Status::internal(format!("Encoding error: {e}"))
})
});

Expand Down Expand Up @@ -1067,7 +1063,7 @@ impl FlightBridge for ErigonFlightBridge {
let flight_stream = encoder.map(|result| {
result.map_err(|e| {
error!("Error encoding flight data: {}", e);
Status::internal(format!("Encoding error: {}", e))
Status::internal(format!("Encoding error: {e}"))
})
});

Expand Down
2 changes: 1 addition & 1 deletion crates/bridges/evm/erigon-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ErigonClient {
endpoint
);

Channel::from_shared(format!("http://{}", endpoint))?
Channel::from_shared(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|e| {
Expand Down
4 changes: 2 additions & 2 deletions crates/bridges/evm/erigon-bridge/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ impl From<ErigonBridgeError> for Status {
ErigonBridgeError::ConversionError(msg) => Status::internal(msg),
ErigonBridgeError::ValidationError(msg) => Status::failed_precondition(msg),
ErigonBridgeError::ConnectionFailed(msg) => {
Status::unavailable(format!("Connection failed: {}", msg))
Status::unavailable(format!("Connection failed: {msg}"))
}
ErigonBridgeError::StreamProtocol(stream_err) => {
Status::aborted(format!("Stream protocol violation: {}", stream_err))
Status::aborted(format!("Stream protocol violation: {stream_err}"))
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion crates/bridges/evm/erigon-bridge/src/generated/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ pub struct BlockRangeRequest {
/// Records per batch (default: 1000)
#[prost(uint32, tag = "3")]
pub batch_size: u32,
/// Enable callTracer traces during ExecuteBlocks (default: false, 2-5x slower)
#[prost(bool, tag = "4")]
pub enable_traces: bool,
}
/// A batch of RLP-encoded block headers
#[allow(dead_code)]
Expand Down Expand Up @@ -407,7 +410,7 @@ pub struct ReceiptBatch {
#[prost(bool, tag = "4")]
pub is_last: bool,
}
/// RLP-encoded receipt data (receipts contain logs)
/// RLP-encoded receipt data with optional trace data
#[allow(dead_code)]
#[allow(clippy::enum_variant_names)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -426,6 +429,9 @@ pub struct ReceiptData {
/// 32-byte transaction hash
#[prost(bytes = "vec", tag = "5")]
pub tx_hash: ::prost::alloc::vec::Vec<u8>,
/// JSON-encoded trace (callTracer format), empty if tracing disabled
#[prost(bytes = "vec", tag = "6")]
pub trace_data: ::prost::alloc::vec::Vec<u8>,
}
/// Generated client implementations.
pub mod block_data_backend_client {
Expand Down
5 changes: 3 additions & 2 deletions crates/bridges/evm/erigon-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ async fn main() -> Result<()> {
max_concurrent_executions: args.max_concurrent_executions.unwrap_or_else(num_cpus::get),
global_max_execute_blocks: args.global_max_execute_blocks,
execute_blocks_semaphore: None, // Will be set by bridge
enable_traces: false, // Will be set per-request from Flight descriptor
};

info!("Segment configuration:");
Expand Down Expand Up @@ -187,14 +188,14 @@ async fn main() -> Result<()> {
Err(e) => (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
[("content-type", "text/plain")],
format!("Error gathering metrics: {}", e),
format!("Error gathering metrics: {e}"),
),
}
}

let app = Router::new().route("/metrics", get(metrics_handler));

let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", metrics_port))
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{metrics_port}"))
.await
.unwrap();
info!(
Expand Down
Loading
Loading