Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,20 @@ impl BlockChainServer {
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(
let new_aggregates = store::on_tick(
&mut self.store,
timestamp_ms,
proposer_validator_id.is_some(),
self.is_aggregator,
);

for aggregate in new_aggregates {
let _ = self
.p2p_tx
.send(P2PMessage::PublishAggregatedAttestation(aggregate))
.inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
}

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
self.propose_block(slot, validator_id);
Expand Down
66 changes: 58 additions & 8 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,16 @@ fn update_safe_target(store: &mut Store) {
let min_target_score = (num_validators * 2).div_ceil(3);

let blocks = store.get_live_chain();
// Merge both attestation pools. At interval 3 the migration (interval 4) hasn't
// run yet, so attestations that entered "known" directly (proposer's own attestation
// in block body, node's self-attestation) would be invisible without this merge.
let mut all_payloads: HashMap<SignatureKey, Vec<StoredAggregatedPayload>> =
store.iter_known_aggregated_payloads().collect();
for (key, new_proofs) in store.iter_new_aggregated_payloads() {
all_payloads.entry(key).or_default().extend(new_proofs);
}
let attestations =
extract_attestations_from_aggregated_payloads(store, store.iter_new_aggregated_payloads());
extract_attestations_from_aggregated_payloads(store, all_payloads.into_iter());
let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand Down Expand Up @@ -125,12 +133,14 @@ fn extract_attestations_from_aggregated_payloads(
///
/// Collects individual gossip signatures, aggregates them by attestation data,
/// and stores the resulting proofs in `LatestNewAggregatedPayloads`.
fn aggregate_committee_signatures(store: &mut Store) {
fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAttestation> {
let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect();
if gossip_sigs.is_empty() {
return;
return Vec::new();
}

let mut new_aggregates: Vec<SignedAggregatedAttestation> = Vec::new();

let head_state = store.head_state();
let validators = &head_state.validators;

Expand Down Expand Up @@ -183,6 +193,12 @@ fn aggregate_committee_signatures(store: &mut Store) {

let participants = aggregation_bits_from_validator_indices(&ids);
let proof = AggregatedSignatureProof::new(participants, proof_data);

new_aggregates.push(SignedAggregatedAttestation {
data: data.clone(),
proof: proof.clone(),
});

let payload = StoredAggregatedPayload { slot, proof };

// Store in new aggregated payloads for each covered validator
Expand All @@ -199,14 +215,18 @@ fn aggregate_committee_signatures(store: &mut Store) {

// Delete aggregated entries from gossip_signatures
store.delete_gossip_signatures(&keys_to_delete);

new_aggregates
}

/// Validate incoming attestation before processing.
///
/// Ensures the vote respects the basic laws of time and topology:
/// 1. The blocks voted for must exist in our store.
/// 2. A vote cannot span backwards in time (source > target).
/// 3. A vote cannot be for a future slot.
/// 3. The head must be at least as recent as source and target.
/// 4. Checkpoint slots must match the actual block slots.
/// 5. A vote cannot be for a future slot.
fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<(), StoreError> {
let _timing = metrics::time_attestation_validation();

Expand All @@ -218,14 +238,20 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
.get_block_header(&data.target.root)
.ok_or(StoreError::UnknownTargetBlock(data.target.root))?;

let _ = store
let head_header = store
.get_block_header(&data.head.root)
.ok_or(StoreError::UnknownHeadBlock(data.head.root))?;

// Topology Check - Source must be older than Target.
// Topology Check - Source must be older than Target, and Head must be at least as recent.
if data.source.slot > data.target.slot {
return Err(StoreError::SourceExceedsTarget);
}
if data.head.slot < data.target.slot {
return Err(StoreError::HeadOlderThanTarget {
head_slot: data.head.slot,
target_slot: data.target.slot,
});
}

// Consistency Check - Validate checkpoint slots match block slots.
if source_header.slot != data.source.slot {
Expand All @@ -240,6 +266,12 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
block_slot: target_header.slot,
});
}
if head_header.slot != data.head.slot {
return Err(StoreError::HeadSlotMismatch {
checkpoint_slot: data.head.slot,
block_slot: head_header.slot,
});
}

// Time Check - Validate attestation is not too far in the future.
// We allow a small margin for clock disparity (1 slot), but no further.
Expand All @@ -260,7 +292,14 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
/// 800ms interval. Slot and interval-within-slot are derived as:
/// slot = store.time() / INTERVALS_PER_SLOT
/// interval = store.time() % INTERVALS_PER_SLOT
pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggregator: bool) {
pub fn on_tick(
store: &mut Store,
timestamp_ms: u64,
has_proposal: bool,
is_aggregator: bool,
) -> Vec<SignedAggregatedAttestation> {
let mut new_aggregates: Vec<SignedAggregatedAttestation> = Vec::new();

// Convert UNIX timestamp (ms) to interval count since genesis
let genesis_time_ms = store.config().genesis_time * 1000;
let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms);
Expand Down Expand Up @@ -298,7 +337,7 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggr
2 => {
// Aggregation interval
if is_aggregator {
aggregate_committee_signatures(store);
new_aggregates.extend(aggregate_committee_signatures(store));
}
}
3 => {
Expand All @@ -312,6 +351,8 @@ pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool, is_aggr
_ => unreachable!("slots only have 5 intervals"),
}
}

new_aggregates
}

/// Process a gossiped attestation.
Expand Down Expand Up @@ -798,6 +839,9 @@ pub enum StoreError {
#[error("Source checkpoint slot exceeds target")]
SourceExceedsTarget,

#[error("Head checkpoint slot {head_slot} is older than target slot {target_slot}")]
HeadOlderThanTarget { head_slot: u64, target_slot: u64 },

#[error("Source checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")]
SourceSlotMismatch {
checkpoint_slot: u64,
Expand All @@ -810,6 +854,12 @@ pub enum StoreError {
block_slot: u64,
},

#[error("Head checkpoint slot {checkpoint_slot} does not match block slot {block_slot}")]
HeadSlotMismatch {
checkpoint_slot: u64,
block_slot: u64,
},

#[error(
"Attestation slot {attestation_slot} is too far in future (current slot: {current_slot})"
)]
Expand Down
Loading