Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/builder/base-builder-cli/src/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ pub struct FlashblocksArgs {
env = "FLASHBLOCKS_COMPUTE_STATE_ROOT_ON_FINALIZE"
)]
pub flashblocks_compute_state_root_on_finalize: bool,

/// Whether to use streaming state root calculation.
/// When enabled, state updates are streamed to a background task during transaction
/// execution, reducing finalization latency by pre-fetching trie nodes and building
/// a sparse trie incrementally. This is most effective when combined with
/// --flashblocks.compute-state-root-on-finalize.
#[arg(
long = "flashblocks.streaming-state-root",
default_value = "false",
env = "FLASHBLOCKS_STREAMING_STATE_ROOT"
)]
pub flashblocks_streaming_state_root: bool,
}

impl Default for FlashblocksArgs {
Expand All @@ -65,6 +77,7 @@ impl Default for FlashblocksArgs {
flashblocks_leeway_time: 75,
flashblocks_disable_state_root: false,
flashblocks_compute_state_root_on_finalize: false,
flashblocks_streaming_state_root: false,
}
}
}
10 changes: 10 additions & 0 deletions crates/builder/core/src/flashblocks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub struct FlashblocksConfig {
/// When enabled, flashblocks are built without state root, but the final payload
/// returned by `get_payload` will have the state root computed.
pub compute_state_root_on_finalize: bool,

/// Whether to use streaming state root calculation.
/// When enabled, state updates are streamed to a background task during transaction
/// execution, reducing finalization latency by pre-fetching trie nodes and building
/// a sparse trie incrementally.
pub streaming_state_root: bool,
}

impl Default for FlashblocksConfig {
Expand All @@ -53,6 +59,7 @@ impl Default for FlashblocksConfig {
fixed: false,
disable_state_root: false,
compute_state_root_on_finalize: false,
streaming_state_root: false,
}
}
}
Expand All @@ -77,13 +84,16 @@ impl TryFrom<OpRbuilderArgs> for FlashblocksConfig {
let compute_state_root_on_finalize =
args.flashblocks.flashblocks_compute_state_root_on_finalize;

let streaming_state_root = args.flashblocks.flashblocks_streaming_state_root;

Ok(Self {
ws_addr,
interval,
leeway_time,
fixed,
disable_state_root,
compute_state_root_on_finalize,
streaming_state_root,
})
}
}
Expand Down
6 changes: 6 additions & 0 deletions crates/builder/core/src/flashblocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub(crate) mod best_txs;
pub(crate) mod generator;
pub(crate) mod state_root_task;
pub(crate) mod wspub;

mod handler;
Expand All @@ -18,3 +19,8 @@ pub use payload::FlashblocksExecutionInfo;

mod service;
pub use service::FlashblocksServiceBuilder;

pub use state_root_task::{
StateRootError, StateRootMessage, StateRootResult, StateRootTask, StateRootTaskBuilder,
StateRootTaskConfig, StateRootTaskHandle,
};
248 changes: 247 additions & 1 deletion crates/builder/core/src/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, metadata::Level, span, warn};

use super::wspub::WebSocketPublisher;
use super::{state_root_task::StateRootTaskHandle, wspub::WebSocketPublisher};
use crate::{
BuilderConfig, ExecutionInfo,
flashblocks::{
Expand Down Expand Up @@ -713,6 +713,58 @@ where
Ok(())
}

/// Finalize the payload using streaming state root calculation.
///
/// This method uses a pre-started StateRootTaskHandle to compute the state root
/// with reduced latency by leveraging pre-fetched trie nodes.
#[allow(dead_code)]
async fn finalize_payload_with_streaming<DB, P>(
&self,
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx,
info: &mut ExecutionInfo,
finalized_cell: &BlockCell<OpBuiltPayload>,
state_root_handle: &StateRootTaskHandle,
) -> Result<(), PayloadBuilderError>
where
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
{
let start_time = Instant::now();

// Request the state root from the streaming task
let state_root_result = state_root_handle
.compute_root()
.await
.map_err(|e| PayloadBuilderError::Other(Box::new(e)))?;

// Build the final block with the pre-computed state root
let (final_payload, _) = build_block_with_state_root(
state,
ctx,
info,
state_root_result.state_root,
state_root_result.trie_updates,
state_root_result.hashed_state,
)?;

let elapsed = start_time.elapsed();
info!(
target: "payload_builder",
block_number = ctx.block_number(),
block_hash = ?final_payload.block().hash(),
elapsed_ms = elapsed.as_millis(),
"Finalized payload with streaming state root"
);

finalized_cell.set(final_payload);

// Shutdown the state root task
let _ = state_root_handle.shutdown().await;

Ok(())
}

/// Calculate number of flashblocks.
/// If dynamic is enabled this function will take time drift into the account.
pub(super) fn calculate_flashblocks(&self, timestamp: u64) -> (u64, Duration) {
Expand Down Expand Up @@ -1022,3 +1074,197 @@ where
fb_payload,
))
}

/// Build a block with a pre-computed state root from the streaming state root task.
///
/// This is similar to `build_block` but skips the state root calculation since it's
/// already been computed by the streaming task.
#[allow(dead_code)]
pub(super) fn build_block_with_state_root<DB, P>(
state: &mut State<DB>,
ctx: &OpPayloadBuilderCtx,
info: &mut ExecutionInfo,
state_root: B256,
trie_output: TrieUpdates,
hashed_state: HashedPostState,
) -> Result<(OpBuiltPayload, FlashblocksPayloadV1), PayloadBuilderError>
where
DB: Database<Error = ProviderError> + AsRef<P> + revm::Database,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
{
// We use it to preserve state, so we run merge_transitions on transition state at most once
let untouched_transition_state = state.transition_state.clone();
let state_merge_start_time = Instant::now();
state.merge_transitions(BundleRetention::Reverts);
let state_transition_merge_time = state_merge_start_time.elapsed();
ctx.metrics.state_transition_merge_duration.record(state_transition_merge_time);
ctx.metrics.state_transition_merge_gauge.set(state_transition_merge_time);

let block_number = ctx.block_number();
assert_eq!(block_number, ctx.parent().number + 1);

let execution_outcome = ExecutionOutcome::new(
state.bundle_state.clone(),
vec![info.receipts.clone()],
block_number,
vec![],
);

let receipts_root = execution_outcome
.generic_receipts_root_slow(block_number, |receipts| {
calculate_receipt_root_no_memo_optimism(
receipts,
&ctx.chain_spec,
ctx.attributes().timestamp(),
)
})
.expect("Number is in range");
let logs_bloom = execution_outcome.block_logs_bloom(block_number).expect("Number is in range");

// State root is already computed by the streaming task
debug!(
target: "payload_builder",
?state_root,
"Using pre-computed state root from streaming task"
);

let mut requests_hash = None;
let withdrawals_root =
if ctx.chain_spec.is_isthmus_active_at_timestamp(ctx.attributes().timestamp()) {
requests_hash = Some(EMPTY_REQUESTS_HASH);
Some(
isthmus::withdrawals_root(execution_outcome.state(), state.database.as_ref())
.map_err(PayloadBuilderError::other)?,
)
} else if ctx.chain_spec.is_canyon_active_at_timestamp(ctx.attributes().timestamp()) {
Some(EMPTY_WITHDRAWALS)
} else {
None
};

// create the block header
let transactions_root = proofs::calculate_transaction_root(&info.executed_transactions);

let (excess_blob_gas, blob_gas_used) = ctx.blob_fields(info);
let extra_data = ctx.extra_data()?;

let header = Header {
parent_hash: ctx.parent().hash(),
ommers_hash: EMPTY_OMMER_ROOT_HASH,
beneficiary: ctx.evm_env.block_env.beneficiary,
state_root,
transactions_root,
receipts_root,
withdrawals_root,
logs_bloom,
timestamp: ctx.attributes().payload_attributes.timestamp,
mix_hash: ctx.attributes().payload_attributes.prev_randao,
nonce: BEACON_NONCE.into(),
base_fee_per_gas: Some(ctx.base_fee()),
number: ctx.parent().number + 1,
gas_limit: ctx.block_gas_limit(),
difficulty: U256::ZERO,
gas_used: info.cumulative_gas_used,
extra_data,
parent_beacon_block_root: ctx.attributes().payload_attributes.parent_beacon_block_root,
blob_gas_used,
excess_blob_gas,
requests_hash,
};

// seal the block
let block = alloy_consensus::Block::<OpTransactionSigned>::new(
header,
BlockBody {
transactions: info.executed_transactions.clone(),
ommers: vec![],
withdrawals: ctx.withdrawals().cloned(),
},
);

let recovered_block =
RecoveredBlock::new_unhashed(block.clone(), info.executed_senders.clone());

let executed = BuiltPayloadExecutedBlock {
recovered_block: Arc::new(recovered_block),
execution_output: Arc::new(BlockExecutionOutput {
result: BlockExecutionResult {
receipts: info.receipts.clone(),
requests: vec![].into(),
gas_used: info.cumulative_gas_used,
blob_gas_used: 0,
},
state: state.take_bundle(),
}),
hashed_state: Either::Left(Arc::new(hashed_state)),
trie_updates: Either::Left(Arc::new(trie_output)),
};
debug!(target: "payload_builder", message = "Executed block created with streaming state root");

let sealed_block = Arc::new(block.seal_slow());
debug!(target: "payload_builder", ?sealed_block, "sealed built block");

let block_hash = sealed_block.hash();

// pick the new transactions from the info field and update the last flashblock index
let new_transactions = info.executed_transactions[info.extra.last_flashblock_index..].to_vec();

let new_transactions_encoded =
new_transactions.into_iter().map(|tx| tx.encoded_2718().into()).collect::<Vec<_>>();

let min_tx_index = info.extra.last_flashblock_index as u64;
let max_tx_index = min_tx_index + new_transactions_encoded.len() as u64;

info.extra.last_flashblock_index = info.executed_transactions.len();

// finalize and build the FAL
let fal_builder = std::mem::take(&mut info.extra.access_list_builder);
let access_list = fal_builder.build(min_tx_index, max_tx_index);

let metadata: FlashblocksMetadata =
FlashblocksMetadata { block_number: ctx.parent().number + 1, access_list };

let (_, blob_gas_used) = ctx.blob_fields(info);

// Prepare the flashblocks message
let fb_payload = FlashblocksPayloadV1 {
payload_id: ctx.payload_id(),
index: 0,
base: Some(ExecutionPayloadBaseV1 {
parent_beacon_block_root: ctx
.attributes()
.payload_attributes
.parent_beacon_block_root
.unwrap(),
parent_hash: ctx.parent().hash(),
fee_recipient: ctx.attributes().suggested_fee_recipient(),
prev_randao: ctx.attributes().payload_attributes.prev_randao,
block_number: ctx.parent().number + 1,
gas_limit: ctx.block_gas_limit(),
timestamp: ctx.attributes().payload_attributes.timestamp,
extra_data: ctx.extra_data()?,
base_fee_per_gas: ctx.base_fee().try_into().unwrap(),
}),
diff: ExecutionPayloadFlashblockDeltaV1 {
state_root,
receipts_root,
logs_bloom,
gas_used: info.cumulative_gas_used,
block_hash,
transactions: new_transactions_encoded,
withdrawals: ctx.withdrawals().cloned().unwrap_or_default().to_vec(),
withdrawals_root: withdrawals_root.unwrap_or_default(),
blob_gas_used,
},
metadata: serde_json::to_value(&metadata).unwrap_or_default(),
};

// We clean bundle and place initial state transaction back
state.take_bundle();
state.transition_state = untouched_transition_state;

Ok((
OpBuiltPayload::new(ctx.payload_id(), sealed_block, info.total_fees, Some(executed)),
fb_payload,
))
}
Loading