diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 2c7540b..bcecb57 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -155,13 +155,20 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick( + let new_aggregates = store::on_tick( &mut self.store, timestamp_ms, proposer_validator_id.is_some(), self.is_aggregator, ); + for aggregate in new_aggregates { + let _ = self + .p2p_tx + .send(P2PMessage::PublishAggregatedAttestation(aggregate)) + .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); + } + // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { self.propose_block(slot, validator_id); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 13ad8c4..abe2b08 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -82,8 +82,16 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); + // Merge both attestation pools. At interval 3 the migration (interval 4) hasn't + // run yet, so attestations that entered "known" directly (proposer's own attestation + // in block body, node's self-attestation) would be invisible without this merge. + let mut all_payloads: HashMap> = + store.iter_known_aggregated_payloads().collect(); + for (key, new_proofs) in store.iter_new_aggregated_payloads() { + all_payloads.entry(key).or_default().extend(new_proofs); + } let attestations = - extract_attestations_from_aggregated_payloads(store, store.iter_new_aggregated_payloads()); + extract_attestations_from_aggregated_payloads(store, all_payloads.into_iter()); let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -125,12 +133,14 @@ fn extract_attestations_from_aggregated_payloads( /// /// Collects individual gossip signatures, aggregates them by attestation data, /// and stores the resulting proofs in `LatestNewAggregatedPayloads`. -fn aggregate_committee_signatures(store: &mut Store) { +fn aggregate_committee_signatures(store: &mut Store) -> Vec { let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); if gossip_sigs.is_empty() { - return; + return Vec::new(); } + let mut new_aggregates: Vec = Vec::new(); + let head_state = store.head_state(); let validators = &head_state.validators; @@ -183,6 +193,12 @@ fn aggregate_committee_signatures(store: &mut Store) { let participants = aggregation_bits_from_validator_indices(&ids); let proof = AggregatedSignatureProof::new(participants, proof_data); + + new_aggregates.push(SignedAggregatedAttestation { + data: data.clone(), + proof: proof.clone(), + }); + let payload = StoredAggregatedPayload { slot, proof }; // Store in new aggregated payloads for each covered validator @@ -199,6 +215,8 @@ fn aggregate_committee_signatures(store: &mut Store) { // Delete aggregated entries from gossip_signatures store.delete_gossip_signatures(&keys_to_delete); + + new_aggregates } /// Validate incoming attestation before processing. @@ -206,7 +224,9 @@ fn aggregate_committee_signatures(store: &mut Store) { /// Ensures the vote respects the basic laws of time and topology: /// 1. The blocks voted for must exist in our store. /// 2. A vote cannot span backwards in time (source > target). -/// 3. A vote cannot be for a future slot. +/// 3. The head must be at least as recent as source and target. +/// 4. Checkpoint slots must match the actual block slots. +/// 5. A vote cannot be for a future slot. fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<(), StoreError> { let _timing = metrics::time_attestation_validation(); @@ -218,14 +238,20 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() .get_block_header(&data.target.root) .ok_or(StoreError::UnknownTargetBlock(data.target.root))?; - let _ = store + let head_header = store .get_block_header(&data.head.root) .ok_or(StoreError::UnknownHeadBlock(data.head.root))?; - // Topology Check - Source must be older than Target. + // Topology Check - Source must be older than Target, and Head must be at least as recent. if data.source.slot > data.target.slot { return Err(StoreError::SourceExceedsTarget); } + if data.head.slot < data.target.slot { + return Err(StoreError::HeadOlderThanTarget { + head_slot: data.head.slot, + target_slot: data.target.slot, + }); + } // Consistency Check - Validate checkpoint slots match block slots. if source_header.slot != data.source.slot { @@ -240,6 +266,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() block_slot: target_header.slot, }); } + if head_header.slot != data.head.slot { + return Err(StoreError::HeadSlotMismatch { + checkpoint_slot: data.head.slot, + block_slot: head_header.slot, + }); + } // Time Check - Validate attestation is not too far in the future. // We allow a small margin for clock disparity (1 slot), but no further. @@ -260,7 +292,14 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggregator: bool) { +pub fn on_tick( + store: &mut Store, + timestamp_ms: u64, + has_proposal: bool, + is_aggregator: bool, +) -> Vec { + let mut new_aggregates: Vec = Vec::new(); + // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -298,7 +337,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggr 2 => { // Aggregation interval if is_aggregator { - aggregate_committee_signatures(store); + new_aggregates.extend(aggregate_committee_signatures(store)); } } 3 => { @@ -312,6 +351,8 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggr _ => unreachable!("slots only have 5 intervals"), } } + + new_aggregates } /// Process a gossiped attestation. @@ -798,6 +839,9 @@ pub enum StoreError { #[error("Source checkpoint slot exceeds target")] SourceExceedsTarget, + #[error("Head checkpoint slot {head_slot} is older than target slot {target_slot}")] + HeadOlderThanTarget { head_slot: u64, target_slot: u64 }, + #[error("Source checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")] SourceSlotMismatch { checkpoint_slot: u64, @@ -810,6 +854,12 @@ pub enum StoreError { block_slot: u64, }, + #[error("Head checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")] + HeadSlotMismatch { + checkpoint_slot: u64, + block_slot: u64, + }, + #[error( "Attestation slot {attestation_slot} is too far in future (current slot: {current_slot})" )]