diff --git a/Cargo.lock b/Cargo.lock index 6faa5290..410bbd7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1966,8 +1966,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -7088,6 +7090,7 @@ dependencies = [ "async-trait", "axum", "backon", + "chrono", "clap", "dotenvy", "jsonrpsee", @@ -7108,6 +7111,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", "wiremock", ] diff --git a/crates/audit/src/publisher.rs b/crates/audit/src/publisher.rs index bc3004c9..5931df8a 100644 --- a/crates/audit/src/publisher.rs +++ b/crates/audit/src/publisher.rs @@ -37,7 +37,7 @@ impl KafkaBundleEventPublisher { { Ok(_) => { debug!( - bundle_id = %bundle_id, + bundle_id = ?bundle_id, topic = %self.topic, payload_size = payload.len(), "successfully published event" @@ -46,7 +46,7 @@ impl KafkaBundleEventPublisher { } Err((err, _)) => { error!( - bundle_id = %bundle_id, + bundle_id = ?bundle_id, topic = %self.topic, error = %err, "failed to publish event" @@ -90,7 +90,7 @@ impl Default for LoggingBundleEventPublisher { impl BundleEventPublisher for LoggingBundleEventPublisher { async fn publish(&self, event: BundleEvent) -> Result<()> { info!( - bundle_id = %event.bundle_id(), + bundle_id = ?event.bundle_id(), event = ?event, "Received bundle event" ); diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs index f58265d2..45cec31d 100644 --- a/crates/audit/src/reader.rs +++ b/crates/audit/src/reader.rs @@ -80,7 +80,9 @@ impl EventReader for KafkaAuditLogReader { let event: BundleEvent = serde_json::from_slice(payload)?; debug!( - bundle_id = %event.bundle_id(), + event_name = %event.event_name(), + bundle_id = ?event.bundle_id(), + tx_hash = ?event.tx_hash(), timestamp = timestamp, offset = message.offset(), partition = message.partition(), diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index c8b619f4..0a1de8ba 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -63,6 +63,54 @@ pub enum BundleHistoryEvent { timestamp: i64, reason: DropReason, }, + TransactionReceived { + key: String, + timestamp: i64, + bundle: Box, + }, + TransactionSent { + key: String, + timestamp: i64, + tx_hash: TxHash, + }, + BackrunReceived { + key: String, + timestamp: i64, + bundle: Box, + }, + BackrunSent { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + }, + BackrunInserted { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + backrun_tx_hashes: Vec, + }, + StartExecuting { + key: String, + timestamp: i64, + tx_hash: TxHash, + block_number: u64, + }, + Executed { + key: String, + timestamp: i64, + tx_hash: TxHash, + block_number: u64, + gas_used: u64, + }, + BackrunBundleExecuted { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + backrun_tx_hash: TxHash, + block_number: u64, + gas_used: u64, + success: bool, + }, } impl BundleHistoryEvent { @@ -73,6 +121,14 @@ impl BundleHistoryEvent { BundleHistoryEvent::BuilderIncluded { key, .. } => key, BundleHistoryEvent::BlockIncluded { key, .. } => key, BundleHistoryEvent::Dropped { key, .. } => key, + BundleHistoryEvent::TransactionReceived { key, .. } => key, + BundleHistoryEvent::TransactionSent { key, .. } => key, + BundleHistoryEvent::BackrunReceived { key, .. } => key, + BundleHistoryEvent::BackrunSent { key, .. } => key, + BundleHistoryEvent::BackrunInserted { key, .. } => key, + BundleHistoryEvent::StartExecuting { key, .. } => key, + BundleHistoryEvent::Executed { key, .. } => key, + BundleHistoryEvent::BackrunBundleExecuted { key, .. } => key, } } } @@ -87,7 +143,12 @@ fn update_bundle_history_transform( event: &Event, ) -> Option { let mut history = bundle_history.history; - let bundle_id = event.event.bundle_id(); + let bundle_id = match event.event.bundle_id() { + Some(id) => id, + None => { + return None; + } + }; // Check for deduplication - if event with same key already exists, skip if history.iter().any(|h| h.key() == event.key) { @@ -99,15 +160,18 @@ fn update_bundle_history_transform( return None; } + // Use the event's internal timestamp_ms instead of Kafka timestamp for accurate ordering + let timestamp = event.event.timestamp_ms(); + let history_event = match &event.event { BundleEvent::Received { bundle, .. } => BundleHistoryEvent::Received { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, bundle: bundle.clone(), }, BundleEvent::Cancelled { .. } => BundleHistoryEvent::Cancelled { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, }, BundleEvent::BuilderIncluded { builder, @@ -116,7 +180,7 @@ fn update_bundle_history_transform( .. } => BundleHistoryEvent::BuilderIncluded { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, builder: builder.clone(), block_number: *block_number, flashblock_index: *flashblock_index, @@ -127,15 +191,85 @@ fn update_bundle_history_transform( .. } => BundleHistoryEvent::BlockIncluded { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, block_number: *block_number, block_hash: *block_hash, }, BundleEvent::Dropped { reason, .. } => BundleHistoryEvent::Dropped { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, reason: reason.clone(), }, + BundleEvent::TransactionReceived { bundle, .. } => { + BundleHistoryEvent::TransactionReceived { + key: event.key.clone(), + timestamp, + bundle: bundle.clone(), + } + } + BundleEvent::TransactionSent { tx_hash, .. } => BundleHistoryEvent::TransactionSent { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + }, + BundleEvent::BackrunReceived { bundle, .. } => BundleHistoryEvent::BackrunReceived { + key: event.key.clone(), + timestamp, + bundle: bundle.clone(), + }, + BundleEvent::BackrunSent { target_tx_hash, .. } => BundleHistoryEvent::BackrunSent { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + }, + BundleEvent::BackrunInserted { + target_tx_hash, + backrun_tx_hashes, + .. + } => BundleHistoryEvent::BackrunInserted { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + backrun_tx_hashes: backrun_tx_hashes.clone(), + }, + BundleEvent::StartExecuting { + tx_hash, + block_number, + .. + } => BundleHistoryEvent::StartExecuting { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + block_number: *block_number, + }, + BundleEvent::Executed { + tx_hash, + block_number, + gas_used, + .. + } => BundleHistoryEvent::Executed { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + block_number: *block_number, + gas_used: *gas_used, + }, + BundleEvent::BackrunBundleExecuted { + target_tx_hash, + backrun_tx_hash, + block_number, + gas_used, + success, + .. + } => BundleHistoryEvent::BackrunBundleExecuted { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + backrun_tx_hash: *backrun_tx_hash, + block_number: *block_number, + gas_used: *gas_used, + success: *success, + }, }; history.push(history_event); @@ -190,7 +324,25 @@ impl S3EventReaderWriter { } async fn update_bundle_history(&self, event: Event) -> Result<()> { - let s3_key = S3Key::Bundle(event.event.bundle_id()).to_string(); + let event_name = event.event.event_name(); + + // Backrun events without bundle_ids are skipped from S3 storage + let Some(bundle_id) = event.event.bundle_id() else { + info!( + event_name = %event_name, + tx_hash = ?event.event.tx_hash(), + "Skipping S3 storage for event without bundle_id" + ); + return Ok(()); + }; + + info!( + event_name = %event_name, + bundle_id = %bundle_id, + "Processing event for S3 storage" + ); + + let s3_key = S3Key::Bundle(bundle_id).to_string(); self.idempotent_write::(&s3_key, |current_history| { update_bundle_history_transform(current_history, &event) @@ -325,7 +477,10 @@ impl S3EventReaderWriter { #[async_trait] impl EventWriter for S3EventReaderWriter { async fn archive_event(&self, event: Event) -> Result<()> { - let bundle_id = event.event.bundle_id(); + let Some(bundle_id) = event.event.bundle_id() else { + return Ok(()); + }; + let transaction_ids = event.event.transaction_ids(); self.update_bundle_history(event.clone()).await?; @@ -384,6 +539,7 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle.clone()), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key", 1234567890, bundle_event); @@ -423,6 +579,7 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), + timestamp_ms: 1234567890, }; let event = create_test_event("duplicate-key", 1234567890, bundle_event); @@ -440,12 +597,16 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); assert!(result.is_some()); - let bundle_event = BundleEvent::Cancelled { bundle_id }; + let bundle_event = BundleEvent::Cancelled { + bundle_id, + timestamp_ms: 1234567890, + }; let event = create_test_event("test-key-2", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); assert!(result.is_some()); @@ -455,6 +616,7 @@ mod tests { builder: "test-builder".to_string(), block_number: 12345, flashblock_index: 1, + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-3", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); @@ -464,6 +626,7 @@ mod tests { bundle_id, block_number: 12345, block_hash: TxHash::from([1u8; 32]), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-4", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); @@ -472,6 +635,7 @@ mod tests { let bundle_event = BundleEvent::Dropped { bundle_id, reason: DropReason::TimedOut, + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-5", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history, &event); diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index f27241a0..d1ef9ad9 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -41,35 +41,132 @@ pub enum BundleEvent { Received { bundle_id: BundleId, bundle: Box, + timestamp_ms: i64, }, Cancelled { bundle_id: BundleId, + timestamp_ms: i64, }, BuilderIncluded { bundle_id: BundleId, builder: String, block_number: u64, flashblock_index: u64, + timestamp_ms: i64, }, BlockIncluded { bundle_id: BundleId, block_number: u64, block_hash: TxHash, + timestamp_ms: i64, }, Dropped { bundle_id: BundleId, reason: DropReason, + timestamp_ms: i64, + }, + TransactionReceived { + bundle_id: BundleId, + bundle: Box, + timestamp_ms: i64, + }, + TransactionSent { + bundle_id: BundleId, + tx_hash: TxHash, + timestamp_ms: i64, + }, + BackrunReceived { + bundle_id: BundleId, + bundle: Box, + timestamp_ms: i64, + }, + BackrunSent { + bundle_id: BundleId, + target_tx_hash: TxHash, + timestamp_ms: i64, + }, + BackrunInserted { + bundle_id: BundleId, + target_tx_hash: TxHash, + backrun_tx_hashes: Vec, + timestamp_ms: i64, + }, + StartExecuting { + bundle_id: Option, + tx_hash: TxHash, + block_number: u64, + timestamp_ms: i64, + }, + Executed { + bundle_id: Option, + tx_hash: TxHash, + block_number: u64, + gas_used: u64, + timestamp_ms: i64, + }, + BackrunBundleExecuted { + bundle_id: BundleId, + target_tx_hash: TxHash, + backrun_tx_hash: TxHash, + block_number: u64, + gas_used: u64, + success: bool, + timestamp_ms: i64, }, } impl BundleEvent { - pub fn bundle_id(&self) -> BundleId { + pub fn event_name(&self) -> &'static str { match self { - BundleEvent::Received { bundle_id, .. } => *bundle_id, - BundleEvent::Cancelled { bundle_id, .. } => *bundle_id, - BundleEvent::BuilderIncluded { bundle_id, .. } => *bundle_id, - BundleEvent::BlockIncluded { bundle_id, .. } => *bundle_id, - BundleEvent::Dropped { bundle_id, .. } => *bundle_id, + BundleEvent::Received { .. } => "Received", + BundleEvent::Cancelled { .. } => "Cancelled", + BundleEvent::BuilderIncluded { .. } => "BuilderIncluded", + BundleEvent::BlockIncluded { .. } => "BlockIncluded", + BundleEvent::Dropped { .. } => "Dropped", + BundleEvent::TransactionReceived { .. } => "TransactionReceived", + BundleEvent::TransactionSent { .. } => "TransactionSent", + BundleEvent::BackrunReceived { .. } => "BackrunReceived", + BundleEvent::BackrunSent { .. } => "BackrunSent", + BundleEvent::BackrunInserted { .. } => "BackrunInserted", + BundleEvent::StartExecuting { .. } => "StartExecuting", + BundleEvent::Executed { .. } => "Executed", + BundleEvent::BackrunBundleExecuted { .. } => "BackrunBundleExecuted", + } + } + + pub fn bundle_id(&self) -> Option { + match self { + BundleEvent::Received { bundle_id, .. } => Some(*bundle_id), + BundleEvent::Cancelled { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BuilderIncluded { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BlockIncluded { bundle_id, .. } => Some(*bundle_id), + BundleEvent::Dropped { bundle_id, .. } => Some(*bundle_id), + BundleEvent::TransactionReceived { bundle_id, .. } => Some(*bundle_id), + BundleEvent::TransactionSent { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunReceived { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunSent { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunInserted { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunBundleExecuted { bundle_id, .. } => Some(*bundle_id), + BundleEvent::StartExecuting { bundle_id, .. } => *bundle_id, + BundleEvent::Executed { bundle_id, .. } => *bundle_id, + } + } + + pub fn tx_hash(&self) -> Option { + match self { + BundleEvent::TransactionReceived { bundle, .. } => { + bundle.txs.first().map(|tx| tx.tx_hash()) + } + BundleEvent::TransactionSent { tx_hash, .. } => Some(*tx_hash), + BundleEvent::BackrunReceived { bundle, .. } => { + bundle.txs.first().map(|tx| tx.tx_hash()) + } + BundleEvent::BackrunSent { target_tx_hash, .. } => Some(*target_tx_hash), + BundleEvent::BackrunInserted { target_tx_hash, .. } => Some(*target_tx_hash), + BundleEvent::StartExecuting { tx_hash, .. } => Some(*tx_hash), + BundleEvent::Executed { tx_hash, .. } => Some(*tx_hash), + BundleEvent::BackrunBundleExecuted { target_tx_hash, .. } => Some(*target_tx_hash), + _ => None, } } @@ -95,6 +192,54 @@ impl BundleEvent { BundleEvent::BuilderIncluded { .. } => vec![], BundleEvent::BlockIncluded { .. } => vec![], BundleEvent::Dropped { .. } => vec![], + BundleEvent::TransactionReceived { bundle, .. } => bundle + .txs + .iter() + .filter_map(|envelope| match envelope.recover_signer() { + Ok(sender) => Some(TransactionId { + sender, + nonce: U256::from(envelope.nonce()), + hash: *envelope.hash(), + }), + Err(_) => None, + }) + .collect(), + BundleEvent::BackrunReceived { bundle, .. } => bundle + .txs + .iter() + .filter_map(|envelope| match envelope.recover_signer() { + Ok(sender) => Some(TransactionId { + sender, + nonce: U256::from(envelope.nonce()), + hash: *envelope.hash(), + }), + Err(_) => None, + }) + .collect(), + BundleEvent::TransactionSent { .. } => vec![], + BundleEvent::BackrunSent { .. } => vec![], + BundleEvent::BackrunInserted { .. } => vec![], + BundleEvent::StartExecuting { .. } => vec![], + BundleEvent::Executed { .. } => vec![], + BundleEvent::BackrunBundleExecuted { .. } => vec![], + } + } + + pub fn timestamp_ms(&self) -> i64 { + match self { + BundleEvent::Received { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Cancelled { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BuilderIncluded { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BlockIncluded { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Dropped { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::TransactionReceived { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::TransactionSent { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunReceived { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunSent { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunInserted { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::StartExecuting { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Executed { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunBundleExecuted { timestamp_ms, .. } => *timestamp_ms, } } @@ -107,8 +252,37 @@ impl BundleEvent { } => { format!("{bundle_id}-{block_hash}") } + BundleEvent::TransactionReceived { bundle_id, .. } => { + format!("transaction-received-{bundle_id}") + } + BundleEvent::TransactionSent { bundle_id, .. } => { + format!("transaction-sent-{bundle_id}") + } + // Backrun events use bundle_id + BundleEvent::BackrunReceived { bundle_id, .. } => { + format!("backrun-received-{bundle_id}") + } + BundleEvent::BackrunSent { bundle_id, .. } => { + format!("backrun-sent-{bundle_id}") + } + BundleEvent::BackrunInserted { bundle_id, .. } => { + format!("backrun-inserted-{bundle_id}") + } + BundleEvent::StartExecuting { tx_hash, .. } => { + format!("tx-start-executing-{tx_hash}") + } + BundleEvent::Executed { tx_hash, .. } => { + format!("tx-executed-{tx_hash}") + } + BundleEvent::BackrunBundleExecuted { bundle_id, .. } => { + format!("backrun-bundle-executed-{bundle_id}") + } _ => { - format!("{}-{}", self.bundle_id(), Uuid::new_v4()) + format!( + "{}-{}", + self.bundle_id().unwrap_or_default(), + Uuid::new_v4() + ) } } } diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index bb79d24d..2f07a6a8 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -24,10 +24,12 @@ async fn test_kafka_publisher_s3_archiver_integration() BundleEvent::Received { bundle_id: test_bundle_id, bundle: Box::new(create_bundle_from_txn_data()), + timestamp_ms: 1234567890, }, BundleEvent::Dropped { bundle_id: test_bundle_id, reason: DropReason::TimedOut, + timestamp_ms: 1234567891, }, ]; diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 53b1a723..d3dd8f60 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -33,6 +33,7 @@ async fn test_event_write_and_read() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box anyhow::Result<()> { let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); - let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); + let (builder_backrun_tx, _) = + broadcast::channel::<(Bundle, uuid::Uuid)>(config.max_buffered_backrun_bundles); + let (builder_tx_bundle_id_tx, _) = + broadcast::channel::<(TxHash, uuid::Uuid)>(config.max_buffered_meter_bundle_responses); config.builder_rpcs.iter().for_each(|builder_rpc| { let metering_rx = builder_tx.subscribe(); let backrun_rx = builder_backrun_tx.subscribe(); - connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); + let tx_bundle_id_rx = builder_tx_bundle_id_tx.subscribe(); + connect_ingress_to_builder( + metering_rx, + backrun_rx, + tx_bundle_id_rx, + builder_rpc.clone(), + ); }); let health_check_addr = config.health_check_addr; @@ -95,6 +105,7 @@ async fn main() -> anyhow::Result<()> { audit_tx, builder_tx, builder_backrun_tx, + builder_tx_bundle_id_tx, cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 10d4d4c6..008b07b1 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -181,7 +181,8 @@ pub struct Config { pub fn connect_ingress_to_builder( metering_rx: broadcast::Receiver, - backrun_rx: broadcast::Receiver, + backrun_rx: broadcast::Receiver<(tips_core::Bundle, uuid::Uuid)>, + tx_bundle_id_rx: broadcast::Receiver<(TxHash, uuid::Uuid)>, builder_rpc: Url, ) { let builder: RootProvider = ProviderBuilder::new() @@ -212,16 +213,33 @@ pub fn connect_ingress_to_builder( } }); + let backrun_builder = builder.clone(); tokio::spawn(async move { let mut event_rx = backrun_rx; - while let Ok(bundle) = event_rx.recv().await { - if let Err(e) = builder + while let Ok((bundle, bundle_id)) = event_rx.recv().await { + if let Err(e) = backrun_builder .client() - .request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .request::<(tips_core::Bundle, uuid::Uuid), ()>( + "base_sendBackrunBundle", + (bundle, bundle_id), + ) .await { error!(error = %e, "Failed to send backrun bundle to builder"); } } }); + + tokio::spawn(async move { + let mut event_rx = tx_bundle_id_rx; + while let Ok((tx_hash, bundle_id)) = event_rx.recv().await { + if let Err(e) = builder + .client() + .request::<(TxHash, uuid::Uuid), ()>("base_txBundleId", (tx_hash, bundle_id)) + .await + { + error!(error = %e, "Failed to send tx_bundle_id mapping to builder"); + } + } + }); } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 1a2f9d00..a4b58736 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,6 +1,6 @@ use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; -use alloy_primitives::{B256, Bytes}; +use alloy_primitives::{B256, Bytes, TxHash}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use jsonrpsee::{ core::{RpcResult, async_trait}, @@ -74,7 +74,8 @@ pub struct IngressService { meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, backrun_enabled: bool, - builder_backrun_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender<(Bundle, uuid::Uuid)>, + builder_tx_bundle_id_tx: broadcast::Sender<(TxHash, uuid::Uuid)>, } impl IngressService { @@ -83,7 +84,8 @@ impl IngressService { queue: Q, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, - builder_backrun_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender<(Bundle, uuid::Uuid)>, + builder_tx_bundle_id_tx: broadcast::Sender<(TxHash, uuid::Uuid)>, config: Config, ) -> Self { let mempool_provider = Arc::new(providers.mempool); @@ -118,6 +120,7 @@ impl IngressService { builder_tx, backrun_enabled: config.backrun_enabled, builder_backrun_tx, + builder_tx_bundle_id_tx, } } } @@ -138,18 +141,37 @@ impl IngressApiServer for IngressService { let start = Instant::now(); let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + let bundle_id = *accepted_bundle.uuid(); + + // Get target tx hash (first tx in bundle) + let target_tx_hash = accepted_bundle + .txs + .first() + .map(|tx| tx.tx_hash()) + .unwrap_or_default(); self.metrics.backrun_bundles_received_total.increment(1); - if let Err(e) = self.builder_backrun_tx.send(bundle) { + self.send_transaction_event(BundleEvent::BackrunReceived { + bundle_id, + bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); + + if let Err(e) = self.builder_backrun_tx.send((bundle, bundle_id)) { warn!( message = "Failed to send backrun bundle to builders", bundle_hash = %bundle_hash, + bundle_id = %bundle_id, error = %e ); } - self.send_audit_event(&accepted_bundle, bundle_hash); + self.send_transaction_event(BundleEvent::BackrunSent { + bundle_id, + target_tx_hash, + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); self.metrics .backrun_bundles_sent_duration @@ -239,6 +261,14 @@ impl IngressApiServer for IngressService { let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default()); + let bundle_id = *accepted_bundle.uuid(); + + self.send_transaction_event(BundleEvent::TransactionReceived { + bundle_id, + bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); + if send_to_kafka { if let Err(e) = self .bundle_queue_publisher @@ -291,7 +321,14 @@ impl IngressApiServer for IngressService { transaction_hash = %transaction.tx_hash(), ); - self.send_audit_event(&accepted_bundle, accepted_bundle.bundle_hash()); + // Send tx_hash -> bundle_id mapping to builder for audit tracking + self.send_tx_bundle_id(transaction.tx_hash(), bundle_id); + + self.send_transaction_event(BundleEvent::TransactionSent { + bundle_id, + tx_hash: transaction.tx_hash(), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); self.metrics .send_raw_transaction_duration @@ -434,6 +471,7 @@ impl IngressService { let audit_event = BundleEvent::Received { bundle_id: *accepted_bundle.uuid(), bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), }; if let Err(e) = self.audit_channel.send(audit_event) { warn!( @@ -443,6 +481,26 @@ impl IngressService { ); } } + + fn send_tx_bundle_id(&self, tx_hash: TxHash, bundle_id: uuid::Uuid) { + if let Err(e) = self.builder_tx_bundle_id_tx.send((tx_hash, bundle_id)) { + warn!( + message = "Failed to send tx_bundle_id mapping", + tx_hash = %tx_hash, + bundle_id = %bundle_id, + error = %e + ); + } + } + + fn send_transaction_event(&self, event: BundleEvent) { + if let Err(e) = self.audit_channel.send(event) { + warn!( + message = "Failed to send transaction event", + error = %e + ); + } + } } #[cfg(test)] @@ -572,9 +630,16 @@ mod tests { let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let (tx_bundle_id_tx, _tx_bundle_id_rx) = broadcast::channel(1); let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + tx_bundle_id_tx, + config, ); let bundle = Bundle::default(); @@ -631,9 +696,16 @@ mod tests { let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let (tx_bundle_id_tx, _tx_bundle_id_rx) = broadcast::channel(1); let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + tx_bundle_id_tx, + config, ); // Valid signed transaction bytes diff --git a/justfile b/justfile index 4c30a664..2e367359 100644 --- a/justfile +++ b/justfile @@ -99,79 +99,14 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" -backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" -backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" +chain_id := "13" send-txn: #!/usr/bin/env bash set -euxo pipefail echo "sending txn" nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) - txn=$(cast mktx --private-key {{ sender_key }} 0x0000000000000000000000000000000000000000 --value 0.01ether --nonce $nonce --chain-id 13 -r {{ builder_url }}) + txn=$(cast mktx --private-key {{ sender_key }} 0x0000000000000000000000000000000000000000 --value 0.00001ether --nonce $nonce --chain-id {{ chain_id }} -r {{ builder_url }}) hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status - -send-txn-with-backrun: - #!/usr/bin/env bash - set -euxo pipefail - - # 1. Get nonce and send target transaction from sender account - nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) - echo "Sending target transaction from sender (nonce=$nonce)..." - target_txn=$(cast mktx --private-key {{ sender_key }} \ - 0x0000000000000000000000000000000000000000 \ - --value 0.01ether \ - --nonce $nonce \ - --chain-id 13 \ - -r {{ builder_url }}) - - target_hash=$(curl -s {{ ingress_url }} -X POST \ - -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \ - | jq -r ".result") - echo "Target tx sent: $target_hash" - - # 2. Build backrun transaction from backrunner account (different account!) - backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) - echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." - backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ - 0x0000000000000000000000000000000000000001 \ - --value 0.001ether \ - --nonce $backrun_nonce \ - --chain-id 13 \ - -r {{ builder_url }}) - - # 3. Compute tx hashes for reverting_tx_hashes - backrun_hash_computed=$(cast keccak $backrun_txn) - echo "Target tx hash: $target_hash" - echo "Backrun tx hash: $backrun_hash_computed" - - # 4. Construct and send bundle with reverting_tx_hashes - echo "Sending backrun bundle..." - bundle_json=$(jq -n \ - --arg target "$target_txn" \ - --arg backrun "$backrun_txn" \ - --arg target_hash "$target_hash" \ - --arg backrun_hash "$backrun_hash_computed" \ - '{ - txs: [$target, $backrun], - blockNumber: 0, - revertingTxHashes: [$target_hash, $backrun_hash] - }') - - bundle_hash=$(curl -s {{ ingress_url }} -X POST \ - -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \ - | jq -r ".result") - echo "Bundle sent: $bundle_hash" - - # 5. Wait and verify both transactions - echo "Waiting for transactions to land..." - sleep 5 - - echo "=== Target transaction (from sender) ===" - cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" - - echo "=== Backrun transaction (from backrunner) ===" - cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet" diff --git a/ui/src/app/api/txn/[hash]/route.ts b/ui/src/app/api/txn/[hash]/route.ts index 44f68243..bc572cf9 100644 --- a/ui/src/app/api/txn/[hash]/route.ts +++ b/ui/src/app/api/txn/[hash]/route.ts @@ -1,6 +1,7 @@ import { type NextRequest, NextResponse } from "next/server"; import { type BundleEvent, + type BundleHistory, getBundleHistory, getTransactionMetadataByHash, } from "@/lib/s3"; @@ -28,15 +29,17 @@ export interface TransactionEvent { }; } +export type BundleEventWithId = BundleEvent & { bundleId: string }; + export interface TransactionHistoryResponse { hash: string; bundle_ids: string[]; - history: BundleEvent[]; + history: BundleEventWithId[]; } export async function GET( _request: NextRequest, - { params }: { params: Promise<{ hash: string }> }, + { params }: { params: Promise<{ hash: string }> } ) { try { const { hash } = await params; @@ -46,20 +49,49 @@ export async function GET( if (!metadata) { return NextResponse.json( { error: "Transaction not found" }, - { status: 404 }, + { status: 404 } ); } - // TODO: Can be in multiple bundles - const bundle = await getBundleHistory(metadata.bundle_ids[0]); - if (!bundle) { - return NextResponse.json({ error: "Bundle not found" }, { status: 404 }); + // Fetch ALL bundle histories in parallel + const bundleHistories = await Promise.all( + metadata.bundle_ids.map((id) => getBundleHistory(id)) + ); + + // Filter out nulls and merge all events, tagging each with its bundleId + const allEvents: BundleEventWithId[] = bundleHistories + .map((bundle, index) => ({ + bundle, + bundleId: metadata.bundle_ids[index], + })) + .filter( + (item): item is { bundle: BundleHistory; bundleId: string } => + item.bundle !== null + ) + .flatMap(({ bundle, bundleId }) => + bundle.history.map((event) => ({ ...event, bundleId })) + ); + + if (allEvents.length === 0) { + return NextResponse.json( + { error: "No bundle history found" }, + { status: 404 } + ); } + // Sort by timestamp + allEvents.sort((a, b) => (a.data.timestamp ?? 0) - (b.data.timestamp ?? 0)); + + // Deduplicate by event key + const uniqueEvents = allEvents.filter( + (event, index, self) => + index === self.findIndex((e) => e.data?.key === event.data?.key) + ); + const response: TransactionHistoryResponse = { hash, bundle_ids: metadata.bundle_ids, - history: bundle.history, + history: uniqueEvents, }; return NextResponse.json(response); @@ -67,7 +99,7 @@ export async function GET( console.error("Error fetching transaction data:", error); return NextResponse.json( { error: "Internal server error" }, - { status: 500 }, + { status: 500 } ); } } diff --git a/ui/src/app/bundles/[uuid]/page.tsx b/ui/src/app/bundles/[uuid]/page.tsx index 95187f30..8b3c523c 100644 --- a/ui/src/app/bundles/[uuid]/page.tsx +++ b/ui/src/app/bundles/[uuid]/page.tsx @@ -198,7 +198,9 @@ function TransactionDetails({ @@ -425,6 +427,16 @@ function SectionTitle({ children }: { children: React.ReactNode }) { ); } +function formatTimestampWithMs(timestamp: number): string { + const date = new Date(timestamp); + const dateStr = date.toLocaleDateString(); + const hours = date.getHours().toString().padStart(2, "0"); + const minutes = date.getMinutes().toString().padStart(2, "0"); + const seconds = date.getSeconds().toString().padStart(2, "0"); + const ms = date.getMilliseconds().toString().padStart(3, "0"); + return `${dateStr}, ${hours}:${minutes}:${seconds}.${ms}`; +} + export default function BundlePage({ params }: PageProps) { const [uuid, setUuid] = useState(""); const [data, setData] = useState(null); @@ -445,7 +457,7 @@ export default function BundlePage({ params }: PageProps) { setError( response.status === 404 ? "Bundle not found" - : "Failed to fetch bundle data", + : "Failed to fetch bundle data" ); setData(null); return; diff --git a/ui/src/app/page.tsx b/ui/src/app/page.tsx index 21e257ff..a6cfdc6b 100644 --- a/ui/src/app/page.tsx +++ b/ui/src/app/page.tsx @@ -31,7 +31,13 @@ function SearchBar({ onError }: { onError: (error: string | null) => void }) { const result = await response.json(); if (result.bundle_ids && result.bundle_ids.length > 0) { - router.push(`/bundles/${result.bundle_ids[0]}`); + if (result.bundle_ids.length === 1) { + // Single bundle - go directly to bundle page + router.push(`/bundles/${result.bundle_ids[0]}`); + } else { + // Multiple bundles - show transaction page with merged history + router.push(`/txn/${hash}`); + } } else { onError("No bundle found for this transaction"); } @@ -70,10 +76,10 @@ function BlockRow({ block, index }: { block: BlockSummary; index: number }) { timeSince <= 0 ? "now" : timeSince < 60 - ? `${timeSince}s ago` - : timeSince < 3600 - ? `${Math.floor(timeSince / 60)}m ago` - : `${Math.floor(timeSince / 3600)}h ago`; + ? `${timeSince}s ago` + : timeSince < 3600 + ? `${Math.floor(timeSince / 60)}m ago` + : `${Math.floor(timeSince / 3600)}h ago`; return ( ; } -export default function TransactionRedirectPage({ params }: PageProps) { +// Color palette for distinguishing bundles +const BUNDLE_COLORS = [ + { + bg: "bg-blue-100", + dot: "bg-blue-600", + text: "text-blue-700", + badge: "bg-blue-50 text-blue-700 ring-blue-600/20", + link: "bg-blue-50 text-blue-600", + }, + { + bg: "bg-purple-100", + dot: "bg-purple-600", + text: "text-purple-700", + badge: "bg-purple-50 text-purple-700 ring-purple-600/20", + link: "bg-purple-50 text-purple-600", + }, + { + bg: "bg-amber-100", + dot: "bg-amber-600", + text: "text-amber-700", + badge: "bg-amber-50 text-amber-700 ring-amber-600/20", + link: "bg-amber-50 text-amber-600", + }, + { + bg: "bg-emerald-100", + dot: "bg-emerald-600", + text: "text-emerald-700", + badge: "bg-emerald-50 text-emerald-700 ring-emerald-600/20", + link: "bg-emerald-50 text-emerald-600", + }, + { + bg: "bg-rose-100", + dot: "bg-rose-600", + text: "text-rose-700", + badge: "bg-rose-50 text-rose-700 ring-rose-600/20", + link: "bg-rose-50 text-rose-600", + }, + { + bg: "bg-cyan-100", + dot: "bg-cyan-600", + text: "text-cyan-700", + badge: "bg-cyan-50 text-cyan-700 ring-cyan-600/20", + link: "bg-cyan-50 text-cyan-600", + }, +]; + +type BundleColorMap = Map; + +function Badge({ + children, + variant = "default", + className = "", +}: { + children: React.ReactNode; + variant?: "default" | "success" | "warning" | "error"; + className?: string; +}) { + const variants = { + default: "bg-blue-50 text-blue-700 ring-blue-600/20", + success: "bg-emerald-50 text-emerald-700 ring-emerald-600/20", + warning: "bg-amber-50 text-amber-700 ring-amber-600/20", + error: "bg-red-50 text-red-700 ring-red-600/20", + }; + + return ( + + {children} + + ); +} + +function Card({ + children, + className = "", +}: { + children: React.ReactNode; + className?: string; +}) { + return ( +
+ {children} +
+ ); +} + +function TimelineEventDetails({ + event, + colors, +}: { + event: BundleEventWithId; + colors: (typeof BUNDLE_COLORS)[0]; +}) { + if (event.event === "BlockIncluded" && event.data?.block_hash) { + return ( +
+ {event.event} + + Block #{event.data.block_number} + +
+ ); + } + + if (event.event === "BuilderIncluded" && event.data?.builder) { + return ( +
+ {event.event} + + {event.data.builder} (flashblock #{event.data.flashblock_index}) + +
+ ); + } + + if (event.event === "Dropped" && event.data?.reason) { + return ( +
+ {event.event} + {event.data.reason} +
+ ); + } + + if (event.event === "Executed") { + return ( +
+ {event.event} +
+ ); + } + + if (event.event === "BackrunBundleExecuted") { + return ( +
+ {event.event} +
+ ); + } + + return {event.event}; +} + +function Timeline({ + events, + bundleColorMap, +}: { + events: BundleEventWithId[]; + bundleColorMap: BundleColorMap; +}) { + if (events.length === 0) return null; + + return ( +
+ {events.map((event, index) => { + const colors = bundleColorMap.get(event.bundleId) ?? BUNDLE_COLORS[0]; + return ( +
+
+
+
+
+ + +
+
+ ); + })} +
+ ); +} + +function SectionTitle({ children }: { children: React.ReactNode }) { + return ( +

{children}

+ ); +} + +function formatTimestampWithMs(timestamp: number): string { + const date = new Date(timestamp); + const dateStr = date.toLocaleDateString(); + const hours = date.getHours().toString().padStart(2, "0"); + const minutes = date.getMinutes().toString().padStart(2, "0"); + const seconds = date.getSeconds().toString().padStart(2, "0"); + const ms = date.getMilliseconds().toString().padStart(3, "0"); + return `${dateStr}, ${hours}:${minutes}:${seconds}.${ms}`; +} + +export default function TransactionPage({ params }: PageProps) { const router = useRouter(); const [hash, setHash] = useState(""); + const [data, setData] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); @@ -25,7 +235,7 @@ export default function TransactionRedirectPage({ params }: PageProps) { useEffect(() => { if (!hash) return; - const fetchAndRedirect = async () => { + const fetchData = async () => { try { const response = await fetch(`/api/txn/${hash}`); if (!response.ok) { @@ -38,11 +248,13 @@ export default function TransactionRedirectPage({ params }: PageProps) { } const result: TransactionHistoryResponse = await response.json(); - if (result.bundle_ids && result.bundle_ids.length > 0) { - router.push(`/bundles/${result.bundle_ids[0]}`); - } else { - setError("No bundle found for this transaction"); + // If only 1 bundle, redirect to bundle page + if (result.bundle_ids && result.bundle_ids.length === 1) { + router.replace(`/bundles/${result.bundle_ids[0]}`); + return; } + + setData(result); } catch (_err) { setError("Failed to fetch transaction data"); } finally { @@ -50,10 +262,10 @@ export default function TransactionRedirectPage({ params }: PageProps) { } }; - fetchAndRedirect(); + fetchData(); }, [hash, router]); - if (!hash) { + if (!hash || loading) { return (
Loading...
@@ -61,19 +273,71 @@ export default function TransactionRedirectPage({ params }: PageProps) { ); } + if (error) { + return ( +
+
+

Transaction

+

{hash}

+
{error}
+
+
+ ); + } + + if (!data) { + return null; + } + + // Create bundle -> color mapping + const bundleColorMap: BundleColorMap = new Map( + data.bundle_ids.map((id, index) => [ + id, + BUNDLE_COLORS[index % BUNDLE_COLORS.length], + ]) + ); + return ( -
+
-

Transaction {hash}

- {loading && ( -
- Redirecting to bundle page... -
- )} - {error && ( -
{error}
- )} +

Transaction

+

{hash}

+ + {/* Bundle IDs */} +
+ Associated Bundles + +
+ {data.bundle_ids.map((bundleId) => { + const colors = bundleColorMap.get(bundleId) ?? BUNDLE_COLORS[0]; + return ( + + {bundleId} + + ); + })} +
+
+
+ + {/* Event History */} +
+ Event History + + {data.history.length > 0 ? ( + + ) : ( +
+ No events recorded yet. +
+ )} +
+
); } diff --git a/ui/src/lib/s3.ts b/ui/src/lib/s3.ts index 229fdad1..ac48f24b 100644 --- a/ui/src/lib/s3.ts +++ b/ui/src/lib/s3.ts @@ -66,7 +66,7 @@ async function getObjectContent(key: string): Promise { } export async function getTransactionMetadataByHash( - hash: string, + hash: string ): Promise { const key = `transactions/by_hash/${hash}`; const content = await getObjectContent(key); @@ -80,7 +80,7 @@ export async function getTransactionMetadataByHash( } catch (error) { console.error( `Failed to parse transaction metadata for hash ${hash}:`, - error, + error ); return null; } @@ -160,7 +160,7 @@ export interface BundleHistory { } export async function getBundleHistory( - bundleId: string, + bundleId: string ): Promise { const key = `bundles/${bundleId}`; const content = await getObjectContent(key); @@ -174,7 +174,7 @@ export async function getBundleHistory( } catch (error) { console.error( `Failed to parse bundle history for bundle ${bundleId}:`, - error, + error ); return null; } @@ -201,7 +201,7 @@ export interface BlockData { } export async function getBlockFromCache( - blockHash: string, + blockHash: string ): Promise { const key = `blocks/${blockHash}`; const content = await getObjectContent(key); @@ -222,7 +222,7 @@ export async function getBlockFromCache( (tx: BlockTransaction & { gasUsed: string }) => ({ ...tx, gasUsed: BigInt(tx.gasUsed), - }), + }) ), } as BlockData; } catch (error) { @@ -239,7 +239,7 @@ export async function cacheBlockData(blockData: BlockData): Promise { Bucket: BUCKET_NAME, Key: key, Body: JSON.stringify(blockData, (_, value) => - typeof value === "bigint" ? value.toString() : value, + typeof value === "bigint" ? value.toString() : value ), ContentType: "application/json", });