From 2ee194f94b87de06dc22eeaa5a4242de99a26a02 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Fri, 5 Dec 2025 21:53:31 -0500 Subject: [PATCH 1/5] add more logs --- crates/ingress-rpc/src/lib.rs | 4 +++- crates/ingress-rpc/src/service.rs | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index fe98f2b..205414a 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -12,7 +12,7 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use tips_core::MeterBundleResponse; use tokio::sync::broadcast; -use tracing::error; +use tracing::{error, info}; use url::Url; #[derive(Debug, Clone, Copy)] @@ -175,6 +175,7 @@ pub fn connect_ingress_to_builder( backrun_rx: broadcast::Receiver, builder_rpc: Url, ) { + let builder_rpc_clone = builder_rpc.clone(); let builder: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() @@ -209,6 +210,7 @@ pub fn connect_ingress_to_builder( { error!(error = %e, "Failed to send backrun bundle to builder"); } + info!(message = "Sent backrun bundle to builder", builder_rpc = %builder_rpc_clone); } }); } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 7f28e67..f5d571f 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -135,6 +135,7 @@ where let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; self.metrics.backrun_bundles_received_total.increment(1); + info!(message = "Received backrun bundle", bundle_hash = %bundle_hash); if let Err(e) = self.builder_backrun_tx.send(bundle) { warn!( @@ -197,6 +198,8 @@ where let start = Instant::now(); let transaction = self.get_tx(&data).await?; + info!(message = "Received raw transaction", tx_hash = %transaction.tx_hash()); + let send_to_kafka = matches!( self.tx_submission_method, TxSubmissionMethod::Kafka | TxSubmissionMethod::MempoolAndKafka From 9887411808cb304db70059ba7dbaff2bcdcc2bbd Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 9 Dec 2025 18:49:19 -0500 Subject: [PATCH 2/5] more events --- Cargo.lock | 4 + crates/audit/src/publisher.rs | 10 +- crates/audit/src/reader.rs | 10 +- crates/audit/src/storage.rs | 192 ++++++++++++++- crates/audit/src/types.rs | 225 +++++++++++++++++- crates/audit/tests/integration_tests.rs | 2 + crates/audit/tests/s3_test.rs | 11 +- crates/ingress-rpc/Cargo.toml | 2 + crates/ingress-rpc/src/bin/main.rs | 15 +- crates/ingress-rpc/src/lib.rs | 30 ++- crates/ingress-rpc/src/service.rs | 103 +++++++- justfile | 39 +-- ui/src/app/api/txn/[hash]/route.ts | 52 +++- ui/src/app/bundles/[uuid]/page.tsx | 18 +- ui/src/app/page.tsx | 16 +- ui/src/app/txn/[hash]/page.tsx | 302 ++++++++++++++++++++++-- ui/src/lib/s3.ts | 2 + 17 files changed, 936 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7938a3..65abda2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1964,8 +1964,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -7086,6 +7088,7 @@ dependencies = [ "async-trait", "axum", "backon", + "chrono", "clap", "dotenvy", "jsonrpsee", @@ -7105,6 +7108,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", "wiremock", ] diff --git a/crates/audit/src/publisher.rs b/crates/audit/src/publisher.rs index 0d31391..d908381 100644 --- a/crates/audit/src/publisher.rs +++ b/crates/audit/src/publisher.rs @@ -24,7 +24,6 @@ impl KafkaBundleEventPublisher { } async fn send_event(&self, event: &BundleEvent) -> Result<()> { - let bundle_id = event.bundle_id(); let key = event.generate_event_key(); let payload = serde_json::to_vec(event)?; @@ -37,7 +36,8 @@ impl KafkaBundleEventPublisher { { Ok(_) => { debug!( - bundle_id = %bundle_id, + bundle_id = ?event.bundle_id(), + tx_hash = ?event.tx_hash(), topic = %self.topic, payload_size = payload.len(), "Successfully published event" @@ -46,7 +46,8 @@ impl KafkaBundleEventPublisher { } Err((err, _)) => { error!( - bundle_id = %bundle_id, + bundle_id = ?event.bundle_id(), + tx_hash = ?event.tx_hash(), topic = %self.topic, error = %err, "Failed to publish event" @@ -90,7 +91,8 @@ impl Default for LoggingBundleEventPublisher { impl BundleEventPublisher for LoggingBundleEventPublisher { async fn publish(&self, event: BundleEvent) -> Result<()> { info!( - bundle_id = %event.bundle_id(), + bundle_id = ?event.bundle_id(), + tx_hash = ?event.tx_hash(), event = ?event, "Received bundle event" ); diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs index 207b8ac..b1e1ab5 100644 --- a/crates/audit/src/reader.rs +++ b/crates/audit/src/reader.rs @@ -10,7 +10,7 @@ use rdkafka::{ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tips_core::kafka::load_kafka_config_from_file; use tokio::time::sleep; -use tracing::{debug, error}; +use tracing::{error, info}; pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result { let client_config = @@ -79,12 +79,14 @@ impl EventReader for KafkaAuditLogReader { let event: BundleEvent = serde_json::from_slice(payload)?; - debug!( - bundle_id = %event.bundle_id(), + info!( + event_name = %event.event_name(), + bundle_id = ?event.bundle_id(), + tx_hash = ?event.tx_hash(), timestamp = timestamp, offset = message.offset(), partition = message.partition(), - "Received event with timestamp" + "Received event from Kafka" ); self.last_message_offset = Some(message.offset()); diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index c8b619f..8bc02c9 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -63,6 +63,62 @@ pub enum BundleHistoryEvent { timestamp: i64, reason: DropReason, }, + /// Transaction received by ingress-rpc + TransactionReceived { + key: String, + timestamp: i64, + bundle: Box, + }, + /// Transaction sent from ingress-rpc + TransactionSent { + key: String, + timestamp: i64, + tx_hash: TxHash, + }, + /// Backrun bundle received by ingress-rpc + BackrunReceived { + key: String, + timestamp: i64, + bundle: Box, + }, + /// Backrun bundle sent to builder + BackrunSent { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + }, + /// Backrun bundle inserted into builder store + BackrunInserted { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + backrun_tx_hashes: Vec, + }, + /// Transaction selected for execution + StartExecuting { + key: String, + timestamp: i64, + tx_hash: TxHash, + block_number: u64, + }, + /// Transaction successfully executed + Executed { + key: String, + timestamp: i64, + tx_hash: TxHash, + block_number: u64, + gas_used: u64, + }, + /// Backrun bundle transaction executed + BackrunBundleExecuted { + key: String, + timestamp: i64, + target_tx_hash: TxHash, + backrun_tx_hash: TxHash, + block_number: u64, + gas_used: u64, + success: bool, + }, } impl BundleHistoryEvent { @@ -73,6 +129,14 @@ impl BundleHistoryEvent { BundleHistoryEvent::BuilderIncluded { key, .. } => key, BundleHistoryEvent::BlockIncluded { key, .. } => key, BundleHistoryEvent::Dropped { key, .. } => key, + BundleHistoryEvent::TransactionReceived { key, .. } => key, + BundleHistoryEvent::TransactionSent { key, .. } => key, + BundleHistoryEvent::BackrunReceived { key, .. } => key, + BundleHistoryEvent::BackrunSent { key, .. } => key, + BundleHistoryEvent::BackrunInserted { key, .. } => key, + BundleHistoryEvent::StartExecuting { key, .. } => key, + BundleHistoryEvent::Executed { key, .. } => key, + BundleHistoryEvent::BackrunBundleExecuted { key, .. } => key, } } } @@ -87,7 +151,13 @@ fn update_bundle_history_transform( event: &Event, ) -> Option { let mut history = bundle_history.history; - let bundle_id = event.event.bundle_id(); + let bundle_id = match event.event.bundle_id() { + Some(id) => id, + None => { + // Backrun events don't have bundle_ids, skip history update + return None; + } + }; // Check for deduplication - if event with same key already exists, skip if history.iter().any(|h| h.key() == event.key) { @@ -99,15 +169,18 @@ fn update_bundle_history_transform( return None; } + // Use the event's internal timestamp_ms instead of Kafka timestamp for accurate ordering + let timestamp = event.event.timestamp_ms(); + let history_event = match &event.event { BundleEvent::Received { bundle, .. } => BundleHistoryEvent::Received { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, bundle: bundle.clone(), }, BundleEvent::Cancelled { .. } => BundleHistoryEvent::Cancelled { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, }, BundleEvent::BuilderIncluded { builder, @@ -116,7 +189,7 @@ fn update_bundle_history_transform( .. } => BundleHistoryEvent::BuilderIncluded { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, builder: builder.clone(), block_number: *block_number, flashblock_index: *flashblock_index, @@ -127,15 +200,85 @@ fn update_bundle_history_transform( .. } => BundleHistoryEvent::BlockIncluded { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, block_number: *block_number, block_hash: *block_hash, }, BundleEvent::Dropped { reason, .. } => BundleHistoryEvent::Dropped { key: event.key.clone(), - timestamp: event.timestamp, + timestamp, reason: reason.clone(), }, + BundleEvent::TransactionReceived { bundle, .. } => { + BundleHistoryEvent::TransactionReceived { + key: event.key.clone(), + timestamp, + bundle: bundle.clone(), + } + } + BundleEvent::TransactionSent { tx_hash, .. } => BundleHistoryEvent::TransactionSent { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + }, + BundleEvent::BackrunReceived { bundle, .. } => BundleHistoryEvent::BackrunReceived { + key: event.key.clone(), + timestamp, + bundle: bundle.clone(), + }, + BundleEvent::BackrunSent { target_tx_hash, .. } => BundleHistoryEvent::BackrunSent { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + }, + BundleEvent::BackrunInserted { + target_tx_hash, + backrun_tx_hashes, + .. + } => BundleHistoryEvent::BackrunInserted { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + backrun_tx_hashes: backrun_tx_hashes.clone(), + }, + BundleEvent::StartExecuting { + tx_hash, + block_number, + .. + } => BundleHistoryEvent::StartExecuting { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + block_number: *block_number, + }, + BundleEvent::Executed { + tx_hash, + block_number, + gas_used, + .. + } => BundleHistoryEvent::Executed { + key: event.key.clone(), + timestamp, + tx_hash: *tx_hash, + block_number: *block_number, + gas_used: *gas_used, + }, + BundleEvent::BackrunBundleExecuted { + target_tx_hash, + backrun_tx_hash, + block_number, + gas_used, + success, + .. + } => BundleHistoryEvent::BackrunBundleExecuted { + key: event.key.clone(), + timestamp, + target_tx_hash: *target_tx_hash, + backrun_tx_hash: *backrun_tx_hash, + block_number: *block_number, + gas_used: *gas_used, + success: *success, + }, }; history.push(history_event); @@ -190,7 +333,25 @@ impl S3EventReaderWriter { } async fn update_bundle_history(&self, event: Event) -> Result<()> { - let s3_key = S3Key::Bundle(event.event.bundle_id()).to_string(); + let event_name = event.event.event_name(); + + // Backrun events without bundle_ids are skipped from S3 storage + let Some(bundle_id) = event.event.bundle_id() else { + info!( + event_name = %event_name, + tx_hash = ?event.event.tx_hash(), + "Skipping S3 storage for event without bundle_id" + ); + return Ok(()); + }; + + info!( + event_name = %event_name, + bundle_id = %bundle_id, + "Processing event for S3 storage" + ); + + let s3_key = S3Key::Bundle(bundle_id).to_string(); self.idempotent_write::(&s3_key, |current_history| { update_bundle_history_transform(current_history, &event) @@ -325,7 +486,11 @@ impl S3EventReaderWriter { #[async_trait] impl EventWriter for S3EventReaderWriter { async fn archive_event(&self, event: Event) -> Result<()> { - let bundle_id = event.event.bundle_id(); + // Backrun events don't have bundle_ids, skip S3 archival + let Some(bundle_id) = event.event.bundle_id() else { + return Ok(()); + }; + let transaction_ids = event.event.transaction_ids(); self.update_bundle_history(event.clone()).await?; @@ -384,6 +549,7 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle.clone()), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key", 1234567890, bundle_event); @@ -423,6 +589,7 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), + timestamp_ms: 1234567890, }; let event = create_test_event("duplicate-key", 1234567890, bundle_event); @@ -440,12 +607,16 @@ mod tests { let bundle_event = BundleEvent::Received { bundle_id, bundle: Box::new(bundle), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); assert!(result.is_some()); - let bundle_event = BundleEvent::Cancelled { bundle_id }; + let bundle_event = BundleEvent::Cancelled { + bundle_id, + timestamp_ms: 1234567890, + }; let event = create_test_event("test-key-2", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); assert!(result.is_some()); @@ -455,6 +626,7 @@ mod tests { builder: "test-builder".to_string(), block_number: 12345, flashblock_index: 1, + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-3", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); @@ -464,6 +636,7 @@ mod tests { bundle_id, block_number: 12345, block_hash: TxHash::from([1u8; 32]), + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-4", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); @@ -472,6 +645,7 @@ mod tests { let bundle_event = BundleEvent::Dropped { bundle_id, reason: DropReason::TimedOut, + timestamp_ms: 1234567890, }; let event = create_test_event("test-key-5", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history, &event); diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index 6b0d7d1..88d3dd8 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -32,35 +32,147 @@ pub enum BundleEvent { Received { bundle_id: BundleId, bundle: Box, + timestamp_ms: i64, }, Cancelled { bundle_id: BundleId, + timestamp_ms: i64, }, BuilderIncluded { bundle_id: BundleId, builder: String, block_number: u64, flashblock_index: u64, + timestamp_ms: i64, }, BlockIncluded { bundle_id: BundleId, block_number: u64, block_hash: TxHash, + timestamp_ms: i64, }, Dropped { bundle_id: BundleId, reason: DropReason, + timestamp_ms: i64, + }, + /// Transaction received by ingress-rpc (start of send_raw_transaction) + TransactionReceived { + bundle_id: BundleId, + bundle: Box, + timestamp_ms: i64, + }, + /// Transaction processing complete in ingress-rpc (end of send_raw_transaction) + TransactionSent { + bundle_id: BundleId, + tx_hash: TxHash, + timestamp_ms: i64, + }, + /// Backrun bundle received by ingress-rpc (start of send_backrun_bundle) + BackrunReceived { + bundle_id: BundleId, + bundle: Box, + timestamp_ms: i64, + }, + /// Backrun bundle sent to builder (end of send_backrun_bundle) + BackrunSent { + bundle_id: BundleId, + target_tx_hash: TxHash, + timestamp_ms: i64, + }, + /// Backrun bundle inserted into builder store + BackrunInserted { + bundle_id: BundleId, + target_tx_hash: TxHash, + backrun_tx_hashes: Vec, + timestamp_ms: i64, + }, + /// Transaction selected from mempool, about to start executing + StartExecuting { + bundle_id: Option, + tx_hash: TxHash, + block_number: u64, + timestamp_ms: i64, + }, + /// Transaction successfully executed and committed + Executed { + bundle_id: Option, + tx_hash: TxHash, + block_number: u64, + gas_used: u64, + timestamp_ms: i64, + }, + /// Backrun bundle transaction executed (success or reverted) + BackrunBundleExecuted { + bundle_id: BundleId, + target_tx_hash: TxHash, + backrun_tx_hash: TxHash, + block_number: u64, + gas_used: u64, + success: bool, + timestamp_ms: i64, }, } impl BundleEvent { - pub fn bundle_id(&self) -> BundleId { + /// Returns a human-readable name for the event type + pub fn event_name(&self) -> &'static str { + match self { + BundleEvent::Received { .. } => "Received", + BundleEvent::Cancelled { .. } => "Cancelled", + BundleEvent::BuilderIncluded { .. } => "BuilderIncluded", + BundleEvent::BlockIncluded { .. } => "BlockIncluded", + BundleEvent::Dropped { .. } => "Dropped", + BundleEvent::TransactionReceived { .. } => "TransactionReceived", + BundleEvent::TransactionSent { .. } => "TransactionSent", + BundleEvent::BackrunReceived { .. } => "BackrunReceived", + BundleEvent::BackrunSent { .. } => "BackrunSent", + BundleEvent::BackrunInserted { .. } => "BackrunInserted", + BundleEvent::StartExecuting { .. } => "StartExecuting", + BundleEvent::Executed { .. } => "Executed", + BundleEvent::BackrunBundleExecuted { .. } => "BackrunBundleExecuted", + } + } + + /// Returns the bundle_id for events that have one + pub fn bundle_id(&self) -> Option { match self { - BundleEvent::Received { bundle_id, .. } => *bundle_id, - BundleEvent::Cancelled { bundle_id, .. } => *bundle_id, - BundleEvent::BuilderIncluded { bundle_id, .. } => *bundle_id, - BundleEvent::BlockIncluded { bundle_id, .. } => *bundle_id, - BundleEvent::Dropped { bundle_id, .. } => *bundle_id, + BundleEvent::Received { bundle_id, .. } => Some(*bundle_id), + BundleEvent::Cancelled { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BuilderIncluded { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BlockIncluded { bundle_id, .. } => Some(*bundle_id), + BundleEvent::Dropped { bundle_id, .. } => Some(*bundle_id), + // Transaction events + BundleEvent::TransactionReceived { bundle_id, .. } => Some(*bundle_id), + BundleEvent::TransactionSent { bundle_id, .. } => Some(*bundle_id), + // Backrun events with bundle_id + BundleEvent::BackrunReceived { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunSent { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunInserted { bundle_id, .. } => Some(*bundle_id), + BundleEvent::BackrunBundleExecuted { bundle_id, .. } => Some(*bundle_id), + // These events have optional bundle_id (looked up from tx hash) + BundleEvent::StartExecuting { bundle_id, .. } => *bundle_id, + BundleEvent::Executed { bundle_id, .. } => *bundle_id, + } + } + + /// Returns the tx_hash for events that track individual transactions + pub fn tx_hash(&self) -> Option { + match self { + BundleEvent::TransactionReceived { bundle, .. } => { + bundle.txs.first().map(|tx| tx.tx_hash()) + } + BundleEvent::TransactionSent { tx_hash, .. } => Some(*tx_hash), + BundleEvent::BackrunReceived { bundle, .. } => { + bundle.txs.first().map(|tx| tx.tx_hash()) + } + BundleEvent::BackrunSent { target_tx_hash, .. } => Some(*target_tx_hash), + BundleEvent::BackrunInserted { target_tx_hash, .. } => Some(*target_tx_hash), + BundleEvent::StartExecuting { tx_hash, .. } => Some(*tx_hash), + BundleEvent::Executed { tx_hash, .. } => Some(*tx_hash), + BundleEvent::BackrunBundleExecuted { target_tx_hash, .. } => Some(*target_tx_hash), + // Standard bundle events don't have a tx_hash + _ => None, } } @@ -86,6 +198,58 @@ impl BundleEvent { BundleEvent::BuilderIncluded { .. } => vec![], BundleEvent::BlockIncluded { .. } => vec![], BundleEvent::Dropped { .. } => vec![], + // TransactionReceived has full bundle, extract transaction IDs + BundleEvent::TransactionReceived { bundle, .. } => bundle + .txs + .iter() + .filter_map(|envelope| match envelope.recover_signer() { + Ok(sender) => Some(TransactionId { + sender, + nonce: U256::from(envelope.nonce()), + hash: *envelope.hash(), + }), + Err(_) => None, + }) + .collect(), + // BackrunReceived has full bundle, extract transaction IDs + BundleEvent::BackrunReceived { bundle, .. } => bundle + .txs + .iter() + .filter_map(|envelope| match envelope.recover_signer() { + Ok(sender) => Some(TransactionId { + sender, + nonce: U256::from(envelope.nonce()), + hash: *envelope.hash(), + }), + Err(_) => None, + }) + .collect(), + // These events don't track transaction IDs this way + BundleEvent::TransactionSent { .. } => vec![], + BundleEvent::BackrunSent { .. } => vec![], + BundleEvent::BackrunInserted { .. } => vec![], + BundleEvent::StartExecuting { .. } => vec![], + BundleEvent::Executed { .. } => vec![], + BundleEvent::BackrunBundleExecuted { .. } => vec![], + } + } + + /// Returns the timestamp_ms for any event + pub fn timestamp_ms(&self) -> i64 { + match self { + BundleEvent::Received { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Cancelled { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BuilderIncluded { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BlockIncluded { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Dropped { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::TransactionReceived { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::TransactionSent { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunReceived { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunSent { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunInserted { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::StartExecuting { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::Executed { timestamp_ms, .. } => *timestamp_ms, + BundleEvent::BackrunBundleExecuted { timestamp_ms, .. } => *timestamp_ms, } } @@ -98,8 +262,55 @@ impl BundleEvent { } => { format!("{bundle_id}-{block_hash}") } + // Transaction events + BundleEvent::TransactionReceived { bundle_id, .. } => { + format!("transaction-received-{bundle_id}") + } + BundleEvent::TransactionSent { bundle_id, .. } => { + format!("transaction-sent-{bundle_id}") + } + // Backrun events use bundle_id + BundleEvent::BackrunReceived { bundle_id, .. } => { + format!("backrun-received-{bundle_id}") + } + BundleEvent::BackrunSent { bundle_id, .. } => { + format!("backrun-sent-{bundle_id}") + } + BundleEvent::BackrunInserted { bundle_id, .. } => { + format!("backrun-inserted-{bundle_id}") + } + BundleEvent::StartExecuting { + bundle_id, + tx_hash, + block_number, + .. + } => match bundle_id { + Some(id) => format!("start-executing-{id}-{block_number}"), + None => format!("start-executing-{tx_hash}-{block_number}"), + }, + BundleEvent::Executed { + bundle_id, + tx_hash, + block_number, + .. + } => match bundle_id { + Some(id) => format!("executed-{id}-{block_number}"), + None => format!("executed-{tx_hash}-{block_number}"), + }, + BundleEvent::BackrunBundleExecuted { + bundle_id, + backrun_tx_hash, + block_number, + .. + } => { + format!("backrun-bundle-executed-{bundle_id}-{backrun_tx_hash}-{block_number}") + } _ => { - format!("{}-{}", self.bundle_id(), Uuid::new_v4()) + format!( + "{}-{}", + self.bundle_id().unwrap_or_default(), + Uuid::new_v4() + ) } } } diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index bb79d24..2f07a6a 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -24,10 +24,12 @@ async fn test_kafka_publisher_s3_archiver_integration() BundleEvent::Received { bundle_id: test_bundle_id, bundle: Box::new(create_bundle_from_txn_data()), + timestamp_ms: 1234567890, }, BundleEvent::Dropped { bundle_id: test_bundle_id, reason: DropReason::TimedOut, + timestamp_ms: 1234567891, }, ]; diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 53b1a72..d3dd8f6 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -33,6 +33,7 @@ async fn test_event_write_and_read() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box anyhow::Result<()> { let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); - let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); + let (builder_backrun_tx, _) = + broadcast::channel::<(Bundle, uuid::Uuid)>(config.max_buffered_backrun_bundles); + let (builder_tx_bundle_id_tx, _) = + broadcast::channel::<(TxHash, uuid::Uuid)>(config.max_buffered_meter_bundle_responses); config.builder_rpcs.iter().for_each(|builder_rpc| { let metering_rx = builder_tx.subscribe(); let backrun_rx = builder_backrun_tx.subscribe(); - connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); + let tx_bundle_id_rx = builder_tx_bundle_id_tx.subscribe(); + connect_ingress_to_builder( + metering_rx, + backrun_rx, + tx_bundle_id_rx, + builder_rpc.clone(), + ); }); let health_check_addr = config.health_check_addr; @@ -95,6 +105,7 @@ async fn main() -> anyhow::Result<()> { audit_tx, builder_tx, builder_backrun_tx, + builder_tx_bundle_id_tx, cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 205414a..3ea78b1 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -172,7 +172,8 @@ pub struct Config { pub fn connect_ingress_to_builder( metering_rx: broadcast::Receiver, - backrun_rx: broadcast::Receiver, + backrun_rx: broadcast::Receiver<(tips_core::Bundle, uuid::Uuid)>, + tx_bundle_id_rx: broadcast::Receiver<(TxHash, uuid::Uuid)>, builder_rpc: Url, ) { let builder_rpc_clone = builder_rpc.clone(); @@ -200,17 +201,36 @@ pub fn connect_ingress_to_builder( } }); + let backrun_builder = builder.clone(); + let backrun_builder_rpc_clone = builder_rpc_clone.clone(); tokio::spawn(async move { let mut event_rx = backrun_rx; - while let Ok(bundle) = event_rx.recv().await { - if let Err(e) = builder + while let Ok((bundle, bundle_id)) = event_rx.recv().await { + if let Err(e) = backrun_builder .client() - .request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .request::<(tips_core::Bundle, uuid::Uuid), ()>( + "base_sendBackrunBundle", + (bundle, bundle_id), + ) .await { error!(error = %e, "Failed to send backrun bundle to builder"); } - info!(message = "Sent backrun bundle to builder", builder_rpc = %builder_rpc_clone); + info!(message = "Sent backrun bundle to builder", builder_rpc = %backrun_builder_rpc_clone, bundle_id = %bundle_id); + } + }); + + tokio::spawn(async move { + let mut event_rx = tx_bundle_id_rx; + while let Ok((tx_hash, bundle_id)) = event_rx.recv().await { + if let Err(e) = builder + .client() + .request::<(TxHash, uuid::Uuid), ()>("base_txBundleId", (tx_hash, bundle_id)) + .await + { + error!(error = %e, "Failed to send tx_bundle_id mapping to builder"); + } + info!(message = "Sent tx_bundle_id to builder", builder_rpc = %builder_rpc_clone, tx_hash = %tx_hash, bundle_id = %bundle_id); } }); } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index f5d571f..021ac10 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,6 +1,6 @@ use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; -use alloy_primitives::{B256, Bytes}; +use alloy_primitives::{B256, Bytes, TxHash}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use jsonrpsee::{ core::{RpcResult, async_trait}, @@ -73,7 +73,9 @@ pub struct IngressService { meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, backrun_enabled: bool, - builder_backrun_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender<(Bundle, uuid::Uuid)>, + /// Channel to send tx_hash -> bundle_id mappings to builder for audit tracking + builder_tx_bundle_id_tx: broadcast::Sender<(TxHash, uuid::Uuid)>, } impl IngressService { @@ -82,7 +84,8 @@ impl IngressService { queue: Queue, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, - builder_backrun_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender<(Bundle, uuid::Uuid)>, + builder_tx_bundle_id_tx: broadcast::Sender<(TxHash, uuid::Uuid)>, config: Config, ) -> Self { let mempool_provider = Arc::new(providers.mempool); @@ -110,6 +113,7 @@ impl IngressService { builder_tx, backrun_enabled: config.backrun_enabled, builder_backrun_tx, + builder_tx_bundle_id_tx, } } } @@ -133,19 +137,40 @@ where let start = Instant::now(); let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + let bundle_id = *accepted_bundle.uuid(); + + // Get target tx hash (first tx in bundle) + let target_tx_hash = accepted_bundle + .txs + .first() + .map(|tx| tx.tx_hash()) + .unwrap_or_default(); self.metrics.backrun_bundles_received_total.increment(1); - info!(message = "Received backrun bundle", bundle_hash = %bundle_hash); + info!(message = "Received backrun bundle", bundle_hash = %bundle_hash, bundle_id = %bundle_id); + + // Emit BackrunReceived event at start of processing + self.send_transaction_event(BundleEvent::BackrunReceived { + bundle_id, + bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); - if let Err(e) = self.builder_backrun_tx.send(bundle) { + if let Err(e) = self.builder_backrun_tx.send((bundle, bundle_id)) { warn!( message = "Failed to send backrun bundle to builders", bundle_hash = %bundle_hash, + bundle_id = %bundle_id, error = %e ); } - self.send_audit_event(&accepted_bundle, bundle_hash); + // Emit BackrunSent event at end of processing + self.send_transaction_event(BundleEvent::BackrunSent { + bundle_id, + target_tx_hash, + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); self.metrics .backrun_bundles_sent_duration @@ -197,8 +222,9 @@ where async fn send_raw_transaction(&self, data: Bytes) -> RpcResult { let start = Instant::now(); let transaction = self.get_tx(&data).await?; + let tx_hash = transaction.tx_hash(); - info!(message = "Received raw transaction", tx_hash = %transaction.tx_hash()); + info!(message = "Received raw transaction", tx_hash = %tx_hash); let send_to_kafka = matches!( self.tx_submission_method, @@ -235,6 +261,14 @@ where }); let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); + let bundle_id = *accepted_bundle.uuid(); + + // Emit TransactionReceived event at start of processing + self.send_transaction_event(BundleEvent::TransactionReceived { + bundle_id, + bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); self.builder_tx .send(meter_bundle_response) @@ -286,13 +320,23 @@ where }); } - self.send_audit_event(&accepted_bundle, transaction.tx_hash()); + // self.send_audit_event(&accepted_bundle, tx_hash); + + // Send tx_hash -> bundle_id mapping to builder for audit tracking + self.send_tx_bundle_id(tx_hash, bundle_id); + + // Emit TransactionSent event at end of processing + self.send_transaction_event(BundleEvent::TransactionSent { + bundle_id, + tx_hash, + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); self.metrics .send_raw_transaction_duration .record(start.elapsed().as_secs_f64()); - Ok(transaction.tx_hash()) + Ok(tx_hash) } async fn send_user_operation( @@ -421,6 +465,7 @@ where let audit_event = BundleEvent::Received { bundle_id: *accepted_bundle.uuid(), bundle: Box::new(accepted_bundle.clone()), + timestamp_ms: chrono::Utc::now().timestamp_millis(), }; if let Err(e) = self.audit_channel.send(audit_event) { warn!( @@ -430,6 +475,28 @@ where ); } } + + /// Helper method to send tx_hash -> bundle_id mapping to builder for audit tracking + fn send_tx_bundle_id(&self, tx_hash: TxHash, bundle_id: uuid::Uuid) { + if let Err(e) = self.builder_tx_bundle_id_tx.send((tx_hash, bundle_id)) { + warn!( + message = "Failed to send tx_bundle_id mapping", + tx_hash = %tx_hash, + bundle_id = %bundle_id, + error = %e + ); + } + } + + /// Helper method to send transaction lifecycle events + fn send_transaction_event(&self, event: BundleEvent) { + if let Err(e) = self.audit_channel.send(event) { + warn!( + message = "Failed to send transaction event", + error = %e + ); + } + } } #[cfg(test)] @@ -562,9 +629,16 @@ mod tests { let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let (tx_bundle_id_tx, _tx_bundle_id_rx) = broadcast::channel(1); let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + tx_bundle_id_tx, + config, ); let bundle = Bundle::default(); @@ -621,9 +695,16 @@ mod tests { let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let (tx_bundle_id_tx, _tx_bundle_id_rx) = broadcast::channel(1); let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + tx_bundle_id_tx, + config, ); // Valid signed transaction bytes diff --git a/justfile b/justfile index 4c30a66..c43e734 100644 --- a/justfile +++ b/justfile @@ -102,12 +102,14 @@ sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690 backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" +chain_id := "13" + send-txn: #!/usr/bin/env bash set -euxo pipefail echo "sending txn" nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) - txn=$(cast mktx --private-key {{ sender_key }} 0x0000000000000000000000000000000000000000 --value 0.01ether --nonce $nonce --chain-id 13 -r {{ builder_url }}) + txn=$(cast mktx --private-key {{ sender_key }} 0x0000000000000000000000000000000000000000 --value 0.00001ether --nonce $nonce --chain-id {{ chain_id }} -r {{ builder_url }}) hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status @@ -118,37 +120,31 @@ send-txn-with-backrun: # 1. Get nonce and send target transaction from sender account nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) - echo "Sending target transaction from sender (nonce=$nonce)..." + echo "Building target transaction from sender (nonce=$nonce)..." target_txn=$(cast mktx --private-key {{ sender_key }} \ 0x0000000000000000000000000000000000000000 \ - --value 0.01ether \ + --value 0.00001ether \ --nonce $nonce \ - --chain-id 13 \ + --chain-id {{ chain_id }} \ -r {{ builder_url }}) - target_hash=$(curl -s {{ ingress_url }} -X POST \ - -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \ - | jq -r ".result") - echo "Target tx sent: $target_hash" - # 2. Build backrun transaction from backrunner account (different account!) backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ 0x0000000000000000000000000000000000000001 \ - --value 0.001ether \ + --value 0.000001ether \ --nonce $backrun_nonce \ - --chain-id 13 \ + --chain-id {{ chain_id }} \ -r {{ builder_url }}) # 3. Compute tx hashes for reverting_tx_hashes backrun_hash_computed=$(cast keccak $backrun_txn) + target_hash=$(cast keccak $target_txn) echo "Target tx hash: $target_hash" echo "Backrun tx hash: $backrun_hash_computed" - # 4. Construct and send bundle with reverting_tx_hashes - echo "Sending backrun bundle..." + # 4. prepare backrun bundle bundle_json=$(jq -n \ --arg target "$target_txn" \ --arg backrun "$backrun_txn" \ @@ -160,11 +156,18 @@ send-txn-with-backrun: revertingTxHashes: [$target_hash, $backrun_hash] }') - bundle_hash=$(curl -s {{ ingress_url }} -X POST \ + # 5. send target transaction and backrun bundle in parallel + # target tx + curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" & + echo "Target tx sent: $target_hash" + + # backrun bundle + curl -s {{ ingress_url }} -X POST \ -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \ - | jq -r ".result") - echo "Bundle sent: $bundle_hash" + --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" & + echo "Backrun bundle sent" # 5. Wait and verify both transactions echo "Waiting for transactions to land..." diff --git a/ui/src/app/api/txn/[hash]/route.ts b/ui/src/app/api/txn/[hash]/route.ts index 44f6824..ac33747 100644 --- a/ui/src/app/api/txn/[hash]/route.ts +++ b/ui/src/app/api/txn/[hash]/route.ts @@ -1,6 +1,7 @@ import { type NextRequest, NextResponse } from "next/server"; import { type BundleEvent, + type BundleHistory, getBundleHistory, getTransactionMetadataByHash, } from "@/lib/s3"; @@ -28,15 +29,17 @@ export interface TransactionEvent { }; } +export type BundleEventWithId = BundleEvent & { bundleId: string }; + export interface TransactionHistoryResponse { hash: string; bundle_ids: string[]; - history: BundleEvent[]; + history: BundleEventWithId[]; } export async function GET( _request: NextRequest, - { params }: { params: Promise<{ hash: string }> }, + { params }: { params: Promise<{ hash: string }> } ) { try { const { hash } = await params; @@ -46,20 +49,51 @@ export async function GET( if (!metadata) { return NextResponse.json( { error: "Transaction not found" }, - { status: 404 }, + { status: 404 } ); } - // TODO: Can be in multiple bundles - const bundle = await getBundleHistory(metadata.bundle_ids[0]); - if (!bundle) { - return NextResponse.json({ error: "Bundle not found" }, { status: 404 }); + console.log("metadata", metadata); + + // Fetch ALL bundle histories in parallel + const bundleHistories = await Promise.all( + metadata.bundle_ids.map((id) => getBundleHistory(id)) + ); + + // Filter out nulls and merge all events, tagging each with its bundleId + const allEvents: BundleEventWithId[] = bundleHistories + .map((bundle, index) => ({ + bundle, + bundleId: metadata.bundle_ids[index], + })) + .filter( + (item): item is { bundle: BundleHistory; bundleId: string } => + item.bundle !== null + ) + .flatMap(({ bundle, bundleId }) => + bundle.history.map((event) => ({ ...event, bundleId })) + ); + + if (allEvents.length === 0) { + return NextResponse.json( + { error: "No bundle history found" }, + { status: 404 } + ); } + // Sort by timestamp + allEvents.sort((a, b) => (a.data.timestamp ?? 0) - (b.data.timestamp ?? 0)); + + // Deduplicate by event key + const uniqueEvents = allEvents.filter( + (event, index, self) => + index === self.findIndex((e) => e.data?.key === event.data?.key) + ); + const response: TransactionHistoryResponse = { hash, bundle_ids: metadata.bundle_ids, - history: bundle.history, + history: uniqueEvents, }; return NextResponse.json(response); @@ -67,7 +101,7 @@ export async function GET( console.error("Error fetching transaction data:", error); return NextResponse.json( { error: "Internal server error" }, - { status: 500 }, + { status: 500 } ); } } diff --git a/ui/src/app/bundles/[uuid]/page.tsx b/ui/src/app/bundles/[uuid]/page.tsx index 95187f3..8b3c523 100644 --- a/ui/src/app/bundles/[uuid]/page.tsx +++ b/ui/src/app/bundles/[uuid]/page.tsx @@ -198,7 +198,9 @@ function TransactionDetails({ @@ -425,6 +427,16 @@ function SectionTitle({ children }: { children: React.ReactNode }) { ); } +function formatTimestampWithMs(timestamp: number): string { + const date = new Date(timestamp); + const dateStr = date.toLocaleDateString(); + const hours = date.getHours().toString().padStart(2, "0"); + const minutes = date.getMinutes().toString().padStart(2, "0"); + const seconds = date.getSeconds().toString().padStart(2, "0"); + const ms = date.getMilliseconds().toString().padStart(3, "0"); + return `${dateStr}, ${hours}:${minutes}:${seconds}.${ms}`; +} + export default function BundlePage({ params }: PageProps) { const [uuid, setUuid] = useState(""); const [data, setData] = useState(null); @@ -445,7 +457,7 @@ export default function BundlePage({ params }: PageProps) { setError( response.status === 404 ? "Bundle not found" - : "Failed to fetch bundle data", + : "Failed to fetch bundle data" ); setData(null); return; diff --git a/ui/src/app/page.tsx b/ui/src/app/page.tsx index 21e257f..a6cfdc6 100644 --- a/ui/src/app/page.tsx +++ b/ui/src/app/page.tsx @@ -31,7 +31,13 @@ function SearchBar({ onError }: { onError: (error: string | null) => void }) { const result = await response.json(); if (result.bundle_ids && result.bundle_ids.length > 0) { - router.push(`/bundles/${result.bundle_ids[0]}`); + if (result.bundle_ids.length === 1) { + // Single bundle - go directly to bundle page + router.push(`/bundles/${result.bundle_ids[0]}`); + } else { + // Multiple bundles - show transaction page with merged history + router.push(`/txn/${hash}`); + } } else { onError("No bundle found for this transaction"); } @@ -70,10 +76,10 @@ function BlockRow({ block, index }: { block: BlockSummary; index: number }) { timeSince <= 0 ? "now" : timeSince < 60 - ? `${timeSince}s ago` - : timeSince < 3600 - ? `${Math.floor(timeSince / 60)}m ago` - : `${Math.floor(timeSince / 3600)}h ago`; + ? `${timeSince}s ago` + : timeSince < 3600 + ? `${Math.floor(timeSince / 60)}m ago` + : `${Math.floor(timeSince / 3600)}h ago`; return ( ; } -export default function TransactionRedirectPage({ params }: PageProps) { +// Color palette for distinguishing bundles +const BUNDLE_COLORS = [ + { + bg: "bg-blue-100", + dot: "bg-blue-600", + text: "text-blue-700", + badge: "bg-blue-50 text-blue-700 ring-blue-600/20", + link: "bg-blue-50 text-blue-600", + }, + { + bg: "bg-purple-100", + dot: "bg-purple-600", + text: "text-purple-700", + badge: "bg-purple-50 text-purple-700 ring-purple-600/20", + link: "bg-purple-50 text-purple-600", + }, + { + bg: "bg-amber-100", + dot: "bg-amber-600", + text: "text-amber-700", + badge: "bg-amber-50 text-amber-700 ring-amber-600/20", + link: "bg-amber-50 text-amber-600", + }, + { + bg: "bg-emerald-100", + dot: "bg-emerald-600", + text: "text-emerald-700", + badge: "bg-emerald-50 text-emerald-700 ring-emerald-600/20", + link: "bg-emerald-50 text-emerald-600", + }, + { + bg: "bg-rose-100", + dot: "bg-rose-600", + text: "text-rose-700", + badge: "bg-rose-50 text-rose-700 ring-rose-600/20", + link: "bg-rose-50 text-rose-600", + }, + { + bg: "bg-cyan-100", + dot: "bg-cyan-600", + text: "text-cyan-700", + badge: "bg-cyan-50 text-cyan-700 ring-cyan-600/20", + link: "bg-cyan-50 text-cyan-600", + }, +]; + +type BundleColorMap = Map; + +function Badge({ + children, + variant = "default", + className = "", +}: { + children: React.ReactNode; + variant?: "default" | "success" | "warning" | "error"; + className?: string; +}) { + const variants = { + default: "bg-blue-50 text-blue-700 ring-blue-600/20", + success: "bg-emerald-50 text-emerald-700 ring-emerald-600/20", + warning: "bg-amber-50 text-amber-700 ring-amber-600/20", + error: "bg-red-50 text-red-700 ring-red-600/20", + }; + + return ( + + {children} + + ); +} + +function Card({ + children, + className = "", +}: { + children: React.ReactNode; + className?: string; +}) { + return ( +
+ {children} +
+ ); +} + +function TimelineEventDetails({ + event, + colors, +}: { + event: BundleEventWithId; + colors: (typeof BUNDLE_COLORS)[0]; +}) { + if (event.event === "BlockIncluded" && event.data?.block_hash) { + return ( +
+ {event.event} + + Block #{event.data.block_number} + +
+ ); + } + + if (event.event === "BuilderIncluded" && event.data?.builder) { + return ( +
+ {event.event} + + {event.data.builder} (flashblock #{event.data.flashblock_index}) + +
+ ); + } + + if (event.event === "Dropped" && event.data?.reason) { + return ( +
+ {event.event} + {event.data.reason} +
+ ); + } + + if (event.event === "Executed") { + return ( +
+ {event.event} +
+ ); + } + + if (event.event === "BackrunBundleExecuted") { + return ( +
+ {event.event} +
+ ); + } + + return {event.event}; +} + +function Timeline({ + events, + bundleColorMap, +}: { + events: BundleEventWithId[]; + bundleColorMap: BundleColorMap; +}) { + if (events.length === 0) return null; + + return ( +
+ {events.map((event, index) => { + const colors = bundleColorMap.get(event.bundleId) ?? BUNDLE_COLORS[0]; + return ( +
+
+
+
+
+ + +
+
+ ); + })} +
+ ); +} + +function SectionTitle({ children }: { children: React.ReactNode }) { + return ( +

{children}

+ ); +} + +function formatTimestampWithMs(timestamp: number): string { + const date = new Date(timestamp); + const dateStr = date.toLocaleDateString(); + const hours = date.getHours().toString().padStart(2, "0"); + const minutes = date.getMinutes().toString().padStart(2, "0"); + const seconds = date.getSeconds().toString().padStart(2, "0"); + const ms = date.getMilliseconds().toString().padStart(3, "0"); + return `${dateStr}, ${hours}:${minutes}:${seconds}.${ms}`; +} + +export default function TransactionPage({ params }: PageProps) { const router = useRouter(); const [hash, setHash] = useState(""); + const [data, setData] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); @@ -25,7 +235,7 @@ export default function TransactionRedirectPage({ params }: PageProps) { useEffect(() => { if (!hash) return; - const fetchAndRedirect = async () => { + const fetchData = async () => { try { const response = await fetch(`/api/txn/${hash}`); if (!response.ok) { @@ -38,11 +248,13 @@ export default function TransactionRedirectPage({ params }: PageProps) { } const result: TransactionHistoryResponse = await response.json(); - if (result.bundle_ids && result.bundle_ids.length > 0) { - router.push(`/bundles/${result.bundle_ids[0]}`); - } else { - setError("No bundle found for this transaction"); + // If only 1 bundle, redirect to bundle page + if (result.bundle_ids && result.bundle_ids.length === 1) { + router.replace(`/bundles/${result.bundle_ids[0]}`); + return; } + + setData(result); } catch (_err) { setError("Failed to fetch transaction data"); } finally { @@ -50,10 +262,10 @@ export default function TransactionRedirectPage({ params }: PageProps) { } }; - fetchAndRedirect(); + fetchData(); }, [hash, router]); - if (!hash) { + if (!hash || loading) { return (
Loading...
@@ -61,19 +273,71 @@ export default function TransactionRedirectPage({ params }: PageProps) { ); } + if (error) { + return ( +
+
+

Transaction

+

{hash}

+
{error}
+
+
+ ); + } + + if (!data) { + return null; + } + + // Create bundle -> color mapping + const bundleColorMap: BundleColorMap = new Map( + data.bundle_ids.map((id, index) => [ + id, + BUNDLE_COLORS[index % BUNDLE_COLORS.length], + ]) + ); + return ( -
+
-

Transaction {hash}

- {loading && ( -
- Redirecting to bundle page... -
- )} - {error && ( -
{error}
- )} +

Transaction

+

{hash}

+ + {/* Bundle IDs */} +
+ Associated Bundles + +
+ {data.bundle_ids.map((bundleId) => { + const colors = bundleColorMap.get(bundleId) ?? BUNDLE_COLORS[0]; + return ( + + {bundleId} + + ); + })} +
+
+
+ + {/* Event History */} +
+ Event History + + {data.history.length > 0 ? ( + + ) : ( +
+ No events recorded yet. +
+ )} +
+
); } diff --git a/ui/src/lib/s3.ts b/ui/src/lib/s3.ts index 229fdad..5695eac 100644 --- a/ui/src/lib/s3.ts +++ b/ui/src/lib/s3.ts @@ -165,6 +165,8 @@ export async function getBundleHistory( const key = `bundles/${bundleId}`; const content = await getObjectContent(key); + console.log("content", content); + if (!content) { return null; } From 52be4d9e4461d3729cca0ee2b19c2c9ead8faf1a Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Mon, 15 Dec 2025 10:38:16 -0500 Subject: [PATCH 3/5] clean up --- crates/audit/src/publisher.rs | 8 ++-- crates/audit/src/reader.rs | 2 +- crates/audit/src/storage.rs | 6 +-- crates/audit/src/types.rs | 38 +++-------------- crates/ingress-rpc/src/lib.rs | 4 -- crates/ingress-rpc/src/service.rs | 17 ++------ justfile | 65 ------------------------------ ui/src/app/api/txn/[hash]/route.ts | 2 - ui/src/lib/s3.ts | 16 ++++---- 9 files changed, 21 insertions(+), 137 deletions(-) diff --git a/crates/audit/src/publisher.rs b/crates/audit/src/publisher.rs index 5e9cd55..5931df8 100644 --- a/crates/audit/src/publisher.rs +++ b/crates/audit/src/publisher.rs @@ -24,6 +24,7 @@ impl KafkaBundleEventPublisher { } async fn send_event(&self, event: &BundleEvent) -> Result<()> { + let bundle_id = event.bundle_id(); let key = event.generate_event_key(); let payload = serde_json::to_vec(event)?; @@ -36,8 +37,7 @@ impl KafkaBundleEventPublisher { { Ok(_) => { debug!( - bundle_id = ?event.bundle_id(), - tx_hash = ?event.tx_hash(), + bundle_id = ?bundle_id, topic = %self.topic, payload_size = payload.len(), "successfully published event" @@ -46,8 +46,7 @@ impl KafkaBundleEventPublisher { } Err((err, _)) => { error!( - bundle_id = ?event.bundle_id(), - tx_hash = ?event.tx_hash(), + bundle_id = ?bundle_id, topic = %self.topic, error = %err, "failed to publish event" @@ -92,7 +91,6 @@ impl BundleEventPublisher for LoggingBundleEventPublisher { async fn publish(&self, event: BundleEvent) -> Result<()> { info!( bundle_id = ?event.bundle_id(), - tx_hash = ?event.tx_hash(), event = ?event, "Received bundle event" ); diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs index 3f96345..a1469fe 100644 --- a/crates/audit/src/reader.rs +++ b/crates/audit/src/reader.rs @@ -86,7 +86,7 @@ impl EventReader for KafkaAuditLogReader { timestamp = timestamp, offset = message.offset(), partition = message.partition(), - "Received event from Kafka" + "Received event with timestamp" ); self.last_message_offset = Some(message.offset()); diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 8bc02c9..16fe782 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -81,13 +81,13 @@ pub enum BundleHistoryEvent { timestamp: i64, bundle: Box, }, - /// Backrun bundle sent to builder + /// Backrun bundle sent to builder from ingress-rpc BackrunSent { key: String, timestamp: i64, target_tx_hash: TxHash, }, - /// Backrun bundle inserted into builder store + /// Backrun bundle inserted into builder store in builder BackrunInserted { key: String, timestamp: i64, @@ -154,7 +154,6 @@ fn update_bundle_history_transform( let bundle_id = match event.event.bundle_id() { Some(id) => id, None => { - // Backrun events don't have bundle_ids, skip history update return None; } }; @@ -486,7 +485,6 @@ impl S3EventReaderWriter { #[async_trait] impl EventWriter for S3EventReaderWriter { async fn archive_event(&self, event: Event) -> Result<()> { - // Backrun events don't have bundle_ids, skip S3 archival let Some(bundle_id) = event.event.bundle_id() else { return Ok(()); }; diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index f5530df..986af73 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -65,45 +65,38 @@ pub enum BundleEvent { reason: DropReason, timestamp_ms: i64, }, - /// Transaction received by ingress-rpc (start of send_raw_transaction) TransactionReceived { bundle_id: BundleId, bundle: Box, timestamp_ms: i64, }, - /// Transaction processing complete in ingress-rpc (end of send_raw_transaction) TransactionSent { bundle_id: BundleId, tx_hash: TxHash, timestamp_ms: i64, }, - /// Backrun bundle received by ingress-rpc (start of send_backrun_bundle) BackrunReceived { bundle_id: BundleId, bundle: Box, timestamp_ms: i64, }, - /// Backrun bundle sent to builder (end of send_backrun_bundle) BackrunSent { bundle_id: BundleId, target_tx_hash: TxHash, timestamp_ms: i64, }, - /// Backrun bundle inserted into builder store BackrunInserted { bundle_id: BundleId, target_tx_hash: TxHash, backrun_tx_hashes: Vec, timestamp_ms: i64, }, - /// Transaction selected from mempool, about to start executing StartExecuting { bundle_id: Option, tx_hash: TxHash, block_number: u64, timestamp_ms: i64, }, - /// Transaction successfully executed and committed Executed { bundle_id: Option, tx_hash: TxHash, @@ -111,7 +104,6 @@ pub enum BundleEvent { gas_used: u64, timestamp_ms: i64, }, - /// Backrun bundle transaction executed (success or reverted) BackrunBundleExecuted { bundle_id: BundleId, target_tx_hash: TxHash, @@ -124,7 +116,6 @@ pub enum BundleEvent { } impl BundleEvent { - /// Returns a human-readable name for the event type pub fn event_name(&self) -> &'static str { match self { BundleEvent::Received { .. } => "Received", @@ -143,7 +134,6 @@ impl BundleEvent { } } - /// Returns the bundle_id for events that have one pub fn bundle_id(&self) -> Option { match self { BundleEvent::Received { bundle_id, .. } => Some(*bundle_id), @@ -151,21 +141,17 @@ impl BundleEvent { BundleEvent::BuilderIncluded { bundle_id, .. } => Some(*bundle_id), BundleEvent::BlockIncluded { bundle_id, .. } => Some(*bundle_id), BundleEvent::Dropped { bundle_id, .. } => Some(*bundle_id), - // Transaction events BundleEvent::TransactionReceived { bundle_id, .. } => Some(*bundle_id), BundleEvent::TransactionSent { bundle_id, .. } => Some(*bundle_id), - // Backrun events with bundle_id BundleEvent::BackrunReceived { bundle_id, .. } => Some(*bundle_id), BundleEvent::BackrunSent { bundle_id, .. } => Some(*bundle_id), BundleEvent::BackrunInserted { bundle_id, .. } => Some(*bundle_id), BundleEvent::BackrunBundleExecuted { bundle_id, .. } => Some(*bundle_id), - // These events have optional bundle_id (looked up from tx hash) BundleEvent::StartExecuting { bundle_id, .. } => *bundle_id, BundleEvent::Executed { bundle_id, .. } => *bundle_id, } } - /// Returns the tx_hash for events that track individual transactions pub fn tx_hash(&self) -> Option { match self { BundleEvent::TransactionReceived { bundle, .. } => { @@ -180,7 +166,6 @@ impl BundleEvent { BundleEvent::StartExecuting { tx_hash, .. } => Some(*tx_hash), BundleEvent::Executed { tx_hash, .. } => Some(*tx_hash), BundleEvent::BackrunBundleExecuted { target_tx_hash, .. } => Some(*target_tx_hash), - // Standard bundle events don't have a tx_hash _ => None, } } @@ -207,7 +192,6 @@ impl BundleEvent { BundleEvent::BuilderIncluded { .. } => vec![], BundleEvent::BlockIncluded { .. } => vec![], BundleEvent::Dropped { .. } => vec![], - // TransactionReceived has full bundle, extract transaction IDs BundleEvent::TransactionReceived { bundle, .. } => bundle .txs .iter() @@ -220,7 +204,6 @@ impl BundleEvent { Err(_) => None, }) .collect(), - // BackrunReceived has full bundle, extract transaction IDs BundleEvent::BackrunReceived { bundle, .. } => bundle .txs .iter() @@ -233,7 +216,6 @@ impl BundleEvent { Err(_) => None, }) .collect(), - // These events don't track transaction IDs this way BundleEvent::TransactionSent { .. } => vec![], BundleEvent::BackrunSent { .. } => vec![], BundleEvent::BackrunInserted { .. } => vec![], @@ -243,7 +225,6 @@ impl BundleEvent { } } - /// Returns the timestamp_ms for any event pub fn timestamp_ms(&self) -> i64 { match self { BundleEvent::Received { timestamp_ms, .. } => *timestamp_ms, @@ -271,7 +252,6 @@ impl BundleEvent { } => { format!("{bundle_id}-{block_hash}") } - // Transaction events BundleEvent::TransactionReceived { bundle_id, .. } => { format!("transaction-received-{bundle_id}") } @@ -289,30 +269,22 @@ impl BundleEvent { format!("backrun-inserted-{bundle_id}") } BundleEvent::StartExecuting { - bundle_id, tx_hash, - block_number, .. - } => match bundle_id { - Some(id) => format!("start-executing-{id}-{block_number}"), - None => format!("start-executing-{tx_hash}-{block_number}"), + } => { + format!("tx-start-executing-{tx_hash}") }, BundleEvent::Executed { - bundle_id, tx_hash, - block_number, .. - } => match bundle_id { - Some(id) => format!("executed-{id}-{block_number}"), - None => format!("executed-{tx_hash}-{block_number}"), + } => { + format!("tx-executed-{tx_hash}") }, BundleEvent::BackrunBundleExecuted { bundle_id, - backrun_tx_hash, - block_number, .. } => { - format!("backrun-bundle-executed-{bundle_id}-{backrun_tx_hash}-{block_number}") + format!("backrun-bundle-executed-{bundle_id}") } _ => { format!( diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index fd40298..44b28b5 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -185,7 +185,6 @@ pub fn connect_ingress_to_builder( tx_bundle_id_rx: broadcast::Receiver<(TxHash, uuid::Uuid)>, builder_rpc: Url, ) { - let builder_rpc_clone = builder_rpc.clone(); let builder: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() @@ -215,7 +214,6 @@ pub fn connect_ingress_to_builder( }); let backrun_builder = builder.clone(); - let backrun_builder_rpc_clone = builder_rpc_clone.clone(); tokio::spawn(async move { let mut event_rx = backrun_rx; while let Ok((bundle, bundle_id)) = event_rx.recv().await { @@ -229,7 +227,6 @@ pub fn connect_ingress_to_builder( { error!(error = %e, "Failed to send backrun bundle to builder"); } - info!(message = "Sent backrun bundle to builder", builder_rpc = %backrun_builder_rpc_clone, bundle_id = %bundle_id); } }); @@ -243,7 +240,6 @@ pub fn connect_ingress_to_builder( { error!(error = %e, "Failed to send tx_bundle_id mapping to builder"); } - info!(message = "Sent tx_bundle_id to builder", builder_rpc = %builder_rpc_clone, tx_hash = %tx_hash, bundle_id = %bundle_id); } }); } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 0f21ffa..a4b5873 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -75,7 +75,6 @@ pub struct IngressService { builder_tx: broadcast::Sender, backrun_enabled: bool, builder_backrun_tx: broadcast::Sender<(Bundle, uuid::Uuid)>, - /// Channel to send tx_hash -> bundle_id mappings to builder for audit tracking builder_tx_bundle_id_tx: broadcast::Sender<(TxHash, uuid::Uuid)>, } @@ -152,9 +151,7 @@ impl IngressApiServer for IngressService { .unwrap_or_default(); self.metrics.backrun_bundles_received_total.increment(1); - info!(message = "Received backrun bundle", bundle_hash = %bundle_hash, bundle_id = %bundle_id); - // Emit BackrunReceived event at start of processing self.send_transaction_event(BundleEvent::BackrunReceived { bundle_id, bundle: Box::new(accepted_bundle.clone()), @@ -170,7 +167,6 @@ impl IngressApiServer for IngressService { ); } - // Emit BackrunSent event at end of processing self.send_transaction_event(BundleEvent::BackrunSent { bundle_id, target_tx_hash, @@ -227,9 +223,6 @@ impl IngressApiServer for IngressService { async fn send_raw_transaction(&self, data: Bytes) -> RpcResult { let start = Instant::now(); let transaction = self.get_tx(&data).await?; - let tx_hash = transaction.tx_hash(); - - info!(message = "Received raw transaction", tx_hash = %tx_hash); let send_to_kafka = matches!( self.tx_submission_method, @@ -270,7 +263,6 @@ impl IngressApiServer for IngressService { let bundle_id = *accepted_bundle.uuid(); - // Emit TransactionReceived event at start of processing self.send_transaction_event(BundleEvent::TransactionReceived { bundle_id, bundle: Box::new(accepted_bundle.clone()), @@ -330,12 +322,11 @@ impl IngressApiServer for IngressService { ); // Send tx_hash -> bundle_id mapping to builder for audit tracking - self.send_tx_bundle_id(tx_hash, bundle_id); + self.send_tx_bundle_id(transaction.tx_hash(), bundle_id); - // Emit TransactionSent event at end of processing self.send_transaction_event(BundleEvent::TransactionSent { bundle_id, - tx_hash, + tx_hash: transaction.tx_hash(), timestamp_ms: chrono::Utc::now().timestamp_millis(), }); @@ -343,7 +334,7 @@ impl IngressApiServer for IngressService { .send_raw_transaction_duration .record(start.elapsed().as_secs_f64()); - Ok(tx_hash) + Ok(transaction.tx_hash()) } async fn send_user_operation( @@ -491,7 +482,6 @@ impl IngressService { } } - /// Helper method to send tx_hash -> bundle_id mapping to builder for audit tracking fn send_tx_bundle_id(&self, tx_hash: TxHash, bundle_id: uuid::Uuid) { if let Err(e) = self.builder_tx_bundle_id_tx.send((tx_hash, bundle_id)) { warn!( @@ -503,7 +493,6 @@ impl IngressService { } } - /// Helper method to send transaction lifecycle events fn send_transaction_event(&self, event: BundleEvent) { if let Err(e) = self.audit_channel.send(event) { warn!( diff --git a/justfile b/justfile index c43e734..3945f32 100644 --- a/justfile +++ b/justfile @@ -113,68 +113,3 @@ send-txn: hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status - -send-txn-with-backrun: - #!/usr/bin/env bash - set -euxo pipefail - - # 1. Get nonce and send target transaction from sender account - nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) - echo "Building target transaction from sender (nonce=$nonce)..." - target_txn=$(cast mktx --private-key {{ sender_key }} \ - 0x0000000000000000000000000000000000000000 \ - --value 0.00001ether \ - --nonce $nonce \ - --chain-id {{ chain_id }} \ - -r {{ builder_url }}) - - # 2. Build backrun transaction from backrunner account (different account!) - backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) - echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." - backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ - 0x0000000000000000000000000000000000000001 \ - --value 0.000001ether \ - --nonce $backrun_nonce \ - --chain-id {{ chain_id }} \ - -r {{ builder_url }}) - - # 3. Compute tx hashes for reverting_tx_hashes - backrun_hash_computed=$(cast keccak $backrun_txn) - target_hash=$(cast keccak $target_txn) - echo "Target tx hash: $target_hash" - echo "Backrun tx hash: $backrun_hash_computed" - - # 4. prepare backrun bundle - bundle_json=$(jq -n \ - --arg target "$target_txn" \ - --arg backrun "$backrun_txn" \ - --arg target_hash "$target_hash" \ - --arg backrun_hash "$backrun_hash_computed" \ - '{ - txs: [$target, $backrun], - blockNumber: 0, - revertingTxHashes: [$target_hash, $backrun_hash] - }') - - # 5. send target transaction and backrun bundle in parallel - # target tx - curl -s {{ ingress_url }} -X POST \ - -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" & - echo "Target tx sent: $target_hash" - - # backrun bundle - curl -s {{ ingress_url }} -X POST \ - -H "Content-Type: application/json" \ - --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" & - echo "Backrun bundle sent" - - # 5. Wait and verify both transactions - echo "Waiting for transactions to land..." - sleep 5 - - echo "=== Target transaction (from sender) ===" - cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" - - echo "=== Backrun transaction (from backrunner) ===" - cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet" diff --git a/ui/src/app/api/txn/[hash]/route.ts b/ui/src/app/api/txn/[hash]/route.ts index ac33747..bc572cf 100644 --- a/ui/src/app/api/txn/[hash]/route.ts +++ b/ui/src/app/api/txn/[hash]/route.ts @@ -53,8 +53,6 @@ export async function GET( ); } - console.log("metadata", metadata); - // Fetch ALL bundle histories in parallel const bundleHistories = await Promise.all( metadata.bundle_ids.map((id) => getBundleHistory(id)) diff --git a/ui/src/lib/s3.ts b/ui/src/lib/s3.ts index 5695eac..ac48f24 100644 --- a/ui/src/lib/s3.ts +++ b/ui/src/lib/s3.ts @@ -66,7 +66,7 @@ async function getObjectContent(key: string): Promise { } export async function getTransactionMetadataByHash( - hash: string, + hash: string ): Promise { const key = `transactions/by_hash/${hash}`; const content = await getObjectContent(key); @@ -80,7 +80,7 @@ export async function getTransactionMetadataByHash( } catch (error) { console.error( `Failed to parse transaction metadata for hash ${hash}:`, - error, + error ); return null; } @@ -160,13 +160,11 @@ export interface BundleHistory { } export async function getBundleHistory( - bundleId: string, + bundleId: string ): Promise { const key = `bundles/${bundleId}`; const content = await getObjectContent(key); - console.log("content", content); - if (!content) { return null; } @@ -176,7 +174,7 @@ export async function getBundleHistory( } catch (error) { console.error( `Failed to parse bundle history for bundle ${bundleId}:`, - error, + error ); return null; } @@ -203,7 +201,7 @@ export interface BlockData { } export async function getBlockFromCache( - blockHash: string, + blockHash: string ): Promise { const key = `blocks/${blockHash}`; const content = await getObjectContent(key); @@ -224,7 +222,7 @@ export async function getBlockFromCache( (tx: BlockTransaction & { gasUsed: string }) => ({ ...tx, gasUsed: BigInt(tx.gasUsed), - }), + }) ), } as BlockData; } catch (error) { @@ -241,7 +239,7 @@ export async function cacheBlockData(blockData: BlockData): Promise { Bucket: BUCKET_NAME, Key: key, Body: JSON.stringify(blockData, (_, value) => - typeof value === "bigint" ? value.toString() : value, + typeof value === "bigint" ? value.toString() : value ), ContentType: "application/json", }); From 2f0a86bcde3e33c401d9fe5454b26b0478f2484c Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Mon, 15 Dec 2025 10:47:57 -0500 Subject: [PATCH 4/5] clean up pt2 --- crates/audit/src/reader.rs | 4 ++-- crates/audit/src/storage.rs | 8 -------- crates/audit/src/types.rs | 19 +++++-------------- crates/ingress-rpc/src/lib.rs | 2 +- justfile | 5 ----- 5 files changed, 8 insertions(+), 30 deletions(-) diff --git a/crates/audit/src/reader.rs b/crates/audit/src/reader.rs index a1469fe..45cec31 100644 --- a/crates/audit/src/reader.rs +++ b/crates/audit/src/reader.rs @@ -10,7 +10,7 @@ use rdkafka::{ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tips_core::kafka::load_kafka_config_from_file; use tokio::time::sleep; -use tracing::{error, info}; +use tracing::{debug, error}; pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result { let client_config: ClientConfig = @@ -79,7 +79,7 @@ impl EventReader for KafkaAuditLogReader { let event: BundleEvent = serde_json::from_slice(payload)?; - info!( + debug!( event_name = %event.event_name(), bundle_id = ?event.bundle_id(), tx_hash = ?event.tx_hash(), diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 16fe782..0a1de8b 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -63,45 +63,38 @@ pub enum BundleHistoryEvent { timestamp: i64, reason: DropReason, }, - /// Transaction received by ingress-rpc TransactionReceived { key: String, timestamp: i64, bundle: Box, }, - /// Transaction sent from ingress-rpc TransactionSent { key: String, timestamp: i64, tx_hash: TxHash, }, - /// Backrun bundle received by ingress-rpc BackrunReceived { key: String, timestamp: i64, bundle: Box, }, - /// Backrun bundle sent to builder from ingress-rpc BackrunSent { key: String, timestamp: i64, target_tx_hash: TxHash, }, - /// Backrun bundle inserted into builder store in builder BackrunInserted { key: String, timestamp: i64, target_tx_hash: TxHash, backrun_tx_hashes: Vec, }, - /// Transaction selected for execution StartExecuting { key: String, timestamp: i64, tx_hash: TxHash, block_number: u64, }, - /// Transaction successfully executed Executed { key: String, timestamp: i64, @@ -109,7 +102,6 @@ pub enum BundleHistoryEvent { block_number: u64, gas_used: u64, }, - /// Backrun bundle transaction executed BackrunBundleExecuted { key: String, timestamp: i64, diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index 986af73..d1ef9ad 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -268,22 +268,13 @@ impl BundleEvent { BundleEvent::BackrunInserted { bundle_id, .. } => { format!("backrun-inserted-{bundle_id}") } - BundleEvent::StartExecuting { - tx_hash, - .. - } => { + BundleEvent::StartExecuting { tx_hash, .. } => { format!("tx-start-executing-{tx_hash}") - }, - BundleEvent::Executed { - tx_hash, - .. - } => { + } + BundleEvent::Executed { tx_hash, .. } => { format!("tx-executed-{tx_hash}") - }, - BundleEvent::BackrunBundleExecuted { - bundle_id, - .. - } => { + } + BundleEvent::BackrunBundleExecuted { bundle_id, .. } => { format!("backrun-bundle-executed-{bundle_id}") } _ => { diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 44b28b5..008b07b 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -11,7 +11,7 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use tips_core::MeterBundleResponse; use tokio::sync::broadcast; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use url::Url; #[derive(Debug, Clone, Copy)] diff --git a/justfile b/justfile index 3945f32..07756ee 100644 --- a/justfile +++ b/justfile @@ -99,11 +99,6 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" -backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" -backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" - -chain_id := "13" - send-txn: #!/usr/bin/env bash set -euxo pipefail From abb5c2381b0896adb433a4b47ce439ca40dec03b Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Mon, 15 Dec 2025 15:30:59 -0500 Subject: [PATCH 5/5] fix justfile --- justfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/justfile b/justfile index 07756ee..2e36735 100644 --- a/justfile +++ b/justfile @@ -99,6 +99,8 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" +chain_id := "13" + send-txn: #!/usr/bin/env bash set -euxo pipefail