From 37f65515bb282cda18ec0070e0c51149ffe822f2 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 17:29:51 -0500 Subject: [PATCH 01/12] chore: beginning kafka engine --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 3 +- .../core/src/kafka_mempool_engine.rs | 62 +++++++++++++++++++ .../account-abstraction-core/core/src/lib.rs | 1 + .../core/src/types.rs | 2 +- 5 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 crates/account-abstraction-core/core/src/kafka_mempool_engine.rs diff --git a/Cargo.lock b/Cargo.lock index 49c509f..829adc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,7 @@ dependencies = [ "async-trait", "jsonrpsee", "op-alloy-network", + "rdkafka", "reth-rpc-eth-types", "serde", "serde_json", diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index 55277ca..a63ebb9 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -22,8 +22,9 @@ jsonrpsee.workspace = true async-trait = { workspace = true } alloy-sol-types.workspace= true anyhow.workspace = true +rdkafka.workspace = true +serde_json.workspace = true [dev-dependencies] alloy-primitives.workspace = true -serde_json.workspace = true wiremock.workspace = true diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs new file mode 100644 index 0000000..f2af9fe --- /dev/null +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -0,0 +1,62 @@ +use crate::mempool::{self, Mempool}; +use crate::types::WrappedUserOperation; +use rdkafka::{consumer::StreamConsumer, Message}; +use serde::{Deserialize, Serialize}; +use serde_json; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "event", content = "data")] +pub enum KafkaEvent { +    UserOpAdded { +        user_op: WrappedUserOperation, +    }, +    UserOpIncluded { +        user_op: WrappedUserOperation, +    }, +    UserOpDropped { +        user_op: WrappedUserOperation, +        reason: String, +    }, +} + +pub struct KafkaMempoolEngine { +    mempool: Arc<RwLock<mempool::MempoolImpl>>, +    kafka_producer: StreamConsumer, +} + +impl KafkaMempoolEngine { +    pub fn new( +        mempool: Arc<RwLock<mempool::MempoolImpl>>, +        kafka_producer: StreamConsumer, +    ) -> Self { +        Self { +            mempool, +            kafka_producer, +        } +    } + +    pub async fn run(&self) -> anyhow::Result<()> { +        loop { +            let msg = self.kafka_producer.recv().await?.detach(); +            let payload = msg +                .payload() +                .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; +            let event: KafkaEvent = serde_json::from_slice(payload).map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; + +            match event { +                KafkaEvent::UserOpAdded { user_op } => { +                    self.mempool.write().await.add_operation(&user_op)?; +                } +                KafkaEvent::UserOpIncluded { user_op } => { +                    self.mempool.write().await.remove_operation(&user_op.hash)?; +                } +                KafkaEvent::UserOpDropped { user_op, reason: _ } => { +                    self.mempool.write().await.remove_operation(&user_op.hash)?; +                } +            } +        } +    } +} + diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index b3aa2f7..e6f0ffb 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -3,4 +3,5 @@ pub mod entrypoints; pub mod types; pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl}; pub use types::{SendUserOperationResponse, VersionedUserOperation}; +pub 
mod kafka_mempool_engine; pub mod mempool; diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs index 4600839..3d79564 100644 --- a/crates/account-abstraction-core/core/src/types.rs +++ b/crates/account-abstraction-core/core/src/types.rs @@ -138,7 +138,7 @@ pub struct AggregatorInfo { pub type UserOpHash = FixedBytes<32>; -#[derive(Eq, PartialEq, Clone, Debug)] +#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct WrappedUserOperation { pub operation: VersionedUserOperation, pub hash: UserOpHash, From e5d1e449033a498074ff40db1d151e63d8cff8c6 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Tue, 16 Dec 2025 18:03:54 -0500 Subject: [PATCH 02/12] chore: create small mempool engine --- .../core/src/kafka_mempool_engine.rs | 191 ++++++++++++++++-- .../core/src/mempool.rs | 8 + 2 files changed, 179 insertions(+), 20 deletions(-) diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index f2af9fe..029a137 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -1,6 +1,7 @@ use crate::mempool::{self, Mempool}; use crate::types::WrappedUserOperation; -use rdkafka::{consumer::StreamConsumer, Message}; +use async_trait::async_trait; +use rdkafka::{Message, consumer::StreamConsumer, message::OwnedMessage}; use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; @@ -21,42 +22,192 @@ pub enum KafkaEvent { }, } +#[async_trait] +pub trait KafkaConsumer { +    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage>; +} + +#[async_trait] +impl KafkaConsumer for StreamConsumer { +    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> { +        Ok(self.recv().await?.detach()) +    } +} + pub struct KafkaMempoolEngine { mempool: Arc<RwLock<mempool::MempoolImpl>>, -    kafka_producer: StreamConsumer, +    kafka_consumer: Arc<dyn KafkaConsumer>, } impl KafkaMempoolEngine { pub fn new( mempool: Arc<RwLock<mempool::MempoolImpl>>, -        kafka_producer: StreamConsumer, +        kafka_consumer: Arc<dyn KafkaConsumer>, ) -> Self { Self { mempool, -            kafka_producer, +            kafka_consumer, } } pub async fn run(&self) -> anyhow::Result<()> { loop { -            let msg = self.kafka_producer.recv().await?.detach(); -            let payload = msg -                .payload() -                .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; -            let event: KafkaEvent = serde_json::from_slice(payload).map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; - -            match event { -                KafkaEvent::UserOpAdded { user_op } => { -                    self.mempool.write().await.add_operation(&user_op)?; -                } -                KafkaEvent::UserOpIncluded { user_op } => { -                    self.mempool.write().await.remove_operation(&user_op.hash)?; -                } -                KafkaEvent::UserOpDropped { user_op, reason: _ } => { -                    self.mempool.write().await.remove_operation(&user_op.hash)?; -                } +            self.process_next().await?; +        } +    } + +    /// Process a single Kafka message (useful for tests and controlled loops) +    pub async fn process_next(&self) -> anyhow::Result<()> { +        let msg = self.kafka_consumer.recv_msg().await?; +        let payload = msg +            .payload() +            .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?; +        let event: KafkaEvent = serde_json::from_slice(payload) +            .map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?; + +        self.handle_event(event).await +    } + +    async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> { +        println!("Handling Kafka event: {:?}", event); +        match event { +            KafkaEvent::UserOpAdded { user_op } => { +                
self.mempool.write().await.add_operation(&user_op)?; +            } +            KafkaEvent::UserOpIncluded { user_op } => { +                self.mempool.write().await.remove_operation(&user_op.hash)?; +            } +            KafkaEvent::UserOpDropped { user_op, reason: _ } => { +                self.mempool.write().await.remove_operation(&user_op.hash)?; } } +        Ok(()) +    } } +#[cfg(test)] +mod tests { +    use super::*; +    use crate::mempool::PoolConfig; +    use crate::types::VersionedUserOperation; +    use alloy_primitives::{Address, FixedBytes, Uint}; +    use alloy_rpc_types::erc4337; +    use rdkafka::Timestamp; +    use tokio::sync::Mutex; + +    fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { +        let op = VersionedUserOperation::UserOperation(erc4337::UserOperation { +            sender: Address::ZERO, +            nonce: Uint::from(0u64), +            init_code: Default::default(), +            call_data: Default::default(), +            call_gas_limit: Uint::from(100_000u64), +            verification_gas_limit: Uint::from(100_000u64), +            pre_verification_gas: Uint::from(21_000u64), +            max_fee_per_gas: Uint::from(max_fee), +            max_priority_fee_per_gas: Uint::from(max_fee), +            paymaster_and_data: Default::default(), +            signature: Default::default(), +        }); + +        WrappedUserOperation { +            operation: op, +            hash: FixedBytes::from(hash), +        } +    } + +    #[tokio::test] +    async fn handle_add_operation() { +        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); + +        let op_hash = [1u8; 32]; +        let wrapped = make_wrapped_op(1_000, op_hash); + +        let add_event = KafkaEvent::UserOpAdded { +            user_op: wrapped.clone(), +        }; +        let mock_consumer = Arc::new(MockConsumer::new(vec![OwnedMessage::new( +            Some(serde_json::to_vec(&add_event).unwrap()), +            None, +            "topic".to_string(), +            Timestamp::NotAvailable, +            0, +            0, +            None, +        )])); + +        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer); + +        // Process the add event deterministically +        engine.process_next().await.unwrap(); +        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); +        assert_eq!(items.len(), 1); +        assert_eq!(items[0].hash, FixedBytes::from(op_hash)); +    } + +    #[tokio::test] +    async fn remove_operation_should_remove_from_mempool() { +        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); +        let op_hash = [1u8; 32]; +        let wrapped = make_wrapped_op(1_000, op_hash); +        let add_mempool = KafkaEvent::UserOpAdded { +            user_op: wrapped.clone(), +        }; +        let remove_mempool = KafkaEvent::UserOpDropped { +            user_op: wrapped.clone(), +            reason: "test".to_string(), +        }; +        let mock_consumer = Arc::new(MockConsumer::new(vec![ +            OwnedMessage::new( +                Some(serde_json::to_vec(&add_mempool).unwrap()), +                None, +                "topic".to_string(), +                Timestamp::NotAvailable, +                0, +                0, +                None, +            ), +            OwnedMessage::new( +                Some(serde_json::to_vec(&remove_mempool).unwrap()), +                None, +                "topic".to_string(), +                Timestamp::NotAvailable, +                0, +                0, +                None, +            ), +        ])); + +        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer); +        engine.process_next().await.unwrap(); +        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); +        assert_eq!(items.len(), 1); +        assert_eq!(items[0].hash, FixedBytes::from(op_hash)); +        engine.process_next().await.unwrap(); +        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); +        assert_eq!(items.len(), 0); +    } +    struct MockConsumer { +        msgs: Mutex<Vec<OwnedMessage>>, +    } + +    impl MockConsumer { +        fn new(msgs: Vec<OwnedMessage>) -> Self { +            Self { +                msgs: Mutex::new(msgs), +            } +        } +    } + +    #[async_trait] +    impl KafkaConsumer for MockConsumer { +        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> { +            let 
mut guard = self.msgs.lock().await; +            if guard.is_empty() { +                Err(anyhow::anyhow!("no more messages")) +            } else { +                Ok(guard.remove(0)) +            } +        } +    } +} diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index d5f2fe0..b2fb881 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -9,6 +9,14 @@ pub struct PoolConfig { minimum_max_fee_per_gas: u128, } +impl PoolConfig { +    pub fn new(minimum_max_fee_per_gas: u128) -> Self { +        Self { +            minimum_max_fee_per_gas, +        } +    } +} + #[derive(Eq, PartialEq, Clone, Debug)] pub struct OrderedPoolOperation { pub pool_operation: WrappedUserOperation, From f95239fba945550bc2755768e83abdd128174198 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 10:10:52 -0500 Subject: [PATCH 03/12] chore: create kafka engine --- .../core/src/kafka_mempool_engine.rs | 19 ++++++++-- .../account-abstraction-core/core/src/lib.rs | 1 + .../core/src/mempool.rs | 4 +-- .../core/src/reputation_service.rs | 35 +++++++++++++++++++ crates/ingress-rpc/src/service.rs | 16 ++++++++- 5 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 crates/account-abstraction-core/core/src/reputation_service.rs diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 029a137..2279f25 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; +use crate::mempool::PoolConfig; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -50,6 +51,20 @@ impl KafkaMempoolEngine { } } +    pub fn with_kafka_consumer(kafka_consumer: Arc<dyn KafkaConsumer>, pool_config: Option<PoolConfig>) -> Self { +        let pool_config = pool_config.unwrap_or(PoolConfig::default()); +        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config))); +        Self { +            mempool, +            kafka_consumer, +        } + +    } + +    pub fn get_mempool(&self) -> Arc<RwLock<mempool::MempoolImpl>> { +        self.mempool.clone() +    } + pub async fn run(&self) -> anyhow::Result<()> { loop { self.process_next().await?; @@ -133,7 +133,7 @@ mod tests { #[tokio::test] async fn handle_add_operation() { -        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); +        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); @@ -162,7 +162,7 @@ mod tests { #[tokio::test] async fn remove_operation_should_remove_from_mempool() { -        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::new(0)))); +        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); let add_mempool = KafkaEvent::UserOpAdded { diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index e6f0ffb..6a609a9 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -5,3 +5,4 @@ pub use account_abstraction_service::{AccountAbstractionService, AccountAbstract pub use types::{SendUserOperationResponse, VersionedUserOperation}; pub mod kafka_mempool_engine; pub mod mempool; +pub mod reputation_service; \ No newline at end of 
file diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index b2fb881..7feaaef 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -10,9 +10,9 @@ pub struct PoolConfig { } impl PoolConfig { -    pub fn new(minimum_max_fee_per_gas: u128) -> Self { +    pub fn default() -> Self { Self { -            minimum_max_fee_per_gas, +            minimum_max_fee_per_gas: 0, } } } diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs new file mode 100644 index 0000000..40bc6de --- /dev/null +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use tokio::sync::RwLock; +use alloy_primitives::Address; +use crate::mempool; + +/// Reputation status for an entity +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ReputationStatus { +    /// Entity is not throttled or banned +    Ok, +    /// Entity is throttled +    Throttled, +    /// Entity is banned +    Banned, +} + +pub trait ReputationService { +    fn get_reputation(&self, entity: &Address) -> ReputationStatus; +} + +pub struct ReputationServiceImpl { +    mempool: Arc<RwLock<mempool::MempoolImpl>>, +} + +impl ReputationServiceImpl { +    pub fn new(mempool: Arc<RwLock<mempool::MempoolImpl>>) -> Self { +        Self { mempool } +    } +} + +impl ReputationService for ReputationServiceImpl { +    fn get_reputation(&self, entity: &Address) -> ReputationStatus { +        ReputationStatus::Ok +    } +} \ No newline at end of file diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index f0d3341..ce8a6c3 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,3 +1,5 @@ +use account_abstraction_core::kafka_mempool_engine::KafkaMempoolEngine; +use account_abstraction_core::reputation_service::ReputationStatus; use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; use alloy_primitives::{Address, B256, Bytes, FixedBytes}; @@ -25,8 +27,9 @@ use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; -use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl}; +use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl, reputation_service::{ReputationService, ReputationServiceImpl}, mempool::{Mempool, PoolConfig}}; use std::sync::Arc; +use tokio::sync::RwLock; /// RPC providers for different endpoints pub struct Providers { @@ -69,6 +72,7 @@ pub struct IngressService { tx_submission_method: TxSubmissionMethod, bundle_queue_publisher: BundleQueuePublisher, user_op_queue_publisher: UserOpQueuePublisher, +    reputation_service: Arc<dyn ReputationService>, audit_channel: mpsc::UnboundedSender<BundleEvent>, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, @@ -97,6 +101,8 @@ impl IngressService { config.validate_user_operation_timeout_ms, ); let queue_connection = Arc::new(queue); + +        let mempool = KafkaMempoolEngine::with_kafka_consumer(kafka_consumer, None); Self { mempool_provider, simulation_provider, @@ -111,6 +117,7 @@ impl IngressService { queue_connection.clone(), config.ingress_topic, ), +            reputation_service: Arc::new(ReputationServiceImpl::new(mempool.get_mempool())), audit_channel, send_transaction_default_lifetime_seconds: config
.send_transaction_default_lifetime_seconds, @@ -347,6 +354,13 @@ impl IngressApiServer for IngressService { EthApiError::InvalidParams(e.to_string()).into_rpc_err() })?; +        let reputation = self.reputation_service.get_reputation(&request.user_operation.sender()); +        if reputation == ReputationStatus::Banned { +            return Err(EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err()); +        }else if reputation == ReputationStatus::Throttled { +            return Err(EthApiError::InvalidParams("User operation sender is throttled".into()).into_rpc_err()); +        } + let _ = self .account_abstraction_service .validate_user_operation(&request.user_operation, &entry_point) From e789cb40018a3d697e05c4c68cc980e997ee0af8 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 10:56:30 -0500 Subject: [PATCH 04/12] chore: init kafka mempool engine --- .../core/src/kafka_mempool_engine.rs | 16 +- .../account-abstraction-core/core/src/lib.rs | 2 +- .../core/src/reputation_service.rs | 6 +- crates/ingress-rpc/src/bin/main.rs | 21 +++ .../ingress-rpc/src/kafka_mempool_consumer.rs | 177 ++++++++++++++++++ crates/ingress-rpc/src/lib.rs | 16 ++ crates/ingress-rpc/src/service.rs | 66 ++++++- 7 files changed, 286 insertions(+), 18 deletions(-) create mode 100644 crates/ingress-rpc/src/kafka_mempool_consumer.rs diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 2279f25..5a153ae 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -1,3 +1,4 @@ +use crate::mempool::PoolConfig; use crate::mempool::{self, Mempool}; use crate::types::WrappedUserOperation; use async_trait::async_trait; @@ -6,7 +7,6 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; -use crate::mempool::PoolConfig; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -51,14 +51,16 @@ impl KafkaMempoolEngine { } } -    pub fn with_kafka_consumer(kafka_consumer: Arc<dyn KafkaConsumer>, pool_config: Option<PoolConfig>) -> Self { +    pub fn with_kafka_consumer( +        kafka_consumer: Arc<dyn KafkaConsumer>, +        pool_config: Option<PoolConfig>, +    ) -> Self { let pool_config = pool_config.unwrap_or(PoolConfig::default()); let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config))); Self { mempool, kafka_consumer, } -    } pub fn get_mempool(&self) -> Arc<RwLock<mempool::MempoolImpl>> { @@ -133,7 +135,9 @@ mod tests { #[tokio::test] async fn handle_add_operation() { -        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); +        let mempool = Arc::new(RwLock::new( +            mempool::MempoolImpl::new(PoolConfig::default()), +        )); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); @@ -162,7 +166,9 @@ mod tests { #[tokio::test] async fn remove_operation_should_remove_from_mempool() { -        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(PoolConfig::default()))); +        let mempool = Arc::new(RwLock::new( +            mempool::MempoolImpl::new(PoolConfig::default()), +        )); let op_hash = [1u8; 32]; let wrapped = make_wrapped_op(1_000, op_hash); let add_mempool = KafkaEvent::UserOpAdded { diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs index 6a609a9..4fda184 100644 --- a/crates/account-abstraction-core/core/src/lib.rs +++ b/crates/account-abstraction-core/core/src/lib.rs @@ -5,4 +5,4 @@ pub use account_abstraction_service::{AccountAbstractionService, 
AccountAbstract pub use types::{SendUserOperationResponse, VersionedUserOperation}; pub mod kafka_mempool_engine; pub mod mempool; -pub mod reputation_service; \ No newline at end of file +pub mod reputation_service; diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs index 40bc6de..49b4aec 100644 --- a/crates/account-abstraction-core/core/src/reputation_service.rs +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -1,7 +1,7 @@ +use crate::mempool; +use alloy_primitives::Address; use std::sync::Arc; use tokio::sync::RwLock; -use alloy_primitives::Address; -use crate::mempool; /// Reputation status for an entity #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -32,4 +32,4 @@ impl ReputationService for ReputationServiceImpl { fn get_reputation(&self, entity: &Address) -> ReputationStatus { ReputationStatus::Ok } -} \ No newline at end of file +} diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 67da612..9e6e138 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -4,6 +4,7 @@ use jsonrpsee::server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::sync::Arc; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; @@ -11,6 +12,7 @@ use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; +use tips_ingress_rpc::kafka_mempool_consumer::{create_mempool_engine, run_mempool_engine}; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaMessageQueue; use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers}; @@ -73,6 +75,23 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::<BundleEvent>(); connect_audit_to_publisher(audit_rx, audit_publisher); +    let user_op_properties_file = config +        .user_operation_consumer_properties +        .as_deref() +        .unwrap_or(&config.ingress_kafka_properties); + +    let mempool_engine = create_mempool_engine( +        user_op_properties_file, +        &config.user_operation_topic, +        &config.user_operation_consumer_group_id, +        None, +    )?; + +    let mempool_engine_handle = { +        let engine = mempool_engine.clone(); +        tokio::spawn(async move { run_mempool_engine(engine).await }) +    }; + let (builder_tx, _) = broadcast::channel::<MeterBundleResponse>(config.max_buffered_meter_bundle_responses); let (builder_backrun_tx, _) = broadcast::channel::<Bundle>(config.max_buffered_backrun_bundles); @@ -95,6 +114,7 @@ async fn main() -> anyhow::Result<()> { audit_tx, builder_tx, builder_backrun_tx, +        mempool_engine.clone(), cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); @@ -110,6 +130,7 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; health_handle.abort(); +    mempool_engine_handle.abort(); Ok(()) } diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs new file mode 100644 index 0000000..9148a3e --- /dev/null +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::time::Duration; + +use account_abstraction_core::{kafka_mempool_engine::KafkaMempoolEngine, mempool::PoolConfig}; +use backon::{ExponentialBuilder, 
Retryable}; +use rdkafka::{ +    ClientConfig, +    consumer::{Consumer, StreamConsumer}, +}; +use tips_core::kafka::load_kafka_config_from_file; +use tracing::warn; + +/// Build a Kafka consumer for the user operation topic. +/// Ensures the consumer group id is set per deployment and subscribes to the topic. +fn create_user_operation_consumer( +    properties_file: &str, +    topic: &str, +    consumer_group_id: &str, +) -> anyhow::Result<StreamConsumer> { +    let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?); + +    // Allow deployments to control group id even if the properties file omits it. +    client_config.set("group.id", consumer_group_id); +    // Rely on Kafka for at-least-once; we keep auto commit enabled unless overridden in the file. +    client_config.set("enable.auto.commit", "true"); + +    let consumer: StreamConsumer = client_config.create()?; +    consumer.subscribe(&[topic])?; + +    Ok(consumer) +} + +/// Factory function that creates a fully configured KafkaMempoolEngine. +/// Handles consumer creation, engine instantiation, and Arc wrapping. +pub fn create_mempool_engine( +    properties_file: &str, +    topic: &str, +    consumer_group_id: &str, +    pool_config: Option<PoolConfig>, +) -> anyhow::Result<Arc<KafkaMempoolEngine>> { +    let consumer = create_user_operation_consumer(properties_file, topic, consumer_group_id)?; +    Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( +        Arc::new(consumer), +        pool_config, +    ))) +} + +/// Process a single Kafka message with exponential backoff retries. +pub async fn process_next_with_backoff(engine: &KafkaMempoolEngine) -> anyhow::Result<()> { +    let process = || async { engine.process_next().await }; + +    process +        .retry( +            &ExponentialBuilder::default() +                .with_min_delay(Duration::from_millis(100)) +                .with_max_delay(Duration::from_secs(5)) +                .with_max_times(5), +        ) +        .notify(|err: &anyhow::Error, dur: Duration| { +            warn!( +                error = %err, +                retry_in_ms = dur.as_millis(), +                "Retrying Kafka mempool engine step" +            ); +        }) +        .await +} + +/// Run the mempool engine forever, applying backoff on individual message failures. +pub async fn run_mempool_engine(engine: Arc<KafkaMempoolEngine>) { +    loop { +        if let Err(err) = process_next_with_backoff(&engine).await { +            // We log and continue to avoid stalling the consumer; repeated failures +            // will still observe backoff inside `process_next_with_backoff`. 
+            warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); +        } +    } +} + +#[cfg(test)] +mod tests { +    use super::*; +    use account_abstraction_core::{ +        kafka_mempool_engine::{KafkaConsumer, KafkaEvent}, +        mempool::PoolConfig, +        types::{VersionedUserOperation, WrappedUserOperation}, +    }; +    use alloy_primitives::{Address, FixedBytes, Uint}; +    use alloy_rpc_types::erc4337; +    use rdkafka::{Message, Timestamp, message::OwnedMessage}; +    use tokio::sync::Mutex; + +    fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { +        let op = VersionedUserOperation::UserOperation(erc4337::UserOperation { +            sender: Address::ZERO, +            nonce: Uint::from(0u64), +            init_code: Default::default(), +            call_data: Default::default(), +            call_gas_limit: Uint::from(100_000u64), +            verification_gas_limit: Uint::from(100_000u64), +            pre_verification_gas: Uint::from(21_000u64), +            max_fee_per_gas: Uint::from(max_fee), +            max_priority_fee_per_gas: Uint::from(max_fee), +            paymaster_and_data: Default::default(), +            signature: Default::default(), +        }); + +        WrappedUserOperation { +            operation: op, +            hash: FixedBytes::from(hash), +        } +    } + +    struct MockConsumer { +        msgs: Mutex<Vec<anyhow::Result<OwnedMessage>>>, +    } + +    impl MockConsumer { +        fn new(msgs: Vec<anyhow::Result<OwnedMessage>>) -> Self { +            Self { +                msgs: Mutex::new(msgs), +            } +        } +    } + +    #[async_trait::async_trait] +    impl KafkaConsumer for MockConsumer { +        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> { +            let mut guard = self.msgs.lock().await; +            if guard.is_empty() { +                Err(anyhow::anyhow!("no more messages")) +            } else { +                guard.remove(0) +            } +        } +    } + +    #[tokio::test] +    async fn process_next_with_backoff_recovers_after_error() { +        let add_event = KafkaEvent::UserOpAdded { +            user_op: make_wrapped_op(1_000, [1u8; 32]), +        }; + +        let payload = serde_json::to_vec(&add_event).unwrap(); +        let good_msg = OwnedMessage::new( +            Some(payload), +            None, +            "topic".to_string(), +            Timestamp::NotAvailable, +            0, +            0, +            None, +        ); + +        // First call fails, second succeeds. +        let consumer = Arc::new(MockConsumer::new(vec![ +            Err(anyhow::anyhow!("transient error")), +            Ok(good_msg), +        ])); + +        let engine = KafkaMempoolEngine::with_kafka_consumer(consumer, Some(PoolConfig::default())); + +        // Should succeed after retrying the transient failure. +        let result = process_next_with_backoff(&engine).await; +        assert!(result.is_ok()); + +        // The mempool should contain the added op. 
+        let items: Vec<_> = engine +            .get_mempool() +            .read() +            .await +            .get_top_operations(10) +            .collect(); +        assert_eq!(items.len(), 1); +        assert_eq!(items[0].hash, FixedBytes::from([1u8; 32])); +    } +} diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 649e120..fa0a607 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -1,4 +1,5 @@ pub mod health; +pub mod kafka_mempool_consumer; pub mod metrics; pub mod queue; pub mod service; @@ -85,6 +86,21 @@ pub struct Config { )] pub audit_topic: String, +    /// Kafka properties file for the user operation consumer (defaults to ingress properties if unset) +    #[arg( +        long, +        env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" +    )] +    pub user_operation_consumer_properties: Option<String>, + +    /// Consumer group id for user operation topic (set uniquely per deployment) +    #[arg( +        long, +        env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_GROUP_ID", +        default_value = "tips-user-operation" +    )] +    pub user_operation_consumer_group_id: String, + /// User operation topic for pushing valid user operations #[arg( long, diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index ce8a6c3..65ae703 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -27,7 +27,11 @@ use crate::validation::validate_bundle; use crate::{Config, TxSubmissionMethod}; use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; -use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl, reputation_service::{ReputationService, ReputationServiceImpl}, mempool::{Mempool, PoolConfig}}; +use account_abstraction_core::{ +    AccountAbstractionService, AccountAbstractionServiceImpl, +    mempool::{Mempool, PoolConfig}, +    reputation_service::{ReputationService, ReputationServiceImpl}, +}; use std::sync::Arc; use tokio::sync::RwLock; @@ -73,6 +77,7 @@ pub struct IngressService { bundle_queue_publisher: BundleQueuePublisher, user_op_queue_publisher: UserOpQueuePublisher, reputation_service: Arc<dyn ReputationService>, +    mempool_engine: Arc<KafkaMempoolEngine>, audit_channel: mpsc::UnboundedSender<BundleEvent>, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, @@ -90,6 +95,7 @@ impl IngressService { audit_channel: mpsc::UnboundedSender<BundleEvent>, builder_tx: broadcast::Sender<MeterBundleResponse>, builder_backrun_tx: broadcast::Sender<Bundle>, +        mempool_engine: Arc<KafkaMempoolEngine>, config: Config, ) -> Self { let mempool_provider = Arc::new(providers.mempool); @@ -102,7 +108,6 @@ impl IngressService { ); let queue_connection = Arc::new(queue); -        let mempool = KafkaMempoolEngine::with_kafka_consumer(kafka_consumer, None); Self { mempool_provider, simulation_provider, @@ -117,7 +122,8 @@ impl IngressService { queue_connection.clone(), config.ingress_topic, ), -        reputation_service: Arc::new(ReputationServiceImpl::new(mempool.get_mempool())), +        reputation_service: Arc::new(ReputationServiceImpl::new(mempool_engine.get_mempool())), +        mempool_engine, audit_channel, send_transaction_default_lifetime_seconds: config .send_transaction_default_lifetime_seconds, @@ -354,11 +360,18 @@ impl IngressApiServer for IngressService { EthApiError::InvalidParams(e.to_string()).into_rpc_err() })?; -        let reputation = self.reputation_service.get_reputation(&request.user_operation.sender()); +        let reputation = self +            .reputation_service +            .get_reputation(&request.user_operation.sender()); if reputation == ReputationStatus::Banned { -            return 
Err(EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err()); -        }else if reputation == ReputationStatus::Throttled { -            return Err(EthApiError::InvalidParams("User operation sender is throttled".into()).into_rpc_err()); +            return Err( +                EthApiError::InvalidParams("User operation sender is banned".into()).into_rpc_err(), +            ); +        } else if reputation == ReputationStatus::Throttled { +            return Err( +                EthApiError::InvalidParams("User operation sender is throttled".into()) +                    .into_rpc_err(), +            ); } let _ = self @@ -507,6 +520,7 @@ impl IngressService { mod tests { use super::*; use crate::{Config, TxSubmissionMethod, queue::MessageQueue}; +    use account_abstraction_core::kafka_mempool_engine::KafkaConsumer; use alloy_provider::RootProvider; use anyhow::Result; use async_trait::async_trait; use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use mockall::mock; +    use rdkafka::message::OwnedMessage; use serde_json::json; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -530,6 +545,15 @@ mod tests { } } +    struct NoopConsumer; + +    #[async_trait] +    impl KafkaConsumer for NoopConsumer { +        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> { +            Err(anyhow::anyhow!("no messages")) +        } +    } + fn create_test_config(mock_server: &MockServer) -> Config { Config { address: IpAddr::from([127, 0, 0, 1]), @@ -540,6 +564,8 @@ mod tests { ingress_topic: String::new(), audit_kafka_properties: String::new(), audit_topic: String::new(), +            user_operation_consumer_properties: None, +            user_operation_consumer_group_id: "tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, send_transaction_default_lifetime_seconds: 300, @@ -666,8 +692,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); +        let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( +            Arc::new(NoopConsumer), +            None, +        )); + let service = IngressService::new( -            providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, +            providers, +            MockQueue, +            audit_tx, +            builder_tx, +            backrun_tx, +            mempool_engine, +            config, ); let bundle = Bundle::default(); @@ -725,8 +762,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); +        let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( +            Arc::new(NoopConsumer), +            None, +        )); + let service = IngressService::new( -            providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, +            providers, +            MockQueue, +            audit_tx, +            builder_tx, +            backrun_tx, +            mempool_engine, +            config, ); // Valid signed transaction bytes From 9b4fbf0b6e96e5228a8c7dd3e488f48405a88d5c Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 10:57:45 -0500 Subject: [PATCH 05/12] chore: create kafka mempool engine --- .../account-abstraction-core/core/src/kafka_mempool_engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 5a153ae..25304e4 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -24,7 +24,7 @@ pub enum KafkaEvent { } #[async_trait] -pub trait KafkaConsumer { +pub trait KafkaConsumer: Send + Sync { async fn recv_msg(&self) -> 
anyhow::Result<OwnedMessage>; } From e111e7ae21acfba541cdecaf405175e60fd913a7 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 11:07:10 -0500 Subject: [PATCH 06/12] chore: kafka event factory creation --- .../core/src/reputation_service.rs | 2 +- crates/ingress-rpc/src/bin/main.rs | 1 - .../ingress-rpc/src/kafka_mempool_consumer.rs | 101 +----------------- crates/ingress-rpc/src/queue.rs | 16 ++- crates/ingress-rpc/src/service.rs | 2 - 5 files changed, 16 insertions(+), 106 deletions(-) diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs index 49b4aec..f0a5e85 100644 --- a/crates/account-abstraction-core/core/src/reputation_service.rs +++ b/crates/account-abstraction-core/core/src/reputation_service.rs @@ -29,7 +29,7 @@ impl ReputationServiceImpl { } impl ReputationService for ReputationServiceImpl { -    fn get_reputation(&self, entity: &Address) -> ReputationStatus { +    fn get_reputation(&self, _entity: &Address) -> ReputationStatus { ReputationStatus::Ok } } diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 9e6e138..ac9f14b 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -4,7 +4,6 @@ use jsonrpsee.server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; -use std::sync::Arc; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index 9148a3e..f62c673 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -75,103 +75,4 @@ pub async fn run_mempool_engine(engine: Arc<KafkaMempoolEngine>) { warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); } } -} - -#[cfg(test)] -mod tests { -    use super::*; -    use account_abstraction_core::{ -        kafka_mempool_engine::{KafkaConsumer, KafkaEvent}, -        mempool::PoolConfig, -        types::{VersionedUserOperation, WrappedUserOperation}, -    }; -    use alloy_primitives::{Address, FixedBytes, Uint}; -    use alloy_rpc_types::erc4337; -    use rdkafka::{Message, Timestamp, message::OwnedMessage}; -    use tokio::sync::Mutex; - -    fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation { -        let op = VersionedUserOperation::UserOperation(erc4337::UserOperation { -            sender: Address::ZERO, -            nonce: Uint::from(0u64), -            init_code: Default::default(), -            call_data: Default::default(), -            call_gas_limit: Uint::from(100_000u64), -            verification_gas_limit: Uint::from(100_000u64), -            pre_verification_gas: Uint::from(21_000u64), -            max_fee_per_gas: Uint::from(max_fee), -            max_priority_fee_per_gas: Uint::from(max_fee), -            paymaster_and_data: Default::default(), -            signature: Default::default(), -        }); - -        WrappedUserOperation { -            operation: op, -            hash: FixedBytes::from(hash), -        } -    } - -    struct MockConsumer { -        msgs: Mutex<Vec<anyhow::Result<OwnedMessage>>>, -    } - -    impl MockConsumer { -        fn new(msgs: Vec<anyhow::Result<OwnedMessage>>) -> Self { -            Self { -                msgs: Mutex::new(msgs), -            } -        } -    } - -    #[async_trait::async_trait] -    impl KafkaConsumer for MockConsumer { -        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> { -            let mut guard = self.msgs.lock().await; -            if guard.is_empty() { -                Err(anyhow::anyhow!("no more messages")) -            } else { -                guard.remove(0) -            } -        } -    } - -    #[tokio::test] -    async fn 
process_next_with_backoff_recovers_after_error() { - let add_event = KafkaEvent::UserOpAdded { - user_op: make_wrapped_op(1_000, [1u8; 32]), - }; - - let payload = serde_json::to_vec(&add_event).unwrap(); - let good_msg = OwnedMessage::new( - Some(payload), - None, - "topic".to_string(), - Timestamp::NotAvailable, - 0, - 0, - None, - ); - - // First call fails, second succeeds. - let consumer = Arc::new(MockConsumer::new(vec![ - Err(anyhow::anyhow!("transient error")), - Ok(good_msg), - ])); - - let engine = KafkaMempoolEngine::with_kafka_consumer(consumer, Some(PoolConfig::default())); - - // Should succeed after retrying the transient failure. - let result = process_next_with_backoff(&engine).await; - assert!(result.is_ok()); - - // The mempool should contain the added op. - let items: Vec<_> = engine - .get_mempool() - .read() - .await - .get_top_operations(10) - .collect(); - assert_eq!(items.len(), 1); - assert_eq!(items[0].hash, FixedBytes::from([1u8; 32])); - } -} +} \ No newline at end of file diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index ee6063c..4f2daad 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,4 @@ -use account_abstraction_core::types::VersionedUserOperation; +use account_abstraction_core::{kafka_mempool_engine::KafkaEvent, types::{VersionedUserOperation, WrappedUserOperation}}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; @@ -79,9 +79,21 @@ impl UserOpQueuePublisher { pub async fn publish(&self, user_op: &VersionedUserOperation, hash: &B256) -> Result<()> { let key = hash.to_string(); - let payload = serde_json::to_vec(&user_op)?; + let event = self.create_user_op_added_event(user_op, hash); + let payload = serde_json::to_vec(&event)?; self.queue.publish(&self.topic, &key, &payload).await } + + fn create_user_op_added_event(&self, user_op: &VersionedUserOperation, hash: &B256) -> KafkaEvent { + let wrapped_user_op = WrappedUserOperation { + operation: user_op.clone(), + hash: hash.clone(), + }; + let event = KafkaEvent::UserOpAdded { + user_op: wrapped_user_op, + }; + event + } } pub struct BundleQueuePublisher { diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 65ae703..c7dec6a 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -29,11 +29,9 @@ use account_abstraction_core::entrypoints::version::EntryPointVersion; use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation}; use account_abstraction_core::{ AccountAbstractionService, AccountAbstractionServiceImpl, - mempool::{Mempool, PoolConfig}, reputation_service::{ReputationService, ReputationServiceImpl}, }; use std::sync::Arc; -use tokio::sync::RwLock; /// RPC providers for different endpoints pub struct Providers { From e100f1ef0f7233b81c8664349f179dd823f0fdcf Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 11:52:27 -0500 Subject: [PATCH 07/12] chore: create consumer logic --- .env.example | 1 + crates/ingress-rpc/src/bin/main.rs | 5 +---- .../ingress-rpc/src/kafka_mempool_consumer.rs | 2 +- crates/ingress-rpc/src/lib.rs | 4 ++-- crates/ingress-rpc/src/queue.rs | 19 +++++++++++++------ crates/ingress-rpc/src/service.rs | 2 +- docker-compose.tips.yml | 7 +++++++ ...s-user-operation-consumer-kafka-properties | 9 +++++++++ 8 files changed, 35 insertions(+), 14 deletions(-) create mode 100644 docker/ingress-user-operation-consumer-kafka-properties diff --git a/.env.example b/.env.example 
index bb4a66a..4058bd9 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,7 @@ TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-bundles-kafka-pro TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE=/app/docker/ingress-audit-kafka-properties TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit +TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE=/app/docker/ingress-user-operation-consumer-kafka-properties TIPS_INGRESS_LOG_LEVEL=info TIPS_INGRESS_LOG_FORMAT=pretty TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800 diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index ac9f14b..a3f1ce8 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -74,10 +74,7 @@ async fn main() -> anyhow::Result<()> { let (audit_tx, audit_rx) = mpsc::unbounded_channel::<BundleEvent>(); connect_audit_to_publisher(audit_rx, audit_publisher); -    let user_op_properties_file = config -        .user_operation_consumer_properties -        .as_deref() -        .unwrap_or(&config.ingress_kafka_properties); +    let user_op_properties_file = &config.user_operation_consumer_properties; let mempool_engine = create_mempool_engine( user_op_properties_file, diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index f62c673..63efd80 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -75,4 +75,4 @@ pub async fn run_mempool_engine(engine: Arc<KafkaMempoolEngine>) { warn!(error = %err, "Kafka mempool engine exhausted retries, continuing"); } } -} \ No newline at end of file +} diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index fa0a607..cc58640 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -86,12 +86,12 @@ pub struct Config { )] pub audit_topic: String, -    /// Kafka properties file for the user operation consumer (defaults to ingress properties if unset) +    /// Kafka properties file for the user operation consumer #[arg( long, env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE" )] -    pub user_operation_consumer_properties: Option<String>, +    pub user_operation_consumer_properties: String, /// Consumer group id for user operation topic (set uniquely per deployment) #[arg( diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 4f2daad..d5b0a91 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,4 +1,7 @@ -use account_abstraction_core::{kafka_mempool_engine::KafkaEvent, types::{VersionedUserOperation, WrappedUserOperation}}; +use account_abstraction_core::{ +    kafka_mempool_engine::KafkaEvent, +    types::{VersionedUserOperation, WrappedUserOperation}, +}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; @@ -84,15 +87,19 @@ impl UserOpQueuePublisher { self.queue.publish(&self.topic, &key, &payload).await } -    fn create_user_op_added_event(&self, user_op: &VersionedUserOperation, hash: &B256) -> KafkaEvent { +    fn create_user_op_added_event( +        &self, +        user_op: &VersionedUserOperation, +        hash: &B256, +    ) -> KafkaEvent { let wrapped_user_op = WrappedUserOperation { operation: user_op.clone(), -            hash: hash.clone(), +            hash: *hash, }; -        let event = KafkaEvent::UserOpAdded { + +        KafkaEvent::UserOpAdded { user_op: wrapped_user_op, -        }; -        event +        } } } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c7dec6a..7442562 100644 --- 
a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -562,7 +562,7 @@ mod tests { ingress_topic: String::new(), audit_kafka_properties: String::new(), audit_topic: String::new(), - user_operation_consumer_properties: None, + user_operation_consumer_properties: String::new(), user_operation_consumer_group_id: "tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, diff --git a/docker-compose.tips.yml b/docker-compose.tips.yml index 666802a..c73c226 100644 --- a/docker-compose.tips.yml +++ b/docker-compose.tips.yml @@ -15,6 +15,10 @@ services: volumes: - ./docker/ingress-bundles-kafka-properties:/app/docker/ingress-bundles-kafka-properties:ro - ./docker/ingress-audit-kafka-properties:/app/docker/ingress-audit-kafka-properties:ro + - ./docker/ingress-user-operation-consumer-kafka-properties:/app/docker/ingress-user-operation-consumer-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped audit: @@ -28,6 +32,9 @@ services: - .env.docker volumes: - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped ui: diff --git a/docker/ingress-user-operation-consumer-kafka-properties b/docker/ingress-user-operation-consumer-kafka-properties new file mode 100644 index 0000000..3bb02bf --- /dev/null +++ b/docker/ingress-user-operation-consumer-kafka-properties @@ -0,0 +1,9 @@ +# Kafka configuration properties for ingress user operation consumer +bootstrap.servers=host.docker.internal:9094 +message.timeout.ms=5000 +enable.partition.eof=false +session.timeout.ms=6000 +fetch.wait.max.ms=100 +fetch.min.bytes=1 +# Note: group.id and enable.auto.commit are set programmatically + From a21c15d5135585f7d3aea3d5632396e33ac3bcab Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:15:20 -0500 Subject: [PATCH 08/12] chore: update tracing --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 1 + .../core/src/kafka_mempool_engine.rs | 6 +++++- crates/ingress-rpc/src/kafka_mempool_consumer.rs | 3 ++- 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 829adc7..cca3b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,6 +20,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tracing", "wiremock", ] diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index a63ebb9..e2a28f6 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -24,6 +24,7 @@ alloy-sol-types.workspace= true anyhow.workspace = true rdkafka.workspace = true serde_json.workspace = true +tracing.workspace=true [dev-dependencies] alloy-primitives.workspace = true diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 25304e4..3d567ca 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::info; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -86,7 +87,10 @@ impl KafkaMempoolEngine { } async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> { - println!("Handling 
Kafka event: {:?}", event); + info!( + event = ?event, + "Kafka mempool engine handling event" + ); match event { KafkaEvent::UserOpAdded { user_op } => { self.mempool.write().await.add_operation(&user_op)?; diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs index 63efd80..2e510c9 100644 --- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs +++ b/crates/ingress-rpc/src/kafka_mempool_consumer.rs @@ -38,7 +38,8 @@ pub fn create_mempool_engine( consumer_group_id: &str, pool_config: Option, ) -> anyhow::Result> { - let consumer = create_user_operation_consumer(properties_file, topic, consumer_group_id)?; + let consumer: StreamConsumer = + create_user_operation_consumer(properties_file, topic, consumer_group_id)?; Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( Arc::new(consumer), pool_config, From 9a4221ca0caa0b44d4aa4764c64851e352e33943 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:22:01 -0500 Subject: [PATCH 09/12] chore: update comments --- .../core/src/mempool.rs | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 7feaaef..7fd6dd5 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -64,7 +64,6 @@ impl Ord for ByMaxFeeAndSubmissionId { .max_priority_fee_per_gas() .cmp(&self.0.pool_operation.operation.max_priority_fee_per_gas()) .then_with(|| self.0.submission_id.cmp(&other.0.submission_id)) - .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash)) } } @@ -299,39 +298,6 @@ mod tests { ); } - // Tests adding an operation with the same hash but lower gas price - #[test] - fn test_add_operation_duplicate_hash_lower_gas() { - let mut mempool = create_test_mempool(1000); - let hash = FixedBytes::from([1u8; 32]); - - let operation1 = create_wrapped_operation(3000, hash); - let result1 = mempool.add_operation(&operation1); - assert!(result1.is_ok()); - assert!(result1.unwrap().is_some()); - - let operation2 = create_wrapped_operation(2000, hash); - let result2 = mempool.add_operation(&operation2); - assert!(result2.is_ok()); - assert!(result2.unwrap().is_none()); - } - - // Tests adding an operation with the same hash and equal gas price - #[test] - fn test_add_operation_duplicate_hash_equal_gas() { - let mut mempool = create_test_mempool(1000); - let hash = FixedBytes::from([1u8; 32]); - - let operation1 = create_wrapped_operation(2000, hash); - let result1 = mempool.add_operation(&operation1); - assert!(result1.is_ok()); - assert!(result1.unwrap().is_some()); - - let operation2 = create_wrapped_operation(2000, hash); - let result2 = mempool.add_operation(&operation2); - assert!(result2.is_ok()); - assert!(result2.unwrap().is_none()); - } // Tests adding multiple operations with different hashes #[test] From 08b105522f32d9d1fdbd20474b7ede7a0862d745 Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 15:33:44 -0500 Subject: [PATCH 10/12] chore: format --- crates/account-abstraction-core/core/src/mempool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs index 7fd6dd5..30988e2 100644 --- a/crates/account-abstraction-core/core/src/mempool.rs +++ b/crates/account-abstraction-core/core/src/mempool.rs @@ -298,7 +298,6 @@ mod tests { ); } - // Tests adding multiple operations 
with different hashes #[test] fn test_add_multiple_operations_with_different_hashes() { From 8449cafb3810ca740f38086dcdd7ea633b162abc Mon Sep 17 00:00:00 2001 From: Rayyan Alam Date: Wed, 17 Dec 2025 16:19:13 -0500 Subject: [PATCH 11/12] chore: setup lock and implement services --- Cargo.lock | 1 + crates/account-abstraction-core/Cargo.toml | 1 + .../core/src/kafka_mempool_engine.rs | 45 ++++++++++- crates/ingress-rpc/src/bin/main.rs | 4 +- .../ingress-rpc/src/kafka_mempool_consumer.rs | 79 ------------------- crates/ingress-rpc/src/lib.rs | 1 - 6 files changed, 45 insertions(+), 86 deletions(-) delete mode 100644 crates/ingress-rpc/src/kafka_mempool_consumer.rs diff --git a/Cargo.lock b/Cargo.lock index cca3b77..079de98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "reth-rpc-eth-types", "serde", "serde_json", + "tips-core", "tokio", "tracing", "wiremock", ] diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml index e2a28f6..39b1591 100644 --- a/crates/account-abstraction-core/Cargo.toml +++ b/crates/account-abstraction-core/Cargo.toml @@ -24,6 +24,7 @@ alloy-sol-types.workspace= true anyhow.workspace = true rdkafka.workspace = true serde_json.workspace = true +tips-core.workspace = true tracing.workspace=true [dev-dependencies] diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs index 3d567ca..7165ef4 100644 --- a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs +++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs @@ -2,12 +2,17 @@ use crate::mempool::PoolConfig; use crate::mempool::{self, Mempool}; use crate::types::WrappedUserOperation; use async_trait::async_trait; -use rdkafka::{Message, consumer::StreamConsumer, message::OwnedMessage}; +use rdkafka::{ +    ClientConfig, Message, +    consumer::{Consumer, StreamConsumer}, +    message::OwnedMessage, +}; use serde::{Deserialize, Serialize}; use serde_json; use std::sync::Arc; +use tips_core::kafka::load_kafka_config_from_file; use tokio::sync::RwLock; -use tracing::info; +use tracing::{info, warn}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] @@ -68,9 +73,11 @@ impl KafkaMempoolEngine { self.mempool.clone() } -    pub async fn run(&self) -> anyhow::Result<()> { +    pub async fn run(&self) { loop { -            self.process_next().await?; +            if let Err(err) = self.process_next().await { +                warn!(error = %err, "Kafka mempool engine error, continuing"); +            } } } @@ -106,6 +113,36 @@ impl KafkaMempoolEngine { } } +fn create_user_operation_consumer( +    properties_file: &str, +    topic: &str, +    consumer_group_id: &str, +) -> anyhow::Result<StreamConsumer> { +    let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?); + +    client_config.set("group.id", consumer_group_id); +    client_config.set("enable.auto.commit", "true"); + +    let consumer: StreamConsumer = client_config.create()?; +    consumer.subscribe(&[topic])?; + +    Ok(consumer) +} + +pub fn create_mempool_engine( +    properties_file: &str, +    topic: &str, +    consumer_group_id: &str, +    pool_config: Option<PoolConfig>, +) -> anyhow::Result<Arc<KafkaMempoolEngine>> { +    let consumer: StreamConsumer = +        create_user_operation_consumer(properties_file, topic, consumer_group_id)?; +    Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer( +        Arc::new(consumer), +        pool_config, +    ))) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs 
index a3f1ce8..ef49082 100644
--- a/crates/ingress-rpc/src/bin/main.rs
+++ b/crates/ingress-rpc/src/bin/main.rs
@@ -1,3 +1,4 @@
+use account_abstraction_core::kafka_mempool_engine::create_mempool_engine;
 use alloy_provider::ProviderBuilder;
 use clap::Parser;
 use jsonrpsee::server::Server;
@@ -11,7 +12,6 @@ use tips_core::{Bundle, MeterBundleResponse};
 use tips_ingress_rpc::Config;
 use tips_ingress_rpc::connect_ingress_to_builder;
 use tips_ingress_rpc::health::bind_health_server;
-use tips_ingress_rpc::kafka_mempool_consumer::{create_mempool_engine, run_mempool_engine};
 use tips_ingress_rpc::metrics::init_prometheus_exporter;
 use tips_ingress_rpc::queue::KafkaMessageQueue;
 use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers};
@@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> {
 
     let mempool_engine_handle = {
         let engine = mempool_engine.clone();
-        tokio::spawn(async move { run_mempool_engine(engine).await })
+        tokio::spawn(async move { engine.run().await })
     };
 
     let (builder_tx, _) =
diff --git a/crates/ingress-rpc/src/kafka_mempool_consumer.rs b/crates/ingress-rpc/src/kafka_mempool_consumer.rs
deleted file mode 100644
index 2e510c9..0000000
--- a/crates/ingress-rpc/src/kafka_mempool_consumer.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use account_abstraction_core::{kafka_mempool_engine::KafkaMempoolEngine, mempool::PoolConfig};
-use backon::{ExponentialBuilder, Retryable};
-use rdkafka::{
-    ClientConfig,
-    consumer::{Consumer, StreamConsumer},
-};
-use tips_core::kafka::load_kafka_config_from_file;
-use tracing::warn;
-
-/// Build a Kafka consumer for the user operation topic.
-/// Ensures the consumer group id is set per deployment and subscribes to the topic.
-fn create_user_operation_consumer(
-    properties_file: &str,
-    topic: &str,
-    consumer_group_id: &str,
-) -> anyhow::Result<StreamConsumer> {
-    let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?);
-
-    // Allow deployments to control group id even if the properties file omits it.
-    client_config.set("group.id", consumer_group_id);
-    // Rely on Kafka for at-least-once; we keep auto commit enabled unless overridden in the file.
-    client_config.set("enable.auto.commit", "true");
-
-    let consumer: StreamConsumer = client_config.create()?;
-    consumer.subscribe(&[topic])?;
-
-    Ok(consumer)
-}
-
-/// Factory function that creates a fully configured KafkaMempoolEngine.
-/// Handles consumer creation, engine instantiation, and Arc wrapping.
-pub fn create_mempool_engine(
-    properties_file: &str,
-    topic: &str,
-    consumer_group_id: &str,
-    pool_config: Option<PoolConfig>,
-) -> anyhow::Result<Arc<KafkaMempoolEngine>> {
-    let consumer: StreamConsumer =
-        create_user_operation_consumer(properties_file, topic, consumer_group_id)?;
-    Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer(
-        Arc::new(consumer),
-        pool_config,
-    )))
-}
-
-/// Process a single Kafka message with exponential backoff retries.
-pub async fn process_next_with_backoff(engine: &KafkaMempoolEngine) -> anyhow::Result<()> {
-    let process = || async { engine.process_next().await };
-
-    process
-        .retry(
-            &ExponentialBuilder::default()
-                .with_min_delay(Duration::from_millis(100))
-                .with_max_delay(Duration::from_secs(5))
-                .with_max_times(5),
-        )
-        .notify(|err: &anyhow::Error, dur: Duration| {
-            warn!(
-                error = %err,
-                retry_in_ms = dur.as_millis(),
-                "Retrying Kafka mempool engine step"
-            );
-        })
-        .await
-}
-
-/// Run the mempool engine forever, applying backoff on individual message failures.
-pub async fn run_mempool_engine(engine: Arc<KafkaMempoolEngine>) {
-    loop {
-        if let Err(err) = process_next_with_backoff(&engine).await {
-            // We log and continue to avoid stalling the consumer; repeated failures
-            // will still observe backoff inside `process_next_with_backoff`.
-            warn!(error = %err, "Kafka mempool engine exhausted retries, continuing");
-        }
-    }
-}
diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs
index cc58640..b694d03 100644
--- a/crates/ingress-rpc/src/lib.rs
+++ b/crates/ingress-rpc/src/lib.rs
@@ -1,5 +1,4 @@
 pub mod health;
-pub mod kafka_mempool_consumer;
 pub mod metrics;
 pub mod queue;
 pub mod service;
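With `create_mempool_engine` now exported from the engine module and `run` looping internally, the binary-side wiring in PATCH 11 shrinks to a factory call plus `tokio::spawn`, as the main.rs hunk above shows. A sketch of that wiring; the properties path, topic, and group id below are illustrative placeholders, not values from this repo:

    use std::sync::Arc;

    use account_abstraction_core::kafka_mempool_engine::create_mempool_engine;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Assumed config values; a real deployment passes these via CLI/env.
        let engine = create_mempool_engine(
            "kafka.properties",
            "user-operations",
            "mempool-engine-group",
            None, // no PoolConfig override
        )?;

        // `run` never returns on its own: per-message failures are logged
        // and skipped, so the spawned task only ends if it is aborted.
        let handle = tokio::spawn({
            let engine = Arc::clone(&engine);
            async move { engine.run().await }
        });

        handle.await?;
        Ok(())
    }

Note the trade this patch makes: the deleted `process_next_with_backoff` gave each failing message exponential backoff before giving up, while the new `run` simply warns and moves on. That avoids stalling the consumer on a poison message, at the cost of retrying nothing; the removed code shows what to restore if retries matter.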
From 44d41b2f34b0d0607bfb5d65fb71eb462bd24d71 Mon Sep 17 00:00:00 2001
From: Rayyan Alam
Date: Thu, 18 Dec 2025 14:42:35 -0500
Subject: [PATCH 12/12] chore: update reputation service

---
 crates/account-abstraction-core/core/src/mempool.rs         | 6 +++---
 .../account-abstraction-core/core/src/reputation_service.rs | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs
index 30988e2..ecb46be 100644
--- a/crates/account-abstraction-core/core/src/mempool.rs
+++ b/crates/account-abstraction-core/core/src/mempool.rs
@@ -103,7 +103,7 @@ pub trait Mempool {
         &mut self,
         operation: &WrappedUserOperation,
     ) -> Result<Option<WrappedUserOperation>, anyhow::Error>;
-    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = Arc<PoolOperation>>;
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = PoolOperation>;
     fn remove_operation(
         &mut self,
         operation_hash: &UserOpHash,
@@ -132,7 +132,7 @@ impl Mempool for MempoolImpl {
         Ok(ordered_operation_result)
     }
 
-    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = Arc<PoolOperation>> {
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = PoolOperation> {
         // TODO: There is a case where we skip operations that are not the lowest nonce for an account.
         // But we still have not given the N number of operations, meaning we don't return those operations.
 
@@ -148,7 +148,7 @@
                 Some(lowest)
                     if lowest.0.pool_operation.hash == op_by_fee.0.pool_operation.hash =>
                 {
-                    Some(Arc::new(op_by_fee.0.pool_operation.clone()))
+                    Some(op_by_fee.0.pool_operation.clone())
                 }
                 Some(_) => None,
                 None => {
diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs
index f0a5e85..c946073 100644
--- a/crates/account-abstraction-core/core/src/reputation_service.rs
+++ b/crates/account-abstraction-core/core/src/reputation_service.rs
@@ -23,7 +23,7 @@ pub struct ReputationServiceImpl {
 }
 
 impl ReputationServiceImpl {
-    pub fn new(mempool: Arc<RwLock<MempoolImpl>>) -> Self {
+    pub async fn new(mempool: Arc<RwLock<MempoolImpl>>) -> Self {
         Self { mempool }
     }
 }
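PATCH 12 has `get_top_operations` yield owned `PoolOperation`s instead of `Arc`-wrapped ones: one clone per returned operation in exchange for `Arc`-free caller signatures. A self-contained sketch of the before/after shapes over a simplified stand-in type (not the crate's real `PoolOperation`):

    use std::sync::Arc;

    // Simplified stand-in for the pool's stored operation type.
    #[derive(Clone, Debug)]
    struct PoolOperation {
        hash: [u8; 32],
    }

    // Before PATCH 12: callers got shared handles; cloning an Arc is only a
    // refcount bump, but every consumer signature carries the Arc.
    fn top_ops_shared(ops: &[Arc<PoolOperation>], n: usize) -> impl Iterator<Item = Arc<PoolOperation>> + '_ {
        ops.iter().take(n).cloned()
    }

    // After PATCH 12: callers get owned values; each yielded item is a full
    // clone, but downstream code no longer threads Arc through its types.
    fn top_ops_owned(ops: &[PoolOperation], n: usize) -> impl Iterator<Item = PoolOperation> + '_ {
        ops.iter().take(n).cloned()
    }

    fn main() {
        let ops = vec![PoolOperation { hash: [1; 32] }, PoolOperation { hash: [2; 32] }];
        println!("{:?}", top_ops_owned(&ops, 1).collect::<Vec<_>>());

        let shared: Vec<Arc<PoolOperation>> = ops.into_iter().map(Arc::new).collect();
        println!("{:?}", top_ops_shared(&shared, 2).collect::<Vec<_>>());
    }

The companion change makes `ReputationServiceImpl::new` an `async fn` without awaiting anything in its body yet; presumably this stages the constructor for setup work that will need to read the tokio-`RwLock`-guarded pool, since callers must already be in an async context to lock it.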