From b5333db84cac67b0b34436d504ab60382078aca3 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 3 Jun 2026 13:21:39 +0200 Subject: [PATCH 1/6] eth2api,ssz: add bitfield accessors, synthetic graffiti const, and is_synthetic - Add bit_at, set_bit_at, bit_indices, contains, and or combinators to BitList and BitVector, along with a BitfieldError type for length mismatches; export BitfieldError from the ssz crate - Replace the synthetic_graffiti() runtime function with a compile-time SYNTHETIC_GRAFFITI constant using a const block - Add SignedProposalBlock::graffiti() accessor and VersionedSignedProposal::is_synthetic() using the new constant --- crates/eth2api/src/versioned.rs | 53 ++++++++++ crates/ssz/src/lib.rs | 2 +- crates/ssz/src/types.rs | 167 ++++++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 1 deletion(-) diff --git a/crates/eth2api/src/versioned.rs b/crates/eth2api/src/versioned.rs index 4d2f2547..8a4e110a 100644 --- a/crates/eth2api/src/versioned.rs +++ b/crates/eth2api/src/versioned.rs @@ -9,6 +9,21 @@ use crate::{ v1, }; +/// Graffiti string used to mark synthetic blocks that must never be submitted. +pub const SYNTHETIC_BLOCK_GRAFFITI: &str = "SYNTHETIC BLOCK: DO NOT SUBMIT"; + +/// 32-byte graffiti used to mark synthetic blocks, left-aligned with zero padding. +pub const SYNTHETIC_GRAFFITI: phase0::Root = { + let mut graffiti = [0u8; 32]; + let src = SYNTHETIC_BLOCK_GRAFFITI.as_bytes(); + let mut i = 0; + while i < src.len() { + graffiti[i] = src[i]; + i += 1; + } + graffiti +}; + /// Signed proposal wrapper across all supported forks. #[derive(Debug, Clone, PartialEq, Eq)] pub struct VersionedSignedProposal { @@ -87,6 +102,24 @@ impl SignedProposalBlock { } } + /// Returns the graffiti embedded in this proposal's block body. + pub fn graffiti(&self) -> phase0::Root { + match self { + Self::Phase0(block) => block.message.body.graffiti, + Self::Altair(block) => block.message.body.graffiti, + Self::Bellatrix(block) => block.message.body.graffiti, + Self::BellatrixBlinded(block) => block.message.body.graffiti, + Self::Capella(block) => block.message.body.graffiti, + Self::CapellaBlinded(block) => block.message.body.graffiti, + Self::Deneb(block) => block.signed_block.message.body.graffiti, + Self::DenebBlinded(block) => block.message.body.graffiti, + Self::Electra(block) => block.signed_block.message.body.graffiti, + Self::ElectraBlinded(block) => block.message.body.graffiti, + Self::Fulu(block) => block.signed_block.message.body.graffiti, + Self::FuluBlinded(block) => block.message.body.graffiti, + } + } + /// Converts blinded payload variants into blinded-wrapper payloads. pub fn into_blinded(self) -> Option { match self { @@ -106,6 +139,18 @@ impl SignedProposalBlock { } } +impl VersionedSignedProposal { + /// Returns `true` if this is a synthetic proposal, i.e. its block body + /// graffiti matches [`SYNTHETIC_GRAFFITI`]. + /// + /// Unifies Go's separate blinded/full checks: the payload enum already + /// carries both blinded and full variants, so a single graffiti comparison + /// covers every case. + pub fn is_synthetic(&self) -> bool { + self.block.graffiti() == SYNTHETIC_GRAFFITI + } +} + /// Signed blinded proposal wrapper across all supported forks. #[derive(Debug, Clone, PartialEq, Eq)] pub struct VersionedSignedBlindedProposal { @@ -401,6 +446,14 @@ mod tests { use super::*; use crate::test_fixtures; + #[test] + fn synthetic_graffiti_layout() { + let marker = SYNTHETIC_BLOCK_GRAFFITI.as_bytes(); + assert_eq!(&SYNTHETIC_GRAFFITI[..marker.len()], marker); + // Remaining bytes are zero-padded. + assert!(SYNTHETIC_GRAFFITI[marker.len()..].iter().all(|&b| b == 0)); + } + #[test] fn versioned_signed_aggregate_and_proof_message_root_delegates_to_payload() { let signed = electra::SignedAggregateAndProof { diff --git a/crates/ssz/src/lib.rs b/crates/ssz/src/lib.rs index 18294464..35a3b0fb 100644 --- a/crates/ssz/src/lib.rs +++ b/crates/ssz/src/lib.rs @@ -17,7 +17,7 @@ pub use helpers::{ from_0x_hex_str, left_pad, put_byte_list, put_bytes_n, put_hex_bytes_n, to_0x_hex, }; /// Generic SSZ list, vector, and bitfield wrappers. -pub use types::{BitList, BitVector, SszList, SszVector}; +pub use types::{BitList, BitVector, BitfieldError, SszList, SszVector}; /// Error type for SSZ binary encode/decode operations. #[derive(Debug, thiserror::Error)] diff --git a/crates/ssz/src/types.rs b/crates/ssz/src/types.rs index 217d0869..006d6dae 100644 --- a/crates/ssz/src/types.rs +++ b/crates/ssz/src/types.rs @@ -253,6 +253,15 @@ impl TreeHash for SszVector { const BIT_MASK: [u8; 8] = [0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80]; +/// Error returned by bitfield combinators that require operands of equal +/// length. +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum BitfieldError { + /// The two bitlists have different bit lengths and cannot be combined. + #[error("bitlists are different lengths")] + DifferentLength, +} + /// SSZ variable-length bitfield with maximum capacity. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BitList { @@ -344,6 +353,66 @@ impl BitList { len: capacity, } } + + /// Returns the bit at index `i`, or `false` if `i` is out of range. + pub fn bit_at(&self, i: usize) -> bool { + if i >= self.len { + return false; + } + self.bytes[i / 8] & BIT_MASK[i % 8] != 0 + } + + /// Sets the bit at index `i` to `value`; out-of-range indices are ignored. + pub fn set_bit_at(&mut self, i: usize, value: bool) { + if i >= self.len { + return; + } + if value { + self.bytes[i / 8] |= BIT_MASK[i % 8]; + } else { + self.bytes[i / 8] &= !BIT_MASK[i % 8]; + } + } + + /// Returns the indices of all set bits in ascending order. + pub fn bit_indices(&self) -> Vec { + (0..self.len).filter(|&i| self.bit_at(i)).collect() + } + + /// Returns `true` if every bit set in `other` is also set in `self`. + /// + /// Errors with [`BitfieldError::DifferentLength`] if the two bitlists do + /// not have the same bit length. + pub fn contains(&self, other: &Self) -> Result { + if self.len != other.len { + return Err(BitfieldError::DifferentLength); + } + Ok(other + .bytes + .iter() + .zip(&self.bytes) + .all(|(o, s)| o & s == *o)) + } + + /// Returns the bitwise OR (union) of `self` and `other`. + /// + /// Errors with [`BitfieldError::DifferentLength`] if the two bitlists do + /// not have the same bit length. + pub fn or(&self, other: &Self) -> Result { + if self.len != other.len { + return Err(BitfieldError::DifferentLength); + } + let bytes = self + .bytes + .iter() + .zip(&other.bytes) + .map(|(a, b)| a | b) + .collect(); + Ok(Self { + bytes, + len: self.len, + }) + } } impl Serialize for BitList { @@ -433,6 +502,31 @@ impl BitVector { } v } + + /// Returns the bit at index `i`, or `false` if `i` is out of range. + pub fn bit_at(&self, i: usize) -> bool { + if i >= SIZE { + return false; + } + self.bytes[i / 8] & BIT_MASK[i % 8] != 0 + } + + /// Sets the bit at index `i` to `value`; out-of-range indices are ignored. + pub fn set_bit_at(&mut self, i: usize, value: bool) { + if i >= SIZE { + return; + } + if value { + self.bytes[i / 8] |= BIT_MASK[i % 8]; + } else { + self.bytes[i / 8] &= !BIT_MASK[i % 8]; + } + } + + /// Returns the indices of all set bits in ascending order. + pub fn bit_indices(&self) -> Vec { + (0..SIZE).filter(|&i| self.bit_at(i)).collect() + } } impl Serialize for BitVector { @@ -560,4 +654,77 @@ mod tests { let vec: SszVector = vec![1, 2, 3].into(); assert_eq!(vec.as_ref(), &[1, 2, 3]); } + + #[test] + fn bitlist_bit_at_and_indices() { + let bl = BitList::<2048>::with_bits(3, &[0, 2]); + assert!(bl.bit_at(0)); + assert!(!bl.bit_at(1)); + assert!(bl.bit_at(2)); + // Out-of-range index reads as unset. + assert!(!bl.bit_at(3)); + assert!(!bl.bit_at(9001)); + assert_eq!(bl.bit_indices(), vec![0, 2]); + } + + #[test] + fn bitlist_bit_at_matches_ssz_round_trip() { + // SSZ byte 0x0D = sentinel at bit 3 ⇒ 3 data bits with bits 0 and 2 set, + // matching the bytes returned by `aggregation_bits()`. + let bl = BitList::<2048>::from_ssz_bytes(vec![0x0D]); + assert_eq!(bl.len(), 3); + assert_eq!(bl.bit_indices(), vec![0, 2]); + } + + #[test] + fn bitlist_set_bit_at() { + let mut bl = BitList::<2048>::with_bits(8, &[0]); + bl.set_bit_at(3, true); + assert_eq!(bl.bit_indices(), vec![0, 3]); + bl.set_bit_at(0, false); + assert_eq!(bl.bit_indices(), vec![3]); + // Out-of-range set is a no-op. + bl.set_bit_at(8, true); + assert_eq!(bl.bit_indices(), vec![3]); + } + + #[test] + fn bitlist_contains() { + let superset = BitList::<2048>::with_bits(4, &[0, 1, 2]); + let subset = BitList::<2048>::with_bits(4, &[0, 2]); + assert_eq!(superset.contains(&subset), Ok(true)); + assert_eq!(subset.contains(&superset), Ok(false)); + + let other_len = BitList::<2048>::with_bits(8, &[0]); + assert_eq!( + superset.contains(&other_len), + Err(BitfieldError::DifferentLength) + ); + } + + #[test] + fn bitlist_or() { + let a = BitList::<2048>::with_bits(4, &[0]); + let b = BitList::<2048>::with_bits(4, &[1, 3]); + assert_eq!(a.or(&b).unwrap().bit_indices(), vec![0, 1, 3]); + + let other_len = BitList::<2048>::with_bits(8, &[0]); + assert_eq!(a.or(&other_len), Err(BitfieldError::DifferentLength)); + } + + #[test] + fn bitvector_bit_ops() { + let mut bv = BitVector::<64>::with_bits(&[0, 2]); + assert!(bv.bit_at(0)); + assert!(!bv.bit_at(1)); + assert_eq!(bv.bit_indices(), vec![0, 2]); + + bv.set_bit_at(1, true); + assert_eq!(bv.bit_indices(), vec![0, 1, 2]); + + // Out-of-range access is a no-op / reads as unset. + bv.set_bit_at(64, true); + assert!(!bv.bit_at(64)); + assert_eq!(bv.bit_indices(), vec![0, 1, 2]); + } } From 5de9cbbe42b00e9bcf23a19c2a5510026b4ecf71 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 3 Jun 2026 13:26:43 +0200 Subject: [PATCH 2/6] lint --- crates/eth2api/src/versioned.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/eth2api/src/versioned.rs b/crates/eth2api/src/versioned.rs index 8a4e110a..e3d0562e 100644 --- a/crates/eth2api/src/versioned.rs +++ b/crates/eth2api/src/versioned.rs @@ -12,7 +12,8 @@ use crate::{ /// Graffiti string used to mark synthetic blocks that must never be submitted. pub const SYNTHETIC_BLOCK_GRAFFITI: &str = "SYNTHETIC BLOCK: DO NOT SUBMIT"; -/// 32-byte graffiti used to mark synthetic blocks, left-aligned with zero padding. +/// 32-byte graffiti used to mark synthetic blocks, left-aligned with zero +/// padding. pub const SYNTHETIC_GRAFFITI: phase0::Root = { let mut graffiti = [0u8; 32]; let src = SYNTHETIC_BLOCK_GRAFFITI.as_bytes(); From 4e9a97a501c39944116c4c8c58eddf54499b84f6 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Fri, 5 Jun 2026 09:53:39 +0200 Subject: [PATCH 3/6] feat(tracker): port inclusion checker core from charon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the on-chain inclusion tracking infrastructure needed to port charon/core/tracker/inclusion.go: - pluto-core/tracker/inclusion: port inclusionCore — Submitted, Trim, CheckBlock, CheckBlockAndAtts, check_attestation_inclusion (phase0→deneb via bitlist contains; electra/fulu via committee-offset bit_at), check_aggregation_inclusion, report_missed, report_att_inclusion; error strings match Go exactly; synthetic-proposal short-circuit and defensive DeprecatedDutyBuilderProposer branch included --- crates/core/src/tracker/inclusion.rs | 826 +++++++++++++++++++++++++++ crates/core/src/tracker/mod.rs | 3 + 2 files changed, 829 insertions(+) create mode 100644 crates/core/src/tracker/inclusion.rs diff --git a/crates/core/src/tracker/inclusion.rs b/crates/core/src/tracker/inclusion.rs new file mode 100644 index 00000000..c6c2c540 --- /dev/null +++ b/crates/core/src/tracker/inclusion.rs @@ -0,0 +1,826 @@ +//! On-chain inclusion checking for broadcast duties. +//! +//! [`InclusionCore`] caches duties as they are submitted to the beacon node +//! and, when fed observed blocks and attestations, determines whether each duty +//! landed on-chain. For every resolved duty it invokes the tracker callback +//! with either success or a [`InclusionError::NotIncludedOnChain`] error, and +//! reports inclusion delay / missed-duty metrics. +//! +//! The core is deliberately free of any beacon-node I/O so it can be driven +//! directly from tests. The networked driver that polls the beacon node and +//! builds the [`Block`] inputs is layered on top separately. + +// The networked `InclusionChecker` that wires the default reporters and drives +// this core is added in a follow-up; until then some core items (default +// reporters, committee plumbing) have no in-crate caller. +#![allow(dead_code)] + +use std::{any::Any, collections::HashMap, sync::Arc, time::Duration}; + +use pluto_eth2api::versioned; +use pluto_ssz::BitList; +use tree_hash::TreeHash; + +use crate::{ + signeddata::{ + Attestation, SignedAggregateAndProof, SignedDataError, VersionedAttestation, + VersionedSignedAggregateAndProof, VersionedSignedProposal, + }, + tracker::{StepError, analysis::incl_supported, metrics::TRACKER_METRICS}, + types::{Duty, DutyType, PubKey, SignedData}, +}; + +/// Number of slots after which an unincluded duty is assumed missed and its +/// cached submission (and associated committee state) is dropped. +const INCL_MISSED_LAG: u64 = 32; + +/// SSZ capacity bound used only to decode aggregation bitlists for bit-level +/// comparisons. The bit operations (`contains`/`bit_at`) work on the decoded +/// length, so the concrete bound is irrelevant as long as it is an upper bound. +type AggBits = BitList<131_072>; + +/// Tracker callback invoked when a duty's inclusion is resolved. +type TrackerInclFn = Box) + Send>; +/// Callback invoked for duties broadcast but never included on-chain. +type MissedFn = Box; +/// Callback invoked for attestations/aggregates observed on-chain. +type AttIncludedFn = Box; + +/// Errors produced while recording or checking duty inclusion. +#[derive(Debug, thiserror::Error)] +pub enum InclusionError { + /// Submitted attester duty data was not an attestation. + #[error("invalid attestation")] + InvalidAttestation, + /// Submitted aggregator duty data was not an aggregate-and-proof. + #[error("invalid aggregate and proof")] + InvalidAggregateAndProof, + /// Submitted proposer duty data was not a versioned signed proposal. + #[error("invalid block")] + InvalidBlock, + /// `DutyBuilderProposer` is deprecated and not tracked for inclusion. + #[error("DutyBuilderProposer is deprecated and no longer supported")] + DeprecatedDutyBuilderProposer, + /// A broadcast duty was never observed on-chain. + #[error("duty not included on-chain")] + NotIncludedOnChain, + /// The cached submission was not a versioned attestation. + #[error("not an attestation block data")] + NotAnAttestation, + /// The cached submission was not a versioned aggregate-and-proof. + #[error("parse VersionedSignedAggregateAndProof")] + ParseVersionedAggregate, + /// An Electra attestation lacked a validator index. + #[error("no validator index in electra attestation")] + NoValidatorIndex, + /// No matching attester duty was found for an Electra attestation. + #[error("no attester duty data found in electra attestation")] + NoAttesterDuty, + /// An Electra attestation did not reference exactly one committee. + #[error("electra attestation must reference exactly one committee")] + InvalidCommitteeBits, + /// A versioned attestation carried no payload. + #[error("missing attestation payload")] + MissingAttestation, + /// A bitfield comparison failed. + #[error(transparent)] + Bitfield(#[from] pluto_ssz::BitfieldError), + /// A signed-data accessor failed. + #[error(transparent)] + SignedData(#[from] SignedDataError), +} + +/// Uniquely identifies a cached submission. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct SubKey { + duty: Duty, + pubkey: PubKey, +} + +/// A duty submitted to the beacon node, awaiting on-chain confirmation. +pub struct Submission { + /// The duty that produced this submission. + pub duty: Duty, + /// The validator the duty belongs to. + pub pubkey: PubKey, + /// The signed data broadcast to the beacon node. + pub data: Box, + /// Hash-tree-root of the attestation data (zero for proposals). + pub att_data_root: [u8; 32], + /// Delay between slot start and broadcast. + pub delay: Duration, +} + +/// A minimal attester duty, carrying only the fields used by inclusion checks. +#[derive(Clone)] +pub struct AttesterDuty { + /// Validator index the duty belongs to. + pub validator_index: u64, + /// Index of the validator within its committee's aggregation bits. + pub validator_committee_index: u64, +} + +/// A beacon committee for a slot, carrying only the fields used by inclusion. +#[derive(Clone)] +pub struct BeaconCommittee { + /// Committee index within the slot. + pub index: u64, + /// Validators assigned to this committee. + pub validators: Vec, +} + +/// A simplified observed block with its attestations and committee context. +pub struct Block { + /// Slot of the block. + pub slot: u64, + /// Attester duties relevant to this slot (used for Electra inclusion). + pub att_duties: Vec, + /// Block attestations keyed by their attestation-data root. + pub attestations_by_data_root: HashMap<[u8; 32], versioned::VersionedAttestation>, + /// Beacon committees for the slot, ordered by committee index. + pub beacon_committees: Vec, +} + +/// Tracks the on-chain inclusion of submitted duties. +/// +/// Holds a simplified, I/O-free API so it can be exercised directly in tests: +/// callers feed it blocks via [`InclusionCore::check_block`] / +/// [`InclusionCore::check_block_and_atts`] and trim stale state via +/// [`InclusionCore::trim`]. +pub struct InclusionCore { + submissions: HashMap, + beacon_committees: HashMap>, + tracker_incl_fn: TrackerInclFn, + missed_fn: MissedFn, + att_included_fn: AttIncludedFn, +} + +impl InclusionCore { + /// Creates a core with the production reporters ([`report_missed`] and + /// [`report_att_inclusion`]) and the given tracker callback. + pub fn new(tracker_incl_fn: TrackerInclFn) -> Self { + Self::with_handlers( + tracker_incl_fn, + Box::new(report_missed), + Box::new(report_att_inclusion), + ) + } + + /// Creates a core with explicit reporter callbacks (used by tests). + pub fn with_handlers( + tracker_incl_fn: TrackerInclFn, + missed_fn: MissedFn, + att_included_fn: AttIncludedFn, + ) -> Self { + Self { + submissions: HashMap::new(), + beacon_committees: HashMap::new(), + tracker_incl_fn, + missed_fn, + att_included_fn, + } + } + + /// Records a duty submitted to the beacon node. + /// + /// Unsupported duty types are ignored. Synthetic proposals are reported as + /// included immediately since they are already on-chain. + pub fn submitted( + &mut self, + duty: Duty, + pubkey: PubKey, + data: Box, + delay: Duration, + ) -> Result<(), InclusionError> { + if !incl_supported().contains(&duty.duty_type) { + return Ok(()); + } + + let mut att_data_root = [0u8; 32]; + + if duty.duty_type == DutyType::Attester { + let any = &*data as &dyn Any; + if let Some(att) = any.downcast_ref::() { + let payload = att + .0 + .attestation + .as_ref() + .ok_or(InclusionError::MissingAttestation)?; + att_data_root = payload.data().tree_hash_root().0; + } else if let Some(att) = any.downcast_ref::() { + att_data_root = att.0.data.tree_hash_root().0; + } else { + return Err(InclusionError::InvalidAttestation); + } + } + + if duty.duty_type == DutyType::Aggregator { + let any = &*data as &dyn Any; + if let Some(agg) = any.downcast_ref::() { + let data = agg.data().ok_or(InclusionError::InvalidAggregateAndProof)?; + att_data_root = data.tree_hash_root().0; + } else if let Some(agg) = any.downcast_ref::() { + att_data_root = agg.0.message.aggregate.data.tree_hash_root().0; + } else { + return Err(InclusionError::InvalidAggregateAndProof); + } + } + + if duty.duty_type == DutyType::Proposer { + let any = &*data as &dyn Any; + let proposal = any + .downcast_ref::() + .ok_or(InclusionError::InvalidBlock)?; + if proposal.0.is_synthetic() { + // Synthetic blocks are already on-chain; report inclusion now. + (self.tracker_incl_fn)(&duty, pubkey, None); + return Ok(()); + } + } + + // Defensive: builder proposals are deprecated and excluded by + // `incl_supported`, so this is unreachable in practice. + if duty.duty_type == DutyType::BuilderProposer { + return Err(InclusionError::DeprecatedDutyBuilderProposer); + } + + let key = SubKey { + duty: duty.clone(), + pubkey, + }; + self.submissions.insert( + key, + Submission { + duty, + pubkey, + data, + att_data_root, + delay, + }, + ); + + Ok(()) + } + + /// Removes submissions and committee state at or below `slot`, reporting + /// each removed submission as missed and never included on-chain. + pub fn trim(&mut self, slot: u64) { + let stale: Vec = self + .submissions + .iter() + .filter(|(_, sub)| sub.duty.slot.inner() <= slot) + .map(|(key, _)| key.clone()) + .collect(); + + for key in stale { + let sub = self + .submissions + .remove(&key) + .expect("key collected from submissions"); + (self.missed_fn)(&sub); + let err: StepError = Arc::new(InclusionError::NotIncludedOnChain); + (self.tracker_incl_fn)(&sub.duty, sub.pubkey, Some(err)); + } + + self.beacon_committees.retain(|&s, _| s > slot); + } + + /// Checks whether a proposer duty for `slot` was included, given whether a + /// block was found at that slot. Only proposer submissions are expected. + pub fn check_block(&mut self, slot: u64, found: bool) { + let matched: Vec = self + .submissions + .iter() + .filter_map(|(key, sub)| match sub.duty.duty_type { + DutyType::Proposer => (sub.duty.slot.inner() == slot).then(|| key.clone()), + _ => unreachable!("bug: unexpected type"), + }) + .collect(); + + for key in matched { + let blinded = match self.submissions.get(&key) { + Some(sub) => { + match (&*sub.data as &dyn Any).downcast_ref::() { + Some(proposal) => proposal.0.blinded, + None => { + tracing::error!( + duty = %self.submissions[&key].duty, + "Submission data has wrong type", + ); + continue; + } + } + } + None => continue, + }; + + let sub = self.submissions.remove(&key).expect("present"); + if found { + log_block_included(&sub, slot, blinded); + } else { + (self.missed_fn)(&sub); + } + (self.tracker_incl_fn)(&sub.duty, sub.pubkey, None); + } + } + + /// Checks whether any submitted attester/aggregator/proposer duties were + /// included in the given block (and its attestations). + pub fn check_block_and_atts(&mut self, block: &Block) { + enum Act { + Include, + ProposerInclude { blinded: bool }, + } + + let mut acts: Vec<(SubKey, Act)> = Vec::new(); + + for (key, sub) in &self.submissions { + match sub.duty.duty_type { + DutyType::Attester => match check_attestation_inclusion(sub, block) { + // Matching Go: on error, log and still report inclusion. + Err(e) => { + tracing::warn!(error = %e, "Failed to check attestation inclusion"); + acts.push((key.clone(), Act::Include)); + } + Ok(true) => acts.push((key.clone(), Act::Include)), + Ok(false) => {} + }, + DutyType::Aggregator => match check_aggregation_inclusion(sub, block) { + Err(e) => { + tracing::warn!(error = %e, "Failed to check aggregate inclusion"); + acts.push((key.clone(), Act::Include)); + } + Ok(true) => acts.push((key.clone(), Act::Include)), + Ok(false) => {} + }, + DutyType::Proposer => { + if sub.duty.slot.inner() != block.slot { + continue; + } + match (&*sub.data as &dyn Any).downcast_ref::() { + Some(proposal) => acts.push(( + key.clone(), + Act::ProposerInclude { + blinded: proposal.0.blinded, + }, + )), + None => { + tracing::error!(duty = %sub.duty, "Submission data has wrong type"); + } + } + } + _ => unreachable!("bug: unexpected type"), + } + } + + for (key, act) in acts { + let sub = self.submissions.remove(&key).expect("present"); + match act { + Act::Include => { + (self.att_included_fn)(&sub, block); + (self.tracker_incl_fn)(&sub.duty, sub.pubkey, None); + } + Act::ProposerInclude { blinded } => { + log_block_included(&sub, block.slot, blinded); + (self.tracker_incl_fn)(&sub.duty, sub.pubkey, None); + } + } + } + + if let Some(old) = block.slot.checked_sub(INCL_MISSED_LAG) { + self.beacon_committees.remove(&old); + } + } +} + +/// Returns the aggregation bits of a block attestation, or an error if the +/// payload is missing. +fn block_att_agg_bits(att: &versioned::VersionedAttestation) -> Result, InclusionError> { + att.attestation + .as_ref() + .map(versioned::AttestationPayload::aggregation_bits) + .ok_or(InclusionError::MissingAttestation) +} + +/// Returns the single committee index referenced by an Electra/Fulu attestation +/// payload, or an error if it does not reference exactly one committee. +fn electra_committee_index(payload: &versioned::AttestationPayload) -> Result { + match payload { + versioned::AttestationPayload::Electra(att) | versioned::AttestationPayload::Fulu(att) => { + match att.committee_bits.bit_indices()[..] { + [idx] => Ok(idx as u64), + _ => Err(InclusionError::InvalidCommitteeBits), + } + } + _ => Err(InclusionError::InvalidCommitteeBits), + } +} + +/// Checks whether the submitted attestation is included in the block. +fn check_attestation_inclusion(sub: &Submission, block: &Block) -> Result { + let any = &*sub.data as &dyn Any; + let sub_att = any + .downcast_ref::() + .ok_or(InclusionError::NotAnAttestation)?; + + let Some(att) = block.attestations_by_data_root.get(&sub.att_data_root) else { + return Ok(false); + }; + + let payload = sub_att + .0 + .attestation + .as_ref() + .ok_or(InclusionError::MissingAttestation)?; + + match sub_att.0.version { + versioned::DataVersion::Phase0 + | versioned::DataVersion::Altair + | versioned::DataVersion::Bellatrix + | versioned::DataVersion::Capella + | versioned::DataVersion::Deneb => { + let sub_bits = AggBits::from_ssz_bytes(payload.aggregation_bits()); + let att_bits = AggBits::from_ssz_bytes(block_att_agg_bits(att)?); + Ok(att_bits.contains(&sub_bits)?) + } + versioned::DataVersion::Electra | versioned::DataVersion::Fulu => { + let validator_index = sub_att + .0 + .validator_index + .ok_or(InclusionError::NoValidatorIndex)?; + let duty = block + .att_duties + .iter() + .find(|d| d.validator_index == validator_index) + .ok_or(InclusionError::NoAttesterDuty)?; + + let att_bits = AggBits::from_ssz_bytes(block_att_agg_bits(att)?); + let committee_index = electra_committee_index(payload)?; + + // Sum the validator counts of all committees preceding the + // attestation's committee to offset into the full aggregation bits. + let preceding = usize::try_from(committee_index).unwrap_or(usize::MAX); + let previous_validators: usize = block + .beacon_committees + .iter() + .take(preceding) + .map(|c| c.validators.len()) + .sum(); + + let offset = usize::try_from(duty.validator_committee_index).unwrap_or(usize::MAX); + Ok(att_bits.bit_at(previous_validators.saturating_add(offset))) + } + versioned::DataVersion::Unknown => Err(InclusionError::NotAnAttestation), + } +} + +/// Checks whether the submitted aggregate is included in the block. +fn check_aggregation_inclusion(sub: &Submission, block: &Block) -> Result { + let Some(att) = block.attestations_by_data_root.get(&sub.att_data_root) else { + return Ok(false); + }; + let att_bits = AggBits::from_ssz_bytes(block_att_agg_bits(att)?); + + let any = &*sub.data as &dyn Any; + let agg = any + .downcast_ref::() + .ok_or(InclusionError::ParseVersionedAggregate)?; + let sub_bits = AggBits::from_ssz_bytes( + agg.aggregation_bits() + .ok_or(InclusionError::ParseVersionedAggregate)?, + ); + + Ok(att_bits.contains(&sub_bits)?) +} + +/// Reports a duty that was broadcast but never included on-chain. +fn report_missed(sub: &Submission) { + TRACKER_METRICS.inclusion_missed_total[&sub.duty.duty_type.to_string()].inc(); + + match sub.duty.duty_type { + DutyType::Attester | DutyType::Aggregator => { + let msg = if sub.duty.duty_type == DutyType::Aggregator { + "Broadcasted attestation aggregate never included on-chain" + } else { + "Broadcasted attestation never included on-chain" + }; + tracing::warn!( + pubkey = %sub.pubkey, + attestation_slot = sub.duty.slot.inner(), + broadcast_delay = ?sub.delay, + "{msg}", + ); + } + DutyType::Proposer => { + match (&*sub.data as &dyn Any).downcast_ref::() { + Some(proposal) => { + let msg = if proposal.0.blinded { + "Broadcasted blinded block never included on-chain" + } else { + "Broadcasted block never included on-chain" + }; + tracing::warn!( + pubkey = %sub.pubkey, + block_slot = sub.duty.slot.inner(), + broadcast_delay = ?sub.delay, + "{msg}", + ); + } + None => tracing::error!(duty = %sub.duty, "Submission data has wrong type"), + } + } + _ => unreachable!("bug: unexpected type"), + } +} + +/// Reports an attestation/aggregate observed on-chain, recording inclusion +/// delay. +fn report_att_inclusion(sub: &Submission, block: &Block) { + let Some(att) = block.attestations_by_data_root.get(&sub.att_data_root) else { + return; + }; + let Some(payload) = att.attestation.as_ref() else { + return; + }; + + let att_slot = payload.data().slot; + let block_slot = block.slot; + let inclusion_delay = block_slot.saturating_sub(att_slot); + // Inclusion delay is a small slot count; widen losslessly via u32. + let inclusion_delay_f64 = f64::from(u32::try_from(inclusion_delay).unwrap_or(u32::MAX)); + + let msg = if sub.duty.duty_type == DutyType::Aggregator { + "Broadcasted attestation aggregate included on-chain" + } else { + "Broadcasted attestation included on-chain" + }; + tracing::info!( + block_slot, + attestation_slot = att_slot, + pubkey = %sub.pubkey, + inclusion_delay, + broadcast_delay = ?sub.delay, + "{msg}", + ); + + TRACKER_METRICS.inclusion_delay.set(inclusion_delay_f64); +} + +#[cfg(test)] +impl InclusionCore { + /// Inserts a submission directly, bypassing [`InclusionCore::submitted`]'s + /// `incl_supported` feature gate. Used by tests that exercise the inclusion + /// checks for attester/aggregator duties without toggling the (cached, + /// process-global) `AttestationInclusion` feature. + fn insert_submission(&mut self, sub: Submission) { + self.submissions.insert( + SubKey { + duty: sub.duty.clone(), + pubkey: sub.pubkey, + }, + sub, + ); + } +} + +/// Logs that a proposer's block was included on-chain. +fn log_block_included(sub: &Submission, block_slot: u64, blinded: bool) { + let msg = if blinded { + "Broadcasted blinded block included on-chain" + } else { + "Broadcasted block included on-chain" + }; + tracing::info!( + block_slot, + pubkey = %sub.pubkey, + broadcast_delay = ?sub.delay, + "{msg}", + ); +} + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::Duration, + }; + + use pluto_eth2api::spec::phase0; + use pluto_ssz::BitList; + use pluto_testutil::random::random_deneb_versioned_attestation; + use tree_hash::TreeHash; + + use super::*; + use crate::types::SlotNumber; + + /// Shared recorder of duties passed to a callback. + type Rec = Arc>>; + + fn pubkey() -> PubKey { + PubKey::from([0u8; 48]) + } + + fn checkpoint() -> phase0::Checkpoint { + phase0::Checkpoint { + epoch: 0, + root: [0u8; 32], + } + } + + fn att_data(slot: u64) -> phase0::AttestationData { + phase0::AttestationData { + slot, + index: 0, + beacon_block_root: [1u8; 32], + source: checkpoint(), + target: checkpoint(), + } + } + + fn phase0_attestation(slot: u64) -> phase0::Attestation { + phase0::Attestation { + aggregation_bits: BitList::default(), + data: att_data(slot), + signature: [0u8; 96], + } + } + + fn phase0_aggregate(slot: u64) -> phase0::SignedAggregateAndProof { + phase0::SignedAggregateAndProof { + message: phase0::AggregateAndProof { + aggregator_index: 0, + aggregate: phase0_attestation(slot), + selection_proof: [0u8; 96], + }, + signature: [0u8; 96], + } + } + + /// Deserialises a real (Fulu, non-blinded) signed proposal from the shared + /// JSON golden. The fork version is irrelevant to the checks under test. + fn proposal() -> VersionedSignedProposal { + serde_json::from_str(include_str!( + "../../testdata/signeddata/TestJSONSerialisation_VersionedSignedProposal.json.golden" + )) + .expect("golden proposal deserialises") + } + + fn submission(duty: Duty, data: Box, att_data_root: [u8; 32]) -> Submission { + Submission { + duty, + pubkey: pubkey(), + data, + att_data_root, + delay: Duration::ZERO, + } + } + + fn sorted_slots(rec: &Rec) -> Vec { + let mut slots: Vec = rec.lock().unwrap().iter().map(|d| d.slot.inner()).collect(); + slots.sort_unstable(); + slots + } + + /// Mirrors Go's `TestInclusion`: non-versioned attester/aggregator data + /// fails the versioned downcast inside the inclusion checks, which (per the + /// Go semantics) logs and still reports the duty as included. Proposer + /// duties are reported via the tracker callback but not the att-included + /// callback. + #[test] + fn inclusion() { + let included: Rec = Rec::default(); + let missed: Rec = Rec::default(); + let resolved: Rec = Rec::default(); + + let (inc, mis, res) = (included.clone(), missed.clone(), resolved.clone()); + let mut core = InclusionCore::with_handlers( + Box::new(move |duty: &Duty, _pk, _err| res.lock().unwrap().push(duty.clone())), + Box::new(move |sub: &Submission| mis.lock().unwrap().push(sub.duty.clone())), + Box::new(move |sub: &Submission, _b: &Block| { + inc.lock().unwrap().push(sub.duty.clone()) + }), + ); + + let att1 = Attestation::new(phase0_attestation(1)); + let agg2 = SignedAggregateAndProof::new(phase0_aggregate(2)); + let att3 = Attestation::new(phase0_attestation(3)); + let block4 = proposal(); + + let att1_root = att1.0.data.tree_hash_root().0; + let agg2_root = agg2.0.message.aggregate.data.tree_hash_root().0; + let att3_root = att3.0.data.tree_hash_root().0; + + core.insert_submission(submission( + Duty::new_attester_duty(SlotNumber::new(1)), + Box::new(att1), + att1_root, + )); + core.insert_submission(submission( + Duty::new_aggregator_duty(SlotNumber::new(2)), + Box::new(agg2), + agg2_root, + )); + core.insert_submission(submission( + Duty::new_attester_duty(SlotNumber::new(3)), + Box::new(att3), + att3_root, + )); + core.insert_submission(submission( + Duty::new_proposer_duty(SlotNumber::new(100)), + Box::new(block4), + [0u8; 32], + )); + + // The aggregator lookup must find a block attestation at its data root + // before failing the versioned downcast, so seed one. + let block = Block { + slot: 100, + att_duties: vec![], + attestations_by_data_root: HashMap::from([( + agg2_root, + random_deneb_versioned_attestation(), + )]), + beacon_committees: vec![], + }; + + core.check_block_and_atts(&block); + + // Attester (1, 3) and aggregator (2) report via att-included. + assert_eq!(sorted_slots(&included), vec![1, 2, 3]); + assert!(missed.lock().unwrap().is_empty()); + // All four duties resolve via the tracker callback (incl. proposer 100). + assert_eq!(sorted_slots(&resolved), vec![1, 2, 3, 100]); + } + + /// Mirrors Go's `TestBlockInclusion`: a proposer duty is reported missed + /// only when its slot's block is checked and not found. + #[test] + fn block_inclusion() { + let scenario = |check_offset: u64, found: bool| -> usize { + let missed: Rec = Rec::default(); + let mis = missed.clone(); + let mut core = InclusionCore::with_handlers( + Box::new(|_d: &Duty, _pk, _err| {}), + Box::new(move |sub: &Submission| mis.lock().unwrap().push(sub.duty.clone())), + Box::new(|_s: &Submission, _b: &Block| {}), + ); + + // The duty slot is independent of the proposal's internal slot; + // `check_block` matches against the duty slot. + let slot = 42u64; + core.submitted( + Duty::new_proposer_duty(SlotNumber::new(slot)), + pubkey(), + Box::new(proposal()), + Duration::ZERO, + ) + .expect("submit proposal"); + + core.check_block(slot.wrapping_add(check_offset), found); + missed.lock().unwrap().len() + }; + + assert_eq!(scenario(0, true), 0, "block found at slot -> not missed"); + assert_eq!(scenario(0, false), 1, "block not found at slot -> missed"); + assert_eq!(scenario(1, true), 0, "slot mismatch -> skipped"); + assert_eq!(scenario(1, false), 0, "slot mismatch, not found -> skipped"); + } + + /// `trim` reports each removed submission as missed and never included, + /// only for slots at or below the trim slot. + #[test] + fn trim() { + let missed: Rec = Rec::default(); + let resolved: Arc>>> = Arc::default(); + + let (mis, res) = (missed.clone(), resolved.clone()); + let mut core = InclusionCore::with_handlers( + Box::new(move |_d: &Duty, _pk, err: Option| res.lock().unwrap().push(err)), + Box::new(move |sub: &Submission| mis.lock().unwrap().push(sub.duty.clone())), + Box::new(|_s: &Submission, _b: &Block| {}), + ); + + core.submitted( + Duty::new_proposer_duty(SlotNumber::new(5)), + pubkey(), + Box::new(proposal()), + Duration::ZERO, + ) + .expect("submit proposal"); + + // Below the duty slot: nothing trimmed. + core.trim(4); + assert!(missed.lock().unwrap().is_empty()); + + // At the duty slot: trimmed, reported missed and not-included. + core.trim(5); + assert_eq!(sorted_slots(&missed), vec![5]); + let res = resolved.lock().unwrap(); + assert_eq!(res.len(), 1); + assert_eq!( + res[0].as_ref().map(|e| e.to_string()), + Some("duty not included on-chain".to_string()), + ); + } +} diff --git a/crates/core/src/tracker/mod.rs b/crates/core/src/tracker/mod.rs index 755385e0..dbdfc9e2 100644 --- a/crates/core/src/tracker/mod.rs +++ b/crates/core/src/tracker/mod.rs @@ -27,6 +27,9 @@ pub mod metrics; /// Reporters that consume analysis results and emit metrics/logs. pub mod reporters; +/// On-chain inclusion checking for broadcast duties. +pub mod inclusion; + use std::{collections::HashMap, future::Future, sync::Arc}; use tokio::sync::mpsc; From 55df124a04570303ac42a4d2fd627c6dde88569f Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Fri, 5 Jun 2026 13:32:32 +0200 Subject: [PATCH 4/6] Minor issue, uneeded map access --- crates/core/src/tracker/inclusion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/tracker/inclusion.rs b/crates/core/src/tracker/inclusion.rs index c6c2c540..f72f1f5e 100644 --- a/crates/core/src/tracker/inclusion.rs +++ b/crates/core/src/tracker/inclusion.rs @@ -304,7 +304,7 @@ impl InclusionCore { Some(proposal) => proposal.0.blinded, None => { tracing::error!( - duty = %self.submissions[&key].duty, + duty = %sub.duty, "Submission data has wrong type", ); continue; From b4436da6cb60e12ca160ad19e74f26ba11b77bbb Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Fri, 5 Jun 2026 15:49:45 +0200 Subject: [PATCH 5/6] additional tests requested by claude code --- crates/core/src/tracker/inclusion.rs | 137 +++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/crates/core/src/tracker/inclusion.rs b/crates/core/src/tracker/inclusion.rs index f72f1f5e..7af252d8 100644 --- a/crates/core/src/tracker/inclusion.rs +++ b/crates/core/src/tracker/inclusion.rs @@ -823,4 +823,141 @@ mod tests { Some("duty not included on-chain".to_string()), ); } + + fn versioned_att_phase0( + slot: u64, + agg_bits: pluto_ssz::BitList<2048>, + ) -> versioned::VersionedAttestation { + versioned::VersionedAttestation { + version: versioned::DataVersion::Deneb, + validator_index: None, + attestation: Some(versioned::AttestationPayload::Deneb(phase0::Attestation { + aggregation_bits: agg_bits, + data: att_data(slot), + signature: [0u8; 96], + })), + } + } + + fn run_phase0_inclusion_check( + block_set_bits: &[usize], + sub_set_bits: &[usize], + ) -> Result { + let slot = 5u64; + let data_root = att_data(slot).tree_hash_root().0; + let block_att = versioned_att_phase0( + slot, + pluto_ssz::BitList::<2048>::with_bits(4, block_set_bits), + ); + let sub_att = versioned::VersionedAttestation { + version: versioned::DataVersion::Deneb, + validator_index: None, + attestation: Some(versioned::AttestationPayload::Deneb(phase0::Attestation { + aggregation_bits: pluto_ssz::BitList::<2048>::with_bits(4, sub_set_bits), + data: att_data(slot), + signature: [0u8; 96], + })), + }; + let sub = submission( + Duty::new_attester_duty(SlotNumber::new(slot)), + Box::new(VersionedAttestation::new(sub_att).unwrap()), + data_root, + ); + let block = Block { + slot, + att_duties: vec![], + attestations_by_data_root: HashMap::from([(data_root, block_att)]), + beacon_committees: vec![], + }; + check_attestation_inclusion(&sub, &block) + } + + /// Block has bits 0 and 1 set; submission has only bit 0 → included. + #[test] + fn check_attestation_inclusion_phase0_contains() { + assert_eq!(run_phase0_inclusion_check(&[0, 1], &[0]).unwrap(), true); + } + + /// Block has bit 1; submission has bit 0 → not included. + #[test] + fn check_attestation_inclusion_phase0_not_contained() { + assert_eq!(run_phase0_inclusion_check(&[1], &[0]).unwrap(), false); + } + + // committee 0 has 3 validators, committee 1 has 4; validator sits at + // committee_index=1, validator_committee_index=2 → global bit 5. + fn run_electra_inclusion_check(block_set_bits: &[usize]) -> Result { + use pluto_eth2api::spec::electra; + use pluto_ssz::BitVector; + + let slot = 10u64; + let validator_index: u64 = 99; + let committee_index: usize = 1; + let committee0_size: usize = 3; + let validator_committee_index: u64 = 2; + let total_validators = committee0_size + 4; + + let data_root = att_data(slot).tree_hash_root().0; + let block_att = versioned::VersionedAttestation { + version: versioned::DataVersion::Electra, + validator_index: None, + attestation: Some(versioned::AttestationPayload::Electra( + electra::Attestation { + aggregation_bits: BitList::with_bits(total_validators, block_set_bits), + data: att_data(slot), + signature: [0u8; 96], + committee_bits: BitVector::with_bits(&[committee_index]), + }, + )), + }; + let sub_att = versioned::VersionedAttestation { + version: versioned::DataVersion::Electra, + validator_index: Some(validator_index), + attestation: Some(versioned::AttestationPayload::Electra( + electra::Attestation { + aggregation_bits: BitList::default(), + data: att_data(slot), + signature: [0u8; 96], + committee_bits: BitVector::with_bits(&[committee_index]), + }, + )), + }; + let sub = submission( + Duty::new_attester_duty(SlotNumber::new(slot)), + Box::new(VersionedAttestation::new(sub_att).unwrap()), + data_root, + ); + let block = Block { + slot, + att_duties: vec![AttesterDuty { + validator_index, + validator_committee_index, + }], + attestations_by_data_root: HashMap::from([(data_root, block_att)]), + beacon_committees: vec![ + BeaconCommittee { + index: 0, + validators: vec![0u64; committee0_size], + }, + BeaconCommittee { + index: 1, + validators: vec![0u64; 4], + }, + ], + }; + check_attestation_inclusion(&sub, &block) + } + + /// Validator in committee 1 (preceded by 3 validators in committee 0), + /// validator_committee_index 2 → global bit 5 is set → included. + #[test] + fn check_attestation_inclusion_electra_offset() { + assert_eq!(run_electra_inclusion_check(&[5]).unwrap(), true); + } + + /// Global bit 5 is not set in the block (bit 4 is) → not included. + #[test] + fn check_attestation_inclusion_electra_offset_not_included() { + assert_eq!(run_electra_inclusion_check(&[4]).unwrap(), false); + } } From 5aa2bfa86771dfa9f5e857884f2bd05ed8140217 Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Fri, 5 Jun 2026 15:51:56 +0200 Subject: [PATCH 6/6] clippy --- crates/core/src/tracker/inclusion.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/tracker/inclusion.rs b/crates/core/src/tracker/inclusion.rs index 7af252d8..b75b5a22 100644 --- a/crates/core/src/tracker/inclusion.rs +++ b/crates/core/src/tracker/inclusion.rs @@ -875,13 +875,13 @@ mod tests { /// Block has bits 0 and 1 set; submission has only bit 0 → included. #[test] fn check_attestation_inclusion_phase0_contains() { - assert_eq!(run_phase0_inclusion_check(&[0, 1], &[0]).unwrap(), true); + assert!(run_phase0_inclusion_check(&[0, 1], &[0]).unwrap()); } /// Block has bit 1; submission has bit 0 → not included. #[test] fn check_attestation_inclusion_phase0_not_contained() { - assert_eq!(run_phase0_inclusion_check(&[1], &[0]).unwrap(), false); + assert!(!run_phase0_inclusion_check(&[1], &[0]).unwrap()); } // committee 0 has 3 validators, committee 1 has 4; validator sits at @@ -952,12 +952,12 @@ mod tests { /// validator_committee_index 2 → global bit 5 is set → included. #[test] fn check_attestation_inclusion_electra_offset() { - assert_eq!(run_electra_inclusion_check(&[5]).unwrap(), true); + assert!(run_electra_inclusion_check(&[5]).unwrap()); } /// Global bit 5 is not set in the block (bit 4 is) → not included. #[test] fn check_attestation_inclusion_electra_offset_not_included() { - assert_eq!(run_electra_inclusion_check(&[4]).unwrap(), false); + assert!(!run_electra_inclusion_check(&[4]).unwrap()); } }