Skip to content

Commit dda3266

Browse files
committed
store the fvm events in blockstore and fetch from blockstore if not available in cache [skip ci]
1 parent 5c043d9 commit dda3266

File tree

5 files changed

+73
-11
lines changed

5 files changed

+73
-11
lines changed

src/chain/store/chain_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ where
203203
Ok(())
204204
}
205205

206-
pub fn get_tipset_key(&self, key: &Cid) -> Result<Option<TipsetKey>, Error> {
206+
pub fn get_tipset_key_by_events_root(&self, key: &Cid) -> Result<Option<TipsetKey>, Error> {
207207
Ok(self.indices.read_obj(key)?)
208208
}
209209

src/rpc/methods/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ impl RpcMethod<1> for ChainGetEvents {
239239
let tsk = ctx
240240
.state_manager
241241
.chain_store()
242-
.get_tipset_key(&root_cid)?
243-
.with_context(|| format!("can't find events with cid {root_cid}"))?;
242+
.get_tipset_key_by_events_root(&root_cid)?
243+
.with_context(|| format!("can't find tipset for events root {root_cid}"))?;
244244

245245
let ts = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
246246

src/rpc/methods/eth/filter/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use anyhow::{Context, Error, anyhow, bail, ensure};
4545
use cid::Cid;
4646
use fvm_ipld_blockstore::Blockstore;
4747
use fvm_ipld_encoding::IPLD_RAW;
48-
use itertools::Itertools;
4948
use serde::*;
5049
use std::ops::RangeInclusive;
5150
use std::sync::Arc;
@@ -384,11 +383,13 @@ impl EthEventHandler {
384383
.filter(|(cid, _)| cid.as_ref() == Some(events_root))
385384
.map(|(_, v)| v);
386385

387-
let chain_events = filtered_events
386+
// Do NOT deduplicate events - the AMT can legitimately contain duplicate events
387+
// if a contract emits the same event multiple times. We must preserve the exact
388+
// order and count of events as stored in the AMT.
389+
let chain_events: Vec<Event> = filtered_events
388390
.into_iter()
389391
.flat_map(|events| events.into_iter())
390392
.map(Into::into)
391-
.unique()
392393
.collect();
393394

394395
Ok(chain_events)

src/shim/executor.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::shim::{
77
};
88
use crate::utils::get_size::{GetSize, vec_heap_size_with_fn_helper};
99
use cid::Cid;
10-
use fil_actors_shared::fvm_ipld_amt::Amtv0;
10+
use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
1111
use fvm_ipld_blockstore::Blockstore;
1212
use fvm_ipld_encoding::RawBytes;
1313
use fvm_shared2::receipt::Receipt as Receipt_v2;
@@ -339,6 +339,7 @@ impl ActorEvent {
339339

340340
/// Event with extra information stamped by the FVM.
341341
#[derive(Clone, Debug, Serialize)]
342+
#[serde(untagged)]
342343
pub enum StampedEvent {
343344
V3(StampedEvent_v3),
344345
V4(StampedEvent_v4),
@@ -385,6 +386,32 @@ impl StampedEvent {
385386
Self::V4(v4) => v4.event.clone().into(),
386387
}
387388
}
389+
390+
/// Loads events directly from the events AMT root CID.
391+
/// Returns events in the exact order they are stored in the AMT.
392+
pub fn get_events<DB: Blockstore>(
393+
db: &DB,
394+
events_root: &Cid,
395+
) -> anyhow::Result<Vec<StampedEvent>> {
396+
let mut events = Vec::new();
397+
398+
// Try StampedEvent_v4 first (StampedEvent_v4 and StampedEvent_v3 are identical, use v4 here)
399+
if let Ok(amt) = Amt::<StampedEvent_v4, _>::load(events_root, db) {
400+
amt.for_each(|_, event| {
401+
events.push(StampedEvent::V4(event.clone()));
402+
Ok(())
403+
})?;
404+
} else {
405+
// Fallback to StampedEvent_v3
406+
let amt = Amt::<StampedEvent_v3, _>::load(events_root, db)?;
407+
amt.for_each(|_, event| {
408+
events.push(StampedEvent::V3(event.clone()));
409+
Ok(())
410+
})?;
411+
}
412+
413+
Ok(events)
414+
}
388415
}
389416

390417
#[cfg(test)]

src/state_manager/mod.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ use crate::utils::get_size::{
6161
GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper,
6262
};
6363
use ahash::{HashMap, HashMapExt};
64-
use anyhow::{Context as _, bail};
64+
use anyhow::{Context as _, bail, ensure};
6565
use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
6666
use chain_rand::ChainRand;
6767
use cid::Cid;
6868
pub use circulating_supply::GenesisInfo;
6969
use fil_actor_verifreg_state::v12::DataCap;
7070
use fil_actor_verifreg_state::v13::ClaimID;
71-
use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
71+
use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0};
7272
use fil_actors_shared::fvm_ipld_bitfield::BitField;
7373
use fil_actors_shared::v12::runtime::DomainSeparationTag;
7474
use fil_actors_shared::v13::runtime::Policy;
@@ -91,6 +91,7 @@ use tracing::{error, info, instrument, trace, warn};
9191
pub use utils::is_valid_for_sending;
9292

9393
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
94+
const EVENTS_AMT_BITWIDTH: u32 = 5;
9495

9596
/// Intermediary for retrieving state objects and updating actor states.
9697
type CidPair = (Cid, Cid);
@@ -560,11 +561,24 @@ where
560561
let ts = tipset.clone();
561562
let this = Arc::clone(self);
562563
let cids = tipset.cids();
564+
let events_root = events_root.cloned();
563565
self.receipt_event_cache_handler
564566
.get_events_or_else(
565567
key,
566568
Box::new(move || {
567569
Box::pin(async move {
570+
// If the events are not in the cache, try to load them from the blockstore
571+
if let Some(events_root) = events_root
572+
&& let Ok(stamped_events) =
573+
StampedEvent::get_events(this.blockstore(), &events_root)
574+
{
575+
return Ok(StateEvents {
576+
events: vec![stamped_events],
577+
roots: vec![Some(events_root)],
578+
});
579+
}
580+
581+
// If the events are neither in the cache nor in the blockstore, compute them.
568582
let state_out = this
569583
.compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced)
570584
.await?;
@@ -1986,8 +2000,28 @@ where
19862000
let (receipts, events, events_roots) =
19872001
vm.apply_block_messages(&block_messages, epoch, callback)?;
19882002

1989-
// step 5: construct receipt root from receipts and flush the state-tree
1990-
let receipt_root = Amt::new_from_iter(chain_index.db(), receipts)?;
2003+
// step 5: construct receipt root from receipts
2004+
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;
2005+
2006+
// step 6: store events AMTs in the blockstore
2007+
for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
2008+
if let Some(event_root) = events_root {
2009+
// Store the events AMT - the root CID should match the one computed by FVM
2010+
let derived_event_root = Amt::new_from_iter_with_bit_width(
2011+
chain_index.db(),
2012+
EVENTS_AMT_BITWIDTH,
2013+
msg_events.iter(),
2014+
)
2015+
.map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;
2016+
2017+
// Verify the stored root matches the FVM-computed root
2018+
ensure!(
2019+
derived_event_root.eq(event_root),
2020+
"Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
2021+
);
2022+
}
2023+
}
2024+
19912025
let state_root = vm.flush()?;
19922026

19932027
Ok(StateOutput {

0 commit comments

Comments
 (0)