diff --git a/.env.example b/.env.example
index bb4a66a..4058bd9 100644
--- a/.env.example
+++ b/.env.example
@@ -7,6 +7,7 @@ TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-bundles-kafka-pro
 TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress
 TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE=/app/docker/ingress-audit-kafka-properties
 TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit
+TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE=/app/docker/ingress-user-operation-consumer-kafka-properties
 TIPS_INGRESS_LOG_LEVEL=info
 TIPS_INGRESS_LOG_FORMAT=pretty
 TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800
diff --git a/Cargo.lock b/Cargo.lock
index 49c509f..079de98 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -15,10 +15,13 @@
 dependencies = [
  "async-trait",
  "jsonrpsee",
  "op-alloy-network",
+ "rdkafka",
  "reth-rpc-eth-types",
  "serde",
  "serde_json",
+ "tips-core",
  "tokio",
+ "tracing",
  "wiremock",
 ]
diff --git a/crates/account-abstraction-core/Cargo.toml b/crates/account-abstraction-core/Cargo.toml
index 55277ca..39b1591 100644
--- a/crates/account-abstraction-core/Cargo.toml
+++ b/crates/account-abstraction-core/Cargo.toml
@@ -22,8 +22,11 @@ jsonrpsee.workspace = true
 async-trait = { workspace = true }
 alloy-sol-types.workspace= true
 anyhow.workspace = true
+rdkafka.workspace = true
+serde_json.workspace = true
+tips-core.workspace = true
+tracing.workspace = true
 
 [dev-dependencies]
 alloy-primitives.workspace = true
-serde_json.workspace = true
 wiremock.workspace = true
diff --git a/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs
new file mode 100644
index 0000000..7165ef4
--- /dev/null
+++ b/crates/account-abstraction-core/core/src/kafka_mempool_engine.rs
@@ -0,0 +1,283 @@
+use crate::mempool::PoolConfig;
+use crate::mempool::{self, Mempool};
+use crate::types::WrappedUserOperation;
+use async_trait::async_trait;
+use rdkafka::{
+    ClientConfig, Message,
+    consumer::{Consumer, StreamConsumer},
+    message::OwnedMessage,
+};
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use tips_core::kafka::load_kafka_config_from_file;
+use tokio::sync::RwLock;
+use tracing::{info, warn};
+
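+/// Mempool lifecycle events consumed from Kafka.
+///
+/// With the tagged representation below, an added operation is encoded as
+/// `{"event":"UserOpAdded","data":{"user_op":{...}}}` (illustrative sketch;
+/// the serialized fields of `WrappedUserOperation` are elided).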
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "event", content = "data")]
+pub enum KafkaEvent {
+    UserOpAdded {
+        user_op: WrappedUserOperation,
+    },
+    UserOpIncluded {
+        user_op: WrappedUserOperation,
+    },
+    UserOpDropped {
+        user_op: WrappedUserOperation,
+        reason: String,
+    },
+}
+
+#[async_trait]
+pub trait KafkaConsumer: Send + Sync {
+    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage>;
+}
+
+#[async_trait]
+impl KafkaConsumer for StreamConsumer {
+    async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> {
+        Ok(self.recv().await?.detach())
+    }
+}
+
+pub struct KafkaMempoolEngine {
+    mempool: Arc<RwLock<mempool::MempoolImpl>>,
+    kafka_consumer: Arc<dyn KafkaConsumer>,
+}
+
+impl KafkaMempoolEngine {
+    pub fn new(
+        mempool: Arc<RwLock<mempool::MempoolImpl>>,
+        kafka_consumer: Arc<dyn KafkaConsumer>,
+    ) -> Self {
+        Self {
+            mempool,
+            kafka_consumer,
+        }
+    }
+
+    pub fn with_kafka_consumer(
+        kafka_consumer: Arc<dyn KafkaConsumer>,
+        pool_config: Option<PoolConfig>,
+    ) -> Self {
+        let pool_config = pool_config.unwrap_or_default();
+        let mempool = Arc::new(RwLock::new(mempool::MempoolImpl::new(pool_config)));
+        Self {
+            mempool,
+            kafka_consumer,
+        }
+    }
+
+    pub fn get_mempool(&self) -> Arc<RwLock<mempool::MempoolImpl>> {
+        self.mempool.clone()
+    }
+
+    pub async fn run(&self) {
+        loop {
+            if let Err(err) = self.process_next().await {
+                warn!(error = %err, "Kafka mempool engine error, continuing");
+            }
+        }
+    }
+
+    /// Process a single Kafka message (useful for tests and controlled loops).
+    pub async fn process_next(&self) -> anyhow::Result<()> {
+        let msg = self.kafka_consumer.recv_msg().await?;
+        let payload = msg
+            .payload()
+            .ok_or_else(|| anyhow::anyhow!("Kafka message missing payload"))?;
+        let event: KafkaEvent = serde_json::from_slice(payload)
+            .map_err(|e| anyhow::anyhow!("Failed to parse Kafka event: {e}"))?;
+
+        self.handle_event(event).await
+    }
+
+    async fn handle_event(&self, event: KafkaEvent) -> anyhow::Result<()> {
+        info!(
+            event = ?event,
+            "Kafka mempool engine handling event"
+        );
+        match event {
+            KafkaEvent::UserOpAdded { user_op } => {
+                self.mempool.write().await.add_operation(&user_op)?;
+            }
+            KafkaEvent::UserOpIncluded { user_op } => {
+                self.mempool.write().await.remove_operation(&user_op.hash)?;
+            }
+            KafkaEvent::UserOpDropped { user_op, reason: _ } => {
+                self.mempool.write().await.remove_operation(&user_op.hash)?;
+            }
+        }
+        Ok(())
+    }
+}
+
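+/// Builds a `StreamConsumer` subscribed to `topic`. `group.id` and
+/// `enable.auto.commit` are applied programmatically on top of the shared
+/// properties file (see the note in the docker properties file).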
+fn create_user_operation_consumer(
+    properties_file: &str,
+    topic: &str,
+    consumer_group_id: &str,
+) -> anyhow::Result<StreamConsumer> {
+    let mut client_config = ClientConfig::from_iter(load_kafka_config_from_file(properties_file)?);
+
+    client_config.set("group.id", consumer_group_id);
+    client_config.set("enable.auto.commit", "true");
+
+    let consumer: StreamConsumer = client_config.create()?;
+    consumer.subscribe(&[topic])?;
+
+    Ok(consumer)
+}
+
+pub fn create_mempool_engine(
+    properties_file: &str,
+    topic: &str,
+    consumer_group_id: &str,
+    pool_config: Option<PoolConfig>,
+) -> anyhow::Result<Arc<KafkaMempoolEngine>> {
+    let consumer: StreamConsumer =
+        create_user_operation_consumer(properties_file, topic, consumer_group_id)?;
+    Ok(Arc::new(KafkaMempoolEngine::with_kafka_consumer(
+        Arc::new(consumer),
+        pool_config,
+    )))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::mempool::PoolConfig;
+    use crate::types::VersionedUserOperation;
+    use alloy_primitives::{Address, FixedBytes, Uint};
+    use alloy_rpc_types::erc4337;
+    use rdkafka::Timestamp;
+    use tokio::sync::Mutex;
+
+    fn make_wrapped_op(max_fee: u128, hash: [u8; 32]) -> WrappedUserOperation {
+        let op = VersionedUserOperation::UserOperation(erc4337::UserOperation {
+            sender: Address::ZERO,
+            nonce: Uint::from(0u64),
+            init_code: Default::default(),
+            call_data: Default::default(),
+            call_gas_limit: Uint::from(100_000u64),
+            verification_gas_limit: Uint::from(100_000u64),
+            pre_verification_gas: Uint::from(21_000u64),
+            max_fee_per_gas: Uint::from(max_fee),
+            max_priority_fee_per_gas: Uint::from(max_fee),
+            paymaster_and_data: Default::default(),
+            signature: Default::default(),
+        });
+
+        WrappedUserOperation {
+            operation: op,
+            hash: FixedBytes::from(hash),
+        }
+    }
+
+    #[tokio::test]
+    async fn handle_add_operation() {
+        let mempool = Arc::new(RwLock::new(
+            mempool::MempoolImpl::new(PoolConfig::default()),
+        ));
+
+        let op_hash = [1u8; 32];
+        let wrapped = make_wrapped_op(1_000, op_hash);
+
+        let add_event = KafkaEvent::UserOpAdded {
+            user_op: wrapped.clone(),
+        };
+        let mock_consumer = Arc::new(MockConsumer::new(vec![OwnedMessage::new(
+            Some(serde_json::to_vec(&add_event).unwrap()),
+            None,
+            "topic".to_string(),
+            Timestamp::NotAvailable,
+            0,
+            0,
+            None,
+        )]));
+
+        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer);
+
+        // Process the add event deterministically.
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 1);
+        assert_eq!(items[0].hash, FixedBytes::from(op_hash));
+    }
+
+    #[tokio::test]
+    async fn remove_operation_should_remove_from_mempool() {
+        let mempool = Arc::new(RwLock::new(
+            mempool::MempoolImpl::new(PoolConfig::default()),
+        ));
+        let op_hash = [1u8; 32];
+        let wrapped = make_wrapped_op(1_000, op_hash);
+        let add_mempool = KafkaEvent::UserOpAdded {
+            user_op: wrapped.clone(),
+        };
+        let remove_mempool = KafkaEvent::UserOpDropped {
+            user_op: wrapped.clone(),
+            reason: "test".to_string(),
+        };
+        let mock_consumer = Arc::new(MockConsumer::new(vec![
+            OwnedMessage::new(
+                Some(serde_json::to_vec(&add_mempool).unwrap()),
+                None,
+                "topic".to_string(),
+                Timestamp::NotAvailable,
+                0,
+                0,
+                None,
+            ),
+            OwnedMessage::new(
+                Some(serde_json::to_vec(&remove_mempool).unwrap()),
+                None,
+                "topic".to_string(),
+                Timestamp::NotAvailable,
+                0,
+                0,
+                None,
+            ),
+        ]));
+
+        let engine = KafkaMempoolEngine::new(mempool.clone(), mock_consumer);
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 1);
+        assert_eq!(items[0].hash, FixedBytes::from(op_hash));
+        engine.process_next().await.unwrap();
+        let items: Vec<_> = mempool.read().await.get_top_operations(10).collect();
+        assert_eq!(items.len(), 0);
+    }
+
+    struct MockConsumer {
+        msgs: Mutex<Vec<OwnedMessage>>,
+    }
+
+    impl MockConsumer {
+        fn new(msgs: Vec<OwnedMessage>) -> Self {
+            Self {
+                msgs: Mutex::new(msgs),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl KafkaConsumer for MockConsumer {
+        async fn recv_msg(&self) -> anyhow::Result<OwnedMessage> {
+            let mut guard = self.msgs.lock().await;
+            if guard.is_empty() {
+                Err(anyhow::anyhow!("no more messages"))
+            } else {
+                Ok(guard.remove(0))
+            }
+        }
+    }
+}
diff --git a/crates/account-abstraction-core/core/src/lib.rs b/crates/account-abstraction-core/core/src/lib.rs
index b3aa2f7..4fda184 100644
--- a/crates/account-abstraction-core/core/src/lib.rs
+++ b/crates/account-abstraction-core/core/src/lib.rs
@@ -3,4 +3,6 @@ pub mod entrypoints;
 pub mod types;
 pub use account_abstraction_service::{AccountAbstractionService, AccountAbstractionServiceImpl};
 pub use types::{SendUserOperationResponse, VersionedUserOperation};
+pub mod kafka_mempool_engine;
 pub mod mempool;
+pub mod reputation_service;
diff --git a/crates/account-abstraction-core/core/src/mempool.rs b/crates/account-abstraction-core/core/src/mempool.rs
index d5f2fe0..ecb46be 100644
--- a/crates/account-abstraction-core/core/src/mempool.rs
+++ b/crates/account-abstraction-core/core/src/mempool.rs
@@ -9,6 +9,15 @@ pub struct PoolConfig {
     minimum_max_fee_per_gas: u128,
 }
 
+impl Default for PoolConfig {
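+    // A zero minimum imposes no fee floor: operations with any max fee are accepted.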
+    fn default() -> Self {
+        Self {
+            minimum_max_fee_per_gas: 0,
+        }
+    }
+}
+
 #[derive(Eq, PartialEq, Clone, Debug)]
 pub struct OrderedPoolOperation {
     pub pool_operation: WrappedUserOperation,
@@ -56,7 +65,6 @@ impl Ord for ByMaxFeeAndSubmissionId {
             .max_priority_fee_per_gas()
             .cmp(&self.0.pool_operation.operation.max_priority_fee_per_gas())
             .then_with(|| self.0.submission_id.cmp(&other.0.submission_id))
-            .then_with(|| self.0.pool_operation.hash.cmp(&other.0.pool_operation.hash))
     }
 }
 
@@ -96,7 +104,7 @@ pub trait Mempool {
         &mut self,
         operation: &WrappedUserOperation,
     ) -> Result<Option<OrderedPoolOperation>, anyhow::Error>;
-    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = Arc<WrappedUserOperation>>;
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = WrappedUserOperation>;
     fn remove_operation(
         &mut self,
         operation_hash: &UserOpHash,
@@ -125,7 +133,7 @@ impl Mempool for MempoolImpl {
         Ok(ordered_operation_result)
     }
 
-    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = Arc<WrappedUserOperation>> {
+    fn get_top_operations(&self, n: usize) -> impl Iterator<Item = WrappedUserOperation> {
         // TODO: There is a case where we skip operations that are not the lowest nonce for an account.
         // But we still have not given the N number of operations, meaning we don't return those operations.
 
@@ -141,7 +149,7 @@
             Some(lowest)
                 if lowest.0.pool_operation.hash == op_by_fee.0.pool_operation.hash =>
             {
-                Some(Arc::new(op_by_fee.0.pool_operation.clone()))
+                Some(op_by_fee.0.pool_operation.clone())
             }
             Some(_) => None,
             None => {
@@ -291,40 +299,6 @@ mod tests {
         );
     }
 
-    // Tests adding an operation with the same hash but lower gas price
-    #[test]
-    fn test_add_operation_duplicate_hash_lower_gas() {
-        let mut mempool = create_test_mempool(1000);
-        let hash = FixedBytes::from([1u8; 32]);
-
-        let operation1 = create_wrapped_operation(3000, hash);
-        let result1 = mempool.add_operation(&operation1);
-        assert!(result1.is_ok());
-        assert!(result1.unwrap().is_some());
-
-        let operation2 = create_wrapped_operation(2000, hash);
-        let result2 = mempool.add_operation(&operation2);
-        assert!(result2.is_ok());
-        assert!(result2.unwrap().is_none());
-    }
-
-    // Tests adding an operation with the same hash and equal gas price
-    #[test]
-    fn test_add_operation_duplicate_hash_equal_gas() {
-        let mut mempool = create_test_mempool(1000);
-        let hash = FixedBytes::from([1u8; 32]);
-
-        let operation1 = create_wrapped_operation(2000, hash);
-        let result1 = mempool.add_operation(&operation1);
-        assert!(result1.is_ok());
-        assert!(result1.unwrap().is_some());
-
-        let operation2 = create_wrapped_operation(2000, hash);
-        let result2 = mempool.add_operation(&operation2);
-        assert!(result2.is_ok());
-        assert!(result2.unwrap().is_none());
-    }
-
     // Tests adding multiple operations with different hashes
     #[test]
     fn test_add_multiple_operations_with_different_hashes() {
diff --git a/crates/account-abstraction-core/core/src/reputation_service.rs b/crates/account-abstraction-core/core/src/reputation_service.rs
new file mode 100644
index 0000000..c946073
--- /dev/null
+++ b/crates/account-abstraction-core/core/src/reputation_service.rs
@@ -0,0 +1,37 @@
+use crate::mempool;
+use alloy_primitives::Address;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+/// Reputation status for an entity
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum ReputationStatus {
+    /// Entity is not throttled or banned
+    Ok,
+    /// Entity is throttled
+    Throttled,
+    /// Entity is banned
+    Banned,
+}
+
+pub trait ReputationService {
+    fn get_reputation(&self, entity: &Address) -> ReputationStatus;
+}
+
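+/// Placeholder implementation: every entity is currently reported as
+/// `ReputationStatus::Ok`; the mempool handle is held for future heuristics.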
+pub struct ReputationServiceImpl {
+    mempool: Arc<RwLock<mempool::MempoolImpl>>,
+}
+
+impl ReputationServiceImpl {
+    pub fn new(mempool: Arc<RwLock<mempool::MempoolImpl>>) -> Self {
+        Self { mempool }
+    }
+}
+
+impl ReputationService for ReputationServiceImpl {
+    fn get_reputation(&self, _entity: &Address) -> ReputationStatus {
+        ReputationStatus::Ok
+    }
+}
diff --git a/crates/account-abstraction-core/core/src/types.rs b/crates/account-abstraction-core/core/src/types.rs
index 4600839..3d79564 100644
--- a/crates/account-abstraction-core/core/src/types.rs
+++ b/crates/account-abstraction-core/core/src/types.rs
@@ -138,7 +138,7 @@ pub struct AggregatorInfo {
 
 pub type UserOpHash = FixedBytes<32>;
 
-#[derive(Eq, PartialEq, Clone, Debug)]
+#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct WrappedUserOperation {
     pub operation: VersionedUserOperation,
     pub hash: UserOpHash,
diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs
index 67da612..ef49082 100644
--- a/crates/ingress-rpc/src/bin/main.rs
+++ b/crates/ingress-rpc/src/bin/main.rs
@@ -1,3 +1,4 @@
+use account_abstraction_core::kafka_mempool_engine::create_mempool_engine;
 use alloy_provider::ProviderBuilder;
 use clap::Parser;
 use jsonrpsee::server::Server;
@@ -73,6 +74,21 @@ async fn main() -> anyhow::Result<()> {
     let (audit_tx, audit_rx) = mpsc::unbounded_channel::();
     connect_audit_to_publisher(audit_rx, audit_publisher);
 
+    let user_op_properties_file = &config.user_operation_consumer_properties;
+
+    let mempool_engine = create_mempool_engine(
+        user_op_properties_file,
+        &config.user_operation_topic,
+        &config.user_operation_consumer_group_id,
+        None,
+    )?;
+
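+    // Drive the consume loop in the background; the task is aborted on shutdown below.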
+    let mempool_engine_handle = {
+        let engine = mempool_engine.clone();
+        tokio::spawn(async move { engine.run().await })
+    };
+
     let (builder_tx, _) =
         broadcast::channel::(config.max_buffered_meter_bundle_responses);
     let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles);
@@ -95,6 +111,7 @@
         audit_tx,
         builder_tx,
         builder_backrun_tx,
+        mempool_engine.clone(),
         cfg,
     );
     let bind_addr = format!("{}:{}", config.address, config.port);
@@ -110,6 +127,7 @@
 
     handle.stopped().await;
     health_handle.abort();
+    mempool_engine_handle.abort();
 
     Ok(())
 }
diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs
index 649e120..b694d03 100644
--- a/crates/ingress-rpc/src/lib.rs
+++ b/crates/ingress-rpc/src/lib.rs
@@ -85,6 +85,21 @@ pub struct Config {
     )]
     pub audit_topic: String,
 
+    /// Kafka properties file for the user operation consumer
+    #[arg(
+        long,
+        env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_PROPERTIES_FILE"
+    )]
+    pub user_operation_consumer_properties: String,
+
+    /// Consumer group id for user operation topic (set uniquely per deployment)
+    #[arg(
+        long,
+        env = "TIPS_INGRESS_KAFKA_USER_OPERATION_CONSUMER_GROUP_ID",
+        default_value = "tips-user-operation"
+    )]
+    pub user_operation_consumer_group_id: String,
+
     /// User operation topic for pushing valid user operations
     #[arg(
         long,
diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs
index ee6063c..d5b0a91 100644
--- a/crates/ingress-rpc/src/queue.rs
+++ b/crates/ingress-rpc/src/queue.rs
@@ -1,4 +1,7 @@
-use account_abstraction_core::types::VersionedUserOperation;
+use account_abstraction_core::{
+    kafka_mempool_engine::KafkaEvent,
+    types::{VersionedUserOperation, WrappedUserOperation},
+};
 use alloy_primitives::B256;
 use anyhow::Result;
 use async_trait::async_trait;
@@ -79,9 +82,25 @@ impl UserOpQueuePublisher {
 
     pub async fn publish(&self, user_op: &VersionedUserOperation, hash: &B256) -> Result<()> {
         let key = hash.to_string();
-        let payload = serde_json::to_vec(&user_op)?;
+        let event = self.create_user_op_added_event(user_op, hash);
+        let payload = serde_json::to_vec(&event)?;
         self.queue.publish(&self.topic, &key, &payload).await
     }
+
+    fn create_user_op_added_event(
+        &self,
+        user_op: &VersionedUserOperation,
+        hash: &B256,
+    ) -> KafkaEvent {
+        let wrapped_user_op = WrappedUserOperation {
+            operation: user_op.clone(),
+            hash: *hash,
+        };
+
+        KafkaEvent::UserOpAdded {
+            user_op: wrapped_user_op,
+        }
+    }
 }
 
 pub struct BundleQueuePublisher {
diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs
index f0d3341..7442562 100644
--- a/crates/ingress-rpc/src/service.rs
+++ b/crates/ingress-rpc/src/service.rs
@@ -1,3 +1,5 @@
+use account_abstraction_core::kafka_mempool_engine::KafkaMempoolEngine;
+use account_abstraction_core::reputation_service::ReputationStatus;
 use alloy_consensus::transaction::Recovered;
 use alloy_consensus::{Transaction, transaction::SignerRecoverable};
 use alloy_primitives::{Address, B256, Bytes, FixedBytes};
@@ -25,7 +27,10 @@ use crate::validation::validate_bundle;
 use crate::{Config, TxSubmissionMethod};
 use account_abstraction_core::entrypoints::version::EntryPointVersion;
 use account_abstraction_core::types::{UserOperationRequest, VersionedUserOperation};
-use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl};
+use account_abstraction_core::{
+    AccountAbstractionService, AccountAbstractionServiceImpl,
+    reputation_service::{ReputationService, ReputationServiceImpl},
+};
 use std::sync::Arc;
 
 /// RPC providers for different endpoints
@@ -69,6 +74,8 @@ pub struct IngressService {
     tx_submission_method: TxSubmissionMethod,
     bundle_queue_publisher: BundleQueuePublisher,
     user_op_queue_publisher: UserOpQueuePublisher,
+    reputation_service: Arc<ReputationServiceImpl>,
+    mempool_engine: Arc<KafkaMempoolEngine>,
     audit_channel: mpsc::UnboundedSender,
     send_transaction_default_lifetime_seconds: u64,
     metrics: Metrics,
@@ -86,6 +93,7 @@ impl IngressService {
         audit_channel: mpsc::UnboundedSender,
         builder_tx: broadcast::Sender,
         builder_backrun_tx: broadcast::Sender,
+        mempool_engine: Arc<KafkaMempoolEngine>,
         config: Config,
     ) -> Self {
         let mempool_provider = Arc::new(providers.mempool);
@@ -97,6 +105,7 @@
             config.validate_user_operation_timeout_ms,
         );
         let queue_connection = Arc::new(queue);
+
         Self {
             mempool_provider,
             simulation_provider,
@@ -111,6 +120,8 @@
                 queue_connection.clone(),
                 config.ingress_topic,
             ),
+            reputation_service: Arc::new(ReputationServiceImpl::new(mempool_engine.get_mempool())),
+            mempool_engine,
             audit_channel,
             send_transaction_default_lifetime_seconds: config
                 .send_transaction_default_lifetime_seconds,
@@ -347,6 +358,21 @@
             EthApiError::InvalidParams(e.to_string()).into_rpc_err()
         })?;
 
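+        // Reject banned or throttled senders before running full validation.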
"tips-user-operation".to_string(), log_level: String::from("info"), log_format: tips_core::logger::LogFormat::Pretty, send_transaction_default_lifetime_seconds: 300, @@ -652,8 +690,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); + let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); let bundle = Bundle::default(); @@ -711,8 +760,19 @@ mod tests { let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); + let mempool_engine = Arc::new(KafkaMempoolEngine::with_kafka_consumer( + Arc::new(NoopConsumer), + None, + )); + let service = IngressService::new( - providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + providers, + MockQueue, + audit_tx, + builder_tx, + backrun_tx, + mempool_engine, + config, ); // Valid signed transaction bytes diff --git a/docker-compose.tips.yml b/docker-compose.tips.yml index 666802a..c73c226 100644 --- a/docker-compose.tips.yml +++ b/docker-compose.tips.yml @@ -15,6 +15,10 @@ services: volumes: - ./docker/ingress-bundles-kafka-properties:/app/docker/ingress-bundles-kafka-properties:ro - ./docker/ingress-audit-kafka-properties:/app/docker/ingress-audit-kafka-properties:ro + - ./docker/ingress-user-operation-consumer-kafka-properties:/app/docker/ingress-user-operation-consumer-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped audit: @@ -28,6 +32,9 @@ services: - .env.docker volumes: - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro + depends_on: + kafka-setup: + condition: service_completed_successfully restart: unless-stopped ui: diff --git a/docker/ingress-user-operation-consumer-kafka-properties b/docker/ingress-user-operation-consumer-kafka-properties new file mode 100644 index 0000000..3bb02bf --- /dev/null +++ b/docker/ingress-user-operation-consumer-kafka-properties @@ -0,0 +1,9 @@ +# Kafka configuration properties for ingress user operation consumer +bootstrap.servers=host.docker.internal:9094 +message.timeout.ms=5000 +enable.partition.eof=false +session.timeout.ms=6000 +fetch.wait.max.ms=100 +fetch.min.bytes=1 +# Note: group.id and enable.auto.commit are set programmatically +