Skip to content

Commit 212ad6c

Browse files
feat(rpc): add newFlashblockTransactions subscription type (#287)
Adds `newFlashblockTransactions` to eth_subscribe for streaming transactions from flashblocks as they're sequenced. These have higher inclusion confidence than standard newPendingTransactions since they've already been included by the sequencer. Accepts optional boolean param: - true: full transaction objects - false (default): hashes only Changes: - NewFlashblockTransactions variant in BaseSubscriptionKind - get_pending_transactions/get_pending_transaction_hashes on PendingBlocks - Stream functions and subscription handler in pubsub - Tests for both modes Closes #280
1 parent 7b97937 commit 212ad6c

File tree

4 files changed

+197
-0
lines changed

4 files changed

+197
-0
lines changed

crates/flashblocks/src/pending_blocks.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,16 @@ impl PendingBlocks {
280280

281281
logs
282282
}
283+
284+
/// Returns all pending transactions from flashblocks.
285+
pub fn get_pending_transactions(&self) -> Vec<Transaction> {
286+
self.transactions.clone()
287+
}
288+
289+
/// Returns the hashes of all pending transactions from flashblocks.
290+
pub fn get_pending_transaction_hashes(&self) -> Vec<B256> {
291+
self.transactions.iter().map(|tx| tx.tx_hash()).collect()
292+
}
283293
}
284294

285295
impl PendingBlocksAPI for Guard<Option<Arc<PendingBlocks>>> {

crates/rpc/src/base/pubsub.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
use std::sync::Arc;
88

9+
use alloy_primitives::B256;
910
use alloy_rpc_types_eth::{Filter, Log, pubsub::Params};
1011
use base_reth_flashblocks::FlashblocksAPI;
1112
use jsonrpsee::{
@@ -15,6 +16,7 @@ use jsonrpsee::{
1516
server::SubscriptionMessage,
1617
};
1718
use op_alloy_network::Optimism;
19+
use op_alloy_rpc_types::Transaction;
1820
use reth_rpc::eth::EthPubSub as RethEthPubSub;
1921
use reth_rpc_eth_api::{
2022
EthApiTypes, RpcBlock, RpcNodeCore, RpcTransaction,
@@ -112,6 +114,52 @@ impl<Eth, FB> EthPubSub<Eth, FB> {
112114
},
113115
)
114116
}
117+
118+
/// Returns a stream that yields full transactions from pending flashblocks
119+
fn new_flashblock_transactions_full_stream(
120+
flashblocks_state: Arc<FB>,
121+
) -> impl Stream<Item = Vec<Transaction>>
122+
where
123+
FB: FlashblocksAPI + Send + Sync + 'static,
124+
{
125+
BroadcastStream::new(flashblocks_state.subscribe_to_flashblocks()).filter_map(|result| {
126+
let pending_blocks = match result {
127+
Ok(blocks) => blocks,
128+
Err(err) => {
129+
error!(
130+
message = "Error in flashblocks stream for transactions",
131+
error = %err
132+
);
133+
return None;
134+
}
135+
};
136+
let txs = pending_blocks.get_pending_transactions();
137+
if txs.is_empty() { None } else { Some(txs) }
138+
})
139+
}
140+
141+
/// Returns a stream that yields transaction hashes from pending flashblocks
142+
fn new_flashblock_transactions_hash_stream(
143+
flashblocks_state: Arc<FB>,
144+
) -> impl Stream<Item = Vec<B256>>
145+
where
146+
FB: FlashblocksAPI + Send + Sync + 'static,
147+
{
148+
BroadcastStream::new(flashblocks_state.subscribe_to_flashblocks()).filter_map(|result| {
149+
let pending_blocks = match result {
150+
Ok(blocks) => blocks,
151+
Err(err) => {
152+
error!(
153+
message = "Error in flashblocks stream for transaction hashes",
154+
error = %err
155+
);
156+
return None;
157+
}
158+
};
159+
let hashes = pending_blocks.get_pending_transaction_hashes();
160+
if hashes.is_empty() { None } else { Some(hashes) }
161+
})
162+
}
115163
}
116164

117165
#[async_trait]
@@ -165,6 +213,29 @@ where
165213
pipe_from_stream(sink, stream).await;
166214
});
167215
}
216+
BaseSubscriptionKind::NewFlashblockTransactions => {
217+
// Extract full_transactions param, default to false (hash only)
218+
let full = match params {
219+
Some(Params::Bool(full)) => full,
220+
_ => false,
221+
};
222+
223+
if full {
224+
let stream = Self::new_flashblock_transactions_full_stream(Arc::clone(
225+
&self.flashblocks_state,
226+
));
227+
tokio::spawn(async move {
228+
pipe_from_stream(sink, stream).await;
229+
});
230+
} else {
231+
let stream = Self::new_flashblock_transactions_hash_stream(Arc::clone(
232+
&self.flashblocks_state,
233+
));
234+
tokio::spawn(async move {
235+
pipe_from_stream(sink, stream).await;
236+
});
237+
}
238+
}
168239
}
169240

170241
Ok(())

crates/rpc/src/base/types.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ pub enum BaseSubscriptionKind {
5757
/// Unlike standard `logs` subscription which only includes logs from confirmed blocks,
5858
/// this includes logs from the current pending flashblock state.
5959
PendingLogs,
60+
/// New flashblock transactions subscription.
61+
///
62+
/// Returns transactions from flashblocks as they are sequenced, providing higher inclusion
63+
/// confidence than standard `newPendingTransactions` which returns mempool transactions.
64+
/// Flashblock transactions have been included by the sequencer and are effectively preconfirmed.
65+
///
66+
/// Accepts an optional boolean parameter:
67+
/// - `true`: Returns full transaction objects
68+
/// - `false` (default): Returns only transaction hashes
69+
NewFlashblockTransactions,
6070
}
6171

6272
impl ExtendedSubscriptionKind {

crates/rpc/tests/flashblocks_rpc.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,3 +985,109 @@ async fn test_eth_subscribe_new_heads() -> eyre::Result<()> {
985985

986986
Ok(())
987987
}
988+
989+
#[tokio::test]
990+
async fn test_eth_subscribe_new_flashblock_transactions_hashes() -> eyre::Result<()> {
991+
let setup = TestSetup::new().await?;
992+
let _provider = setup.harness.provider();
993+
let ws_url = setup.harness.ws_url();
994+
let (mut ws_stream, _) = connect_async(&ws_url).await?;
995+
996+
// Subscribe to newFlashblockTransactions with default (hash only) mode
997+
ws_stream
998+
.send(Message::Text(
999+
json!({
1000+
"jsonrpc": "2.0",
1001+
"id": 1,
1002+
"method": "eth_subscribe",
1003+
"params": ["newFlashblockTransactions"]
1004+
})
1005+
.to_string()
1006+
.into(),
1007+
))
1008+
.await?;
1009+
1010+
let response = ws_stream.next().await.unwrap()?;
1011+
let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?;
1012+
assert_eq!(sub["jsonrpc"], "2.0");
1013+
assert_eq!(sub["id"], 1);
1014+
let subscription_id = sub["result"].as_str().expect("subscription id expected");
1015+
1016+
// Send first flashblock with L1 deposit tx
1017+
setup.send_flashblock(setup.create_first_payload()).await?;
1018+
1019+
let notification = ws_stream.next().await.unwrap()?;
1020+
let notif: serde_json::Value = serde_json::from_str(notification.to_text()?)?;
1021+
assert_eq!(notif["method"], "eth_subscription");
1022+
assert_eq!(notif["params"]["subscription"], subscription_id);
1023+
1024+
// Result should be an array of transaction hashes (strings)
1025+
let txs = notif["params"]["result"].as_array().expect("expected array of tx hashes");
1026+
assert_eq!(txs.len(), 1);
1027+
assert!(txs[0].is_string(), "Expected hash string, got: {:?}", txs[0]);
1028+
1029+
// Send second flashblock with more transactions
1030+
setup.send_flashblock(setup.create_second_payload()).await?;
1031+
1032+
let notification2 = ws_stream.next().await.unwrap()?;
1033+
let notif2: serde_json::Value = serde_json::from_str(notification2.to_text()?)?;
1034+
let txs2 = notif2["params"]["result"].as_array().expect("expected array of tx hashes");
1035+
assert_eq!(txs2.len(), 6);
1036+
assert!(txs2.iter().all(|tx| tx.is_string()));
1037+
1038+
Ok(())
1039+
}
1040+
1041+
#[tokio::test]
1042+
async fn test_eth_subscribe_new_flashblock_transactions_full() -> eyre::Result<()> {
1043+
let setup = TestSetup::new().await?;
1044+
let _provider = setup.harness.provider();
1045+
let ws_url = setup.harness.ws_url();
1046+
let (mut ws_stream, _) = connect_async(&ws_url).await?;
1047+
1048+
// Subscribe to newFlashblockTransactions with full transaction objects (true)
1049+
ws_stream
1050+
.send(Message::Text(
1051+
json!({
1052+
"jsonrpc": "2.0",
1053+
"id": 1,
1054+
"method": "eth_subscribe",
1055+
"params": ["newFlashblockTransactions", true]
1056+
})
1057+
.to_string()
1058+
.into(),
1059+
))
1060+
.await?;
1061+
1062+
let response = ws_stream.next().await.unwrap()?;
1063+
let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?;
1064+
assert_eq!(sub["jsonrpc"], "2.0");
1065+
assert_eq!(sub["id"], 1);
1066+
let subscription_id = sub["result"].as_str().expect("subscription id expected");
1067+
1068+
// Send flashblocks
1069+
setup.send_flashblock(setup.create_first_payload()).await?;
1070+
1071+
let notification = ws_stream.next().await.unwrap()?;
1072+
let notif: serde_json::Value = serde_json::from_str(notification.to_text()?)?;
1073+
assert_eq!(notif["method"], "eth_subscription");
1074+
assert_eq!(notif["params"]["subscription"], subscription_id);
1075+
1076+
// Result should be an array of full transaction objects
1077+
let txs = notif["params"]["result"].as_array().expect("expected array of transactions");
1078+
assert_eq!(txs.len(), 1);
1079+
// Full transaction objects have fields like "hash", "from", "to", etc.
1080+
assert!(txs[0]["hash"].is_string(), "Expected full tx with hash field");
1081+
assert!(txs[0]["blockNumber"].is_string(), "Expected full tx with blockNumber field");
1082+
1083+
// Send second flashblock with more transactions
1084+
setup.send_flashblock(setup.create_second_payload()).await?;
1085+
1086+
let notification2 = ws_stream.next().await.unwrap()?;
1087+
let notif2: serde_json::Value = serde_json::from_str(notification2.to_text()?)?;
1088+
let txs2 = notif2["params"]["result"].as_array().expect("expected array of transactions");
1089+
assert_eq!(txs2.len(), 6);
1090+
assert!(txs2.iter().all(|tx| tx["hash"].is_string() && tx["blockNumber"].is_string()));
1091+
1092+
Ok(())
1093+
}

0 commit comments

Comments
 (0)