Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions crates/flashblocks-rpc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use alloy_consensus::transaction::{Recovered, SignerRecoverable, TransactionMeta
use alloy_consensus::{Header, TxReceipt};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::map::foldhash::HashMap;
use alloy_primitives::map::B256HashMap;
use alloy_primitives::{Address, BlockNumber, Bytes, Sealable, B256, U256};
use alloy_rpc_types::{TransactionTrait, Withdrawal};
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::state::{AccountOverride, StateOverride, StateOverridesBuilder};
use alloy_rpc_types_eth::state::StateOverride;
use alloy_rpc_types_eth::{Filter, Log};
use arc_swap::{ArcSwapOption, Guard};
use eyre::eyre;
Expand Down Expand Up @@ -237,13 +236,13 @@ where
block_number = flashblock.metadata.block_number,
flashblock_index = flashblock.index
);
match self.process_flashblock(prev_pending_blocks, &flashblock) {
match self.process_flashblock(prev_pending_blocks, flashblock) {
Ok(new_pending_blocks) => {
if new_pending_blocks.is_some() {
_ = self.sender.send(new_pending_blocks.clone().unwrap())
}

self.pending_blocks.swap(new_pending_blocks.clone());
self.pending_blocks.swap(new_pending_blocks);
self.metrics
.block_processing_duration
.record(start_time.elapsed());
Expand Down Expand Up @@ -327,15 +326,15 @@ where
fn process_flashblock(
&self,
prev_pending_blocks: Option<Arc<PendingBlocks>>,
flashblock: &Flashblock,
flashblock: Flashblock,
) -> eyre::Result<Option<Arc<PendingBlocks>>> {
match &prev_pending_blocks {
Some(pending_blocks) => {
if self.is_next_flashblock(pending_blocks, flashblock) {
if self.is_next_flashblock(pending_blocks, &flashblock) {
// We have received the next flashblock for the current block
// or the first flashblock for the next block
let mut flashblocks = pending_blocks.get_flashblocks();
flashblocks.push(flashblock.clone());
flashblocks.push(flashblock);
self.build_pending_state(prev_pending_blocks, &flashblocks)
} else if pending_blocks.latest_block_number() != flashblock.metadata.block_number {
// We have received a non-zero flashblock for a new block
Expand All @@ -354,7 +353,7 @@ where
curr_block = %pending_blocks.latest_block_number(),
flashblock_index = %flashblock.index,
);
Ok(prev_pending_blocks.clone())
Ok(prev_pending_blocks)
} else {
// We have received a non-sequential flashblock for the current block
self.metrics.unexpected_block_order.increment(1);
Expand All @@ -370,7 +369,7 @@ where
}
None => {
if flashblock.index == 0 {
self.build_pending_state(None, &vec![flashblock.clone()])
self.build_pending_state(None, &vec![flashblock])
} else {
info!(message = "waiting for first Flashblock");
Ok(None)
Expand All @@ -385,12 +384,12 @@ where
flashblocks: &Vec<Flashblock>,
) -> eyre::Result<Option<Arc<PendingBlocks>>> {
// BTreeMap guarantees ascending order of keys while iterating
let mut flashblocks_per_block = BTreeMap::<BlockNumber, Vec<Flashblock>>::new();
let mut flashblocks_per_block = BTreeMap::<BlockNumber, Vec<&Flashblock>>::new();
for flashblock in flashblocks {
flashblocks_per_block
.entry(flashblock.metadata.block_number)
.or_default()
.push(flashblock.clone());
.push(flashblock);
}

let earliest_block_number = flashblocks_per_block.keys().min().unwrap();
Expand Down Expand Up @@ -463,7 +462,12 @@ where
acc
});

pending_blocks_builder.with_flashblocks(flashblocks.clone());
pending_blocks_builder.with_flashblocks(
flashblocks
.iter()
.map(|&x| x.clone())
.collect::<Vec<Flashblock>>(),
);

let execution_payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
blob_gas_used: 0,
Expand Down Expand Up @@ -618,33 +622,21 @@ where
match evm.transact(recovered_transaction) {
Ok(ResultAndState { state, .. }) => {
for (addr, acc) in &state {
let mut state_cache_builder =
StateOverridesBuilder::new(state_overrides.clone());
let curr_state_diff =
B256HashMap::<B256>::from_iter(acc.storage.iter().map(
|(&key, slot)| (key.into(), slot.present_value.into()),
));

let mut state_diff = state_overrides
.get(addr)
.cloned()
.unwrap_or_default()
let existing_override =
state_overrides.entry(*addr).or_insert(Default::default());
existing_override.balance = Some(acc.info.balance);
existing_override.nonce = Some(acc.info.nonce);
existing_override.code =
acc.info.code.clone().map(|code| code.bytes());

let existing = existing_override
.state_diff
.unwrap_or_default();

state_diff.extend(curr_state_diff);

let acc_override = AccountOverride {
balance: Some(acc.info.balance),
nonce: Some(acc.info.nonce),
code: acc.info.code.clone().map(|code| code.bytes()),
state: None,
state_diff: Some(state_diff),
move_precompile_to: None,
};
state_cache_builder =
state_cache_builder.append(*addr, acc_override);
state_overrides = state_cache_builder.build();
.get_or_insert(Default::default());
let changed_slots = acc.storage.iter().map(|(&key, slot)| {
(B256::from(key), B256::from(slot.present_value))
});

existing.extend(changed_slots);
}
pending_blocks_builder
.with_transaction_state(transaction.tx_hash(), state.clone());
Expand Down