diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index dabc3422f7..36dcd59678 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -70,6 +70,9 @@ const ( // ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation. // In case they differ, Charon does not sign the attestation. ChainSplitHalt = "chain_split_halt" + + // PrepareProposer enables scheduling and processing of prepare proposer duties. + PrepareProposer = "prepare_proposer" ) var ( @@ -88,6 +91,7 @@ var ( QUIC: statusAlpha, FetchOnlyCommIdx0: statusAlpha, ChainSplitHalt: statusAlpha, + PrepareProposer: statusAlpha, // Add all features and their status here. } diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index a2e74c9ddd..5f5444817e 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -12,6 +12,7 @@ import ( k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "go.opentelemetry.io/otel/attribute" @@ -40,15 +41,15 @@ var supportedCompareDuties = []core.DutyType{core.DutyAttester} // newDefinition returns a qbft definition (this is constant across all consensus instances). func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTimer, - decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]), compareAttestations bool, + decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message), + isLeader func(duty core.Duty, round, process int64) bool, + compareAttestations bool, ) qbft.Definition[core.Duty, [32]byte, proto.Message] { quorum := qbft.Definition[core.Duty, [32]byte, proto.Message]{Nodes: nodes}.Quorum() return qbft.Definition[core.Duty, [32]byte, proto.Message]{ // IsLeader is a deterministic leader election function. - IsLeader: func(duty core.Duty, round, process int64) bool { - return leader(duty, round, nodes) == process - }, + IsLeader: isLeader, // Decide sends consensus output to subscribers. Decide: func(ctx context.Context, duty core.Duty, _ [32]byte, qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) { @@ -70,7 +71,7 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTi return } - decideCallback(qcommit) + decideCallback(qcommit, value) for _, sub := range subs() { if err := sub(ctx, duty, value); err != nil { @@ -279,6 +280,7 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe compareAttestations: compareAttestations, } c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) + c.prepareParticipation.data = make(map[uint64][]int64) return c, nil } @@ -307,6 +309,15 @@ type Consensus struct { instances map[core.Duty]*instance.IO[Msg] } + + // prepareParticipation stores peer indices that participated in DutyPrepareProposer consensus. + // This is used to adjust leader election for DutyProposer to skip offline/malicious nodes. + // Key is the slot of DutyPrepareProposer (which is slot-1 of the corresponding DutyProposer). + prepareParticipation struct { + sync.RWMutex + + data map[uint64][]int64 + } } // ProtocolID returns the protocol ID. @@ -374,6 +385,26 @@ func (c *Consensus) Start(ctx context.Context) { // waits until it completes, in both cases it returns the resulting error. // Note this errors if called multiple times for the same duty. 
func (c *Consensus) Propose(ctx context.Context, duty core.Duty, data core.UnsignedDataSet) error { + // Inject visible peers for PrepareProposer duty. + if duty.Type == core.DutyPrepareProposer { + var visible []uint64 + + for i, p := range c.peers { + // Include self and connected peers. + if p.ID == c.p2pNode.ID() || c.p2pNode.Network().Connectedness(p.ID) == network.Connected { + visible = append(visible, uint64(i)) + } + } + + // Update the data set. + for k, v := range data { + if p, ok := v.(core.PrepareProposerData); ok { + p.VisiblePeers = visible + data[k] = p + } + } + } + // Hash the proposed data, since qbft only supports simple comparable values. value, err := core.UnsignedDataSetToProto(data) if err != nil { @@ -545,7 +576,7 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err span.End() }() - decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) { + decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message) { round := qcommit[0].Round() decided = true @@ -568,12 +599,23 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err span.SetAttributes(attribute.String("leader_name", leaderName)) span.AddEvent("qbft.Decided") + // Store participation for DutyPrepareProposer to be used in DutyProposer leader election. + if duty.Type == core.DutyPrepareProposer { + c.storeParticipation(duty.Slot, value) + } + // qbft.Run() is stopped by cancelling the context, or if an error occurred. cancel() } + // isLeader returns true if the given process is the leader for the given duty and round. + // For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available. + isLeader := func(d core.Duty, round, process int64) bool { + return c.leaderWithParticipation(d, round, nodes) == process + } + // Create a new qbft definition for this instance. - def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations) + def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, isLeader, c.compareAttestations) origLogRoundChange := def.LogRoundChange def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) { if origLogRoundChange != nil { @@ -881,6 +923,85 @@ func fmtStepPeers(step roundStep) string { return strings.Join(resp, "") } +// storeParticipation stores peer indices that participated in DutyPrepareProposer consensus. +// It also cleans up entries older than 2 slots to prevent memory growth. +func (c *Consensus) storeParticipation(slot uint64, value proto.Message) { + // Convert proto message back to UnsignedDataSet + unsignedPB, ok := value.(*pbv1.UnsignedDataSet) + if !ok { + return + } + + unsignedSet, err := core.UnsignedDataSetFromProto(core.DutyPrepareProposer, unsignedPB) + if err != nil { + return + } + + var participants []int64 + + // Extract visible peers from the dataset + for _, v := range unsignedSet { + if p, ok := v.(core.PrepareProposerData); ok { + for _, peerIdx := range p.VisiblePeers { + participants = append(participants, int64(peerIdx)) + } + // All entries in the set should have the same VisiblePeers since they come from the same leader proposal + break + } + } + + // Sort for deterministic leader election across all nodes. 
+ slices.Sort(participants) + + c.prepareParticipation.Lock() + defer c.prepareParticipation.Unlock() + + // Store participation for this slot. + c.prepareParticipation.data[slot] = participants + + // Clean up entries older than 2 slots. + for s := range c.prepareParticipation.data { + if slot > 1 && s < slot-1 { + delete(c.prepareParticipation.data, s) + } + } +} + +// getParticipants returns the list of peer indices that participated in DutyPrepareProposer +// for the given slot. Returns nil if no participation data is available. +func (c *Consensus) getParticipants(prepareSlot uint64) []int64 { + c.prepareParticipation.RLock() + defer c.prepareParticipation.RUnlock() + + return c.prepareParticipation.data[prepareSlot] +} + +// leaderWithParticipation returns the leader index for the given duty and round. +// For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available, +// which allows skipping offline/malicious nodes in leader election. +// For all other duties, it falls back to the standard leader election. +func (c *Consensus) leaderWithParticipation(duty core.Duty, round int64, nodes int) int64 { + if duty.Type != core.DutyProposer { + return leader(duty, round, nodes) + } + + if duty.Slot == 0 { + return leader(duty, round, nodes) + } + + participants := c.getParticipants(duty.Slot - 1) + + // If no participation data (e.g., DutyPrepareProposer didn't reach consensus), + // fall back to standard leader election. + if len(participants) == 0 { + return leader(duty, round, nodes) + } + + idx := (int64(duty.Slot) + int64(duty.Type) + round) % int64(len(participants)) + + return participants[idx] +} + // leader return the deterministic leader index. func leader(duty core.Duty, round int64, nodes int) int64 { return (int64(duty.Slot) + int64(duty.Type) + round) % int64(nodes) diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 7ae342ec7d..6e7fd520fa 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -591,3 +591,230 @@ func signConsensusMsg(t *testing.T, msg *pbv1.QBFTConsensusMsg, privKey *k1.Priv return msg } + +func createPrepareProposerValue(t *testing.T, visiblePeers []uint64) proto.Message { + t.Helper() + + data := core.PrepareProposerData{ + TargetSlot: 100, // Dummy slot + VisiblePeers: visiblePeers, + } + + // Wrap in UnsignedDataSet + set := core.UnsignedDataSet{ + "0x123": data, // Dummy pubkey + } + + pb, err := core.UnsignedDataSetToProto(set) + require.NoError(t, err) + + return pb +} + +func TestStoreAndGetParticipation(t *testing.T) { + c := &Consensus{} + c.prepareParticipation.data = make(map[uint64][]int64) + + // Store participation for slot 10. + c.storeParticipation(10, createPrepareProposerValue(t, []uint64{0, 1, 2})) + + // Get participants and verify they are sorted. + participants := c.getParticipants(10) + require.Equal(t, []int64{0, 1, 2}, participants) + + // Verify non-existent slot returns nil. + require.Nil(t, c.getParticipants(5)) + + // Store for slot 12, should clean up slot 10 (12 - 10 > 1). + c.storeParticipation(12, createPrepareProposerValue(t, []uint64{3})) + + // Slot 10 should be cleaned up. + require.Nil(t, c.getParticipants(10)) + + // Slot 12 should exist. 
+ require.Equal(t, []int64{3}, c.getParticipants(12)) +} + +func TestLeaderWithParticipation(t *testing.T) { + c := &Consensus{} + c.prepareParticipation.data = make(map[uint64][]int64) + + const nodes = 4 + + tests := []struct { + name string + duty core.Duty + round int64 + participants []int64 // Stored at slot-1 for DutyProposer. + expected int64 + }{ + { + name: "non-proposer duty uses normal leader", + duty: core.Duty{Slot: 10, Type: core.DutyAttester}, + round: 0, + expected: leader(core.Duty{Slot: 10, Type: core.DutyAttester}, 0, nodes), + }, + { + name: "proposer without participation uses normal leader", + duty: core.Duty{Slot: 10, Type: core.DutyProposer}, + round: 0, + expected: leader(core.Duty{Slot: 10, Type: core.DutyProposer}, 0, nodes), + }, + { + name: "proposer with participation elects from participants only", + duty: core.Duty{Slot: 10, Type: core.DutyProposer}, + round: 0, + participants: []int64{1, 3}, // Stored at slot 9. + expected: 3, // (10 + 1 + 0) % 2 = 1, so participants[1] = 3. + }, + { + name: "proposer with all participants same as normal", + duty: core.Duty{Slot: 10, Type: core.DutyProposer}, + round: 0, + participants: []int64{0, 1, 2, 3}, + expected: leader(core.Duty{Slot: 10, Type: core.DutyProposer}, 0, nodes), + }, + { + name: "proposer at slot 0 uses normal leader", + duty: core.Duty{Slot: 0, Type: core.DutyProposer}, + round: 0, + expected: leader(core.Duty{Slot: 0, Type: core.DutyProposer}, 0, nodes), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clear and set up participation data. + c.prepareParticipation.data = make(map[uint64][]int64) + if tt.participants != nil && tt.duty.Slot > 0 { + c.prepareParticipation.data[tt.duty.Slot-1] = tt.participants + } + + result := c.leaderWithParticipation(tt.duty, tt.round, nodes) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestLeaderWithParticipationConsistency(t *testing.T) { + // This test verifies that all nodes with the same participation data + // will elect the same leader - the core consistency guarantee. + c1 := &Consensus{} + c1.prepareParticipation.data = make(map[uint64][]int64) + + c2 := &Consensus{} + c2.prepareParticipation.data = make(map[uint64][]int64) + + // Simulate same participation data on both nodes. + participants := []int64{0, 2, 3} // Node 1 is offline. + c1.prepareParticipation.data[9] = participants + c2.prepareParticipation.data[9] = participants + + duty := core.Duty{Slot: 10, Type: core.DutyProposer} + + // Both nodes should elect the same leader for all rounds. + for round := int64(0); round < 10; round++ { + leader1 := c1.leaderWithParticipation(duty, round, 4) + leader2 := c2.leaderWithParticipation(duty, round, 4) + require.Equal(t, leader1, leader2, "round %d: leaders should match", round) + + // Leader should be one of the participants. + require.Contains(t, participants, leader1, "leader should be a participant") + } +} + +func TestPrepareProposerToProposerFlow(t *testing.T) { + // This test simulates the full flow: + // 1. DutyPrepareProposer at slot N-1 decides with a subset of peers participating + // 2. DutyProposer at slot N uses participation data to exclude non-participating peers + const ( + nodes = 4 + prepareSlot = 9 + proposerSlot = 10 + offlinePeer = int64(1) // Peer 1 is offline/malicious + ) + + c := &Consensus{} + c.prepareParticipation.data = make(map[uint64][]int64) + + // Store participation from DutyPrepareProposer. + // Only peers 0, 2, 3 participated (peer 1 was offline). 
+ c.storeParticipation(prepareSlot, createPrepareProposerValue(t, []uint64{0, 2, 3})) + + // Verify participation was stored correctly. + participants := c.getParticipants(prepareSlot) + require.Equal(t, []int64{0, 2, 3}, participants) + require.NotContains(t, participants, offlinePeer) + + // Now simulate DutyProposer at slot 10. + proposerDuty := core.Duty{Slot: proposerSlot, Type: core.DutyProposer} + + // Verify that for multiple rounds, the leader is never the offline peer. + for r := range 20 { + round := int64(r) + leaderIdx := c.leaderWithParticipation(proposerDuty, round, nodes) + + // Leader must be one of the participating peers. + require.Contains(t, participants, leaderIdx, + "round %d: leader %d should be a participant", round, leaderIdx) + + // Leader must NOT be the offline peer. + require.NotEqual(t, offlinePeer, leaderIdx, + "round %d: leader should not be the offline peer", round) + } + + // Compare with normal leader election (without participation data). + // At least some rounds should have different leaders. + var differenceCount int + + for r := range 20 { + round := int64(r) + normalLeader := leader(proposerDuty, round, nodes) + participationLeader := c.leaderWithParticipation(proposerDuty, round, nodes) + + if normalLeader != participationLeader { + differenceCount++ + } + } + + // We expect differences because peer 1 (offline) would be leader in some rounds + // with normal election but not with participation-based election. + require.Positive(t, differenceCount, "participation-based election should differ from normal election when a peer is offline") +} + +func TestPrepareProposerExpiresAfterTwoSlots(t *testing.T) { + // This test verifies that participation data expires correctly. + const nodes = 4 + + c := &Consensus{} + c.prepareParticipation.data = make(map[uint64][]int64) + + // Store participation at slot 9. + c.storeParticipation(9, createPrepareProposerValue(t, []uint64{0, 2})) + + // At slot 10 (next slot), participation should still be available. + proposerDuty10 := core.Duty{Slot: 10, Type: core.DutyProposer} + leaderSlot10 := c.leaderWithParticipation(proposerDuty10, 0, nodes) + require.Contains(t, []int64{0, 2}, leaderSlot10, "slot 10 should use participation from slot 9") + + // Store new participation at slot 11 (this cleans up slot 9). + c.storeParticipation(11, createPrepareProposerValue(t, []uint64{1, 3})) + + // Slot 9 data should be cleaned up. + require.Nil(t, c.getParticipants(9), "slot 9 data should be expired") + + // Slot 11 data should exist. + require.Equal(t, []int64{1, 3}, c.getParticipants(11)) + + // DutyProposer at slot 12 should use participation from slot 11. + proposerDuty12 := core.Duty{Slot: 12, Type: core.DutyProposer} + leaderSlot12 := c.leaderWithParticipation(proposerDuty12, 0, nodes) + require.Contains(t, []int64{1, 3}, leaderSlot12, "slot 12 should use participation from slot 11") + + // DutyProposer at slot 10 should now fall back to normal election + // because slot 9 data was cleaned up. 
+ leaderSlot10After := c.leaderWithParticipation(proposerDuty10, 0, nodes) + normalLeader := leader(proposerDuty10, 0, nodes) + require.Equal(t, normalLeader, leaderSlot10After, + "slot 10 should fall back to normal election after slot 9 data expired") +} diff --git a/core/consensus/qbft/sniffed_internal_test.go b/core/consensus/qbft/sniffed_internal_test.go index 685316b7b2..d624bd7420 100644 --- a/core/consensus/qbft/sniffed_internal_test.go +++ b/core/consensus/qbft/sniffed_internal_test.go @@ -79,7 +79,10 @@ func testSniffedInstance(ctx context.Context, t *testing.T, instance *pbv1.Sniff return nil }} - }, timer.NewIncreasingRoundTimer(), func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) {}, false) + }, timer.NewIncreasingRoundTimer(), func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message) {}, + func(duty core.Duty, round, process int64) bool { + return leader(duty, round, int(instance.GetNodes())) == process + }, false) recvBuffer := make(chan qbft.Msg[core.Duty, [32]byte, proto.Message], len(instance.GetMsgs())) diff --git a/core/deadline.go b/core/deadline.go index 7d98ab9dae..536d02ce8b 100644 --- a/core/deadline.go +++ b/core/deadline.go @@ -100,7 +100,7 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF duration = slotDuration / 3 case DutySyncMessage: duration = 2 * slotDuration / 3 - case DutyAttester, DutyAggregator, DutyPrepareAggregator: + case DutyAttester, DutyAggregator, DutyPrepareAggregator, DutyPrepareProposer: // Even though attestations and aggregations are acceptable even after 2 slots, the rewards are heavily diminished. duration = 2 * slotDuration default: diff --git a/core/dutydefinition.go b/core/dutydefinition.go index 4399d2ea71..c53e35ff88 100644 --- a/core/dutydefinition.go +++ b/core/dutydefinition.go @@ -3,6 +3,8 @@ package core import ( + "encoding/json" + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/obolnetwork/charon/app/errors" @@ -66,6 +68,35 @@ func (d ProposerDefinition) MarshalJSON() ([]byte, error) { return d.ProposerDuty.MarshalJSON() } +// NewPrepareProposerDefinition is a convenience function that returns a new prepare proposer definition. +func NewPrepareProposerDefinition(targetSlot uint64) PrepareProposerDefinition { + return PrepareProposerDefinition{ + TargetSlot: targetSlot, + } +} + +// PrepareProposerDefinition defines a prepare proposer duty. +type PrepareProposerDefinition struct { + TargetSlot uint64 // Definition cannot be empty, so we store the target slot. +} + +func (d PrepareProposerDefinition) Clone() (DutyDefinition, error) { + return NewPrepareProposerDefinition(d.TargetSlot), nil +} + +func (d PrepareProposerDefinition) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + TargetSlot uint64 `json:"target_slot,string"` + }{ + TargetSlot: d.TargetSlot, + }) + if err != nil { + return nil, errors.Wrap(err, "marshal prepare proposer definition") + } + + return b, nil +} + // NewSyncCommitteeDefinition is a convenience function that returns a new SyncCommitteeDefinition. 
func NewSyncCommitteeDefinition(duty *eth2v1.SyncCommitteeDuty) DutyDefinition { return SyncCommitteeDefinition{SyncCommitteeDuty: *duty} diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index c326e5043b..f5a90635eb 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -90,6 +90,18 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef } else if len(unsignedSet) == 0 { // No sync committee contributors found in this slot return nil } + case core.DutyPrepareProposer: + unsignedSet = make(core.UnsignedDataSet) + for pubkey, def := range defSet { + prepDef, ok := def.(core.PrepareProposerDefinition) + if !ok { + return errors.New("invalid prepare proposer definition") + } + + unsignedSet[pubkey] = core.PrepareProposerData{ + TargetSlot: prepDef.TargetSlot, + } + } default: return errors.New("unsupported duty type", z.Str("type", duty.Type.String())) } diff --git a/core/scheduler/offset.go b/core/scheduler/offset.go index 7db1fb9803..3ca62c6fe0 100644 --- a/core/scheduler/offset.go +++ b/core/scheduler/offset.go @@ -13,6 +13,7 @@ var slotOffsets = map[core.DutyType]func(time.Duration) time.Duration{ core.DutyAttester: fraction(1, 3), // 1/3 slot duration core.DutyAggregator: fraction(2, 3), // 2/3 slot duration core.DutySyncContribution: fraction(2, 3), + core.DutyPrepareProposer: fraction(1, 2), // 1/2 slot duration } // fraction returns a function that calculates slot offset based on the fraction x/y of total slot duration. diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index d44d4f2adc..85e7f6550f 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -116,6 +116,7 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution + stopOnce sync.Once } // SubscribeDuties subscribes a callback function for triggered duties. @@ -132,7 +133,9 @@ func (s *Scheduler) SubscribeSlots(fn func(context.Context, core.Slot) error) { } func (s *Scheduler) Stop() { - close(s.quit) + s.stopOnce.Do(func() { + close(s.quit) + }) } // Run blocks and runs the scheduler until Stop is called. @@ -498,6 +501,13 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot core.Slot, vals v continue } + if featureset.Enabled(featureset.PrepareProposer) && proDuty.Slot > eth2p0.Slot(slot.Slot) { + // Schedule prepare proposer duty for the slot before the actual proposer duty. 
+ prepareDuty := core.NewPrepareProposerDuty(uint64(proDuty.Slot - 1)) + + s.setDutyDefinition(prepareDuty, slot.Epoch(), pubkey, core.NewPrepareProposerDefinition(uint64(proDuty.Slot))) + } + log.Info(ctx, "Resolved proposer duty", z.U64("slot", uint64(proDuty.Slot)), z.U64("vidx", uint64(proDuty.ValidatorIndex)), diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 230e1b53bb..03e50ec6d6 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -198,21 +198,28 @@ func TestSchedulerDuties(t *testing.T) { }, { // All duties spread in first N slots of epoch (N is number of validators) + // 3 slots × (proposer + attester + aggregator) + 2 prepare_proposer = 11 Name: "spread", Factor: 1, - Results: 9, + Results: 11, }, { // All duties spread in first N slots of epoch (except first proposer errors) + // Slot 0: attester + aggregator = 2 (no proposer due to error) + // Slot 1: proposer + attester + aggregator + prepare_proposer = 4 + // Slot 2: proposer + attester + aggregator = 3 + // Total = 9 Name: "spread_errors", Factor: 1, PropErrs: 1, - Results: 8, + Results: 9, }, } for _, test := range tests { t.Run(test.Name, func(t *testing.T) { + featureset.EnableForT(t, featureset.PrepareProposer) + // Configure beacon mock var t0 time.Time @@ -251,9 +258,6 @@ func TestSchedulerDuties(t *testing.T) { slotDuration, ok := eth2Resp.Data["SECONDS_PER_SLOT"].(time.Duration) require.True(t, ok) - clock.CallbackAfter(t0.Add(time.Duration(stopAfter)*slotDuration), func() { - time.Sleep(time.Hour) // Do not let the slot ticker tick anymore. - }) // Collect results type result struct { @@ -268,6 +272,25 @@ func TestSchedulerDuties(t *testing.T) { mu sync.Mutex ) + clock.CallbackAfter(t0.Add(time.Duration(stopAfter)*slotDuration), func() { + // Wait for any lagging duties to be processed. 
+ for range 100 { + mu.Lock() + + count := len(results) + + mu.Unlock() + + if count >= test.Results { + return + } + + time.Sleep(10 * time.Millisecond) + } + + sched.Stop() + }) + sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, set core.DutyDefinitionSet) error { // Make result human-readable resultSet := make(map[core.PubKey]string) diff --git a/core/scheduler/testdata/TestSchedulerDuties_spread.golden b/core/scheduler/testdata/TestSchedulerDuties_spread.golden index 64f30cf386..bd72a35dab 100644 --- a/core/scheduler/testdata/TestSchedulerDuties_spread.golden +++ b/core/scheduler/testdata/TestSchedulerDuties_spread.golden @@ -20,6 +20,13 @@ "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"1\",\"committee_length\":\"1\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"0\"}" } }, + { + "Time": "00:06.000", + "duty": "0/prepare_proposer", + "DutyDefSet": { + "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"target_slot\":\"1\"}" + } + }, { "Time": "00:00.000", "duty": "1/proposer", @@ -41,6 +48,13 @@ "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"2\",\"committee_length\":\"1\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"0\"}" } }, + { + "Time": "00:18.000", + "duty": "1/prepare_proposer", + "DutyDefSet": { + "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"target_slot\":\"2\"}" + } + }, { "Time": "00:00.000", "duty": "2/proposer", diff --git a/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden b/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden index 4815ac8169..ea78141994 100644 --- a/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden +++ b/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden @@ -34,6 +34,13 @@ "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"2\",\"committee_length\":\"1\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"0\"}" } }, + { + "Time": "00:18.000", + "duty": "1/prepare_proposer", + "DutyDefSet": { + "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"target_slot\":\"2\"}" + } + }, { "Time": "00:00.000", "duty": "2/proposer", diff --git a/core/types.go b/core/types.go index 08641b95db..8756a61a56 100644 --- a/core/types.go +++ b/core/types.go @@ -44,9 +44,10 @@ const ( DutyPrepareSyncContribution DutyType = 11 DutySyncContribution DutyType = 12 DutyInfoSync DutyType = 13 + DutyPrepareProposer DutyType = 14 // Only ever append new types here... 
- dutySentinel DutyType = 14 // Must always be last + dutySentinel DutyType = 15 // Must always be last ) func (d DutyType) Valid() bool { @@ -69,6 +70,7 @@ func (d DutyType) String() string { DutyPrepareSyncContribution: "prepare_sync_contribution", DutySyncContribution: "sync_contribution", DutyInfoSync: "info_sync", + DutyPrepareProposer: "prepare_proposer", }[d] } @@ -260,6 +262,15 @@ func NewInfoSyncDuty(slot uint64) Duty { } } +// NewPrepareProposerDuty returns a new prepare proposer duty. It is a convenience function that is +// slightly more readable and concise than the struct literal equivalent. +func NewPrepareProposerDuty(slot uint64) Duty { + return Duty{ + Slot: slot, + Type: DutyPrepareProposer, + } +} + const ( pkLen = 98 // "0x" + hex.Encode([48]byte) = 2+2*48 sigLen = 96 diff --git a/core/types_test.go b/core/types_test.go index 7e2dff22d7..6401eb8979 100644 --- a/core/types_test.go +++ b/core/types_test.go @@ -32,9 +32,10 @@ func TestBackwardsCompatibility(t *testing.T) { require.EqualValues(t, 11, core.DutyPrepareSyncContribution) require.EqualValues(t, 12, core.DutySyncContribution) require.EqualValues(t, 13, core.DutyInfoSync) + require.EqualValues(t, 14, core.DutyPrepareProposer) // Add more types here. - const sentinel = core.DutyType(14) + const sentinel = core.DutyType(15) for i := core.DutyUnknown; i <= sentinel; i++ { switch i { case core.DutyUnknown: @@ -78,7 +79,7 @@ func TestWithDutySpanCtx(t *testing.T) { func TestAllDutyTypes(t *testing.T) { adt := core.AllDutyTypes() - require.Len(t, adt, 13) + require.Len(t, adt, 14) for i, dt := range adt { require.Equal(t, i, slices.Index(adt, dt)) diff --git a/core/unsigneddata.go b/core/unsigneddata.go index 18ee0b9956..63ed2b2d7e 100644 --- a/core/unsigneddata.go +++ b/core/unsigneddata.go @@ -31,6 +31,7 @@ var ( _ UnsignedData = VersionedAggregatedAttestation{} _ UnsignedData = VersionedProposal{} _ UnsignedData = SyncContribution{} + _ UnsignedData = PrepareProposerData{} // Some types also support SSZ marshalling and unmarshalling. _ ssz.Marshaler = AttestationData{} @@ -45,6 +46,49 @@ var ( _ ssz.Unmarshaler = new(SyncContribution) ) +// PrepareProposerData wraps the prepare proposer data. +type PrepareProposerData struct { + TargetSlot uint64 + VisiblePeers []uint64 // Indices of peers visible to this node. +} + +func (p PrepareProposerData) Clone() (UnsignedData, error) { + peers := make([]uint64, len(p.VisiblePeers)) + copy(peers, p.VisiblePeers) + + return PrepareProposerData{TargetSlot: p.TargetSlot, VisiblePeers: peers}, nil +} + +func (p PrepareProposerData) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + TargetSlot uint64 `json:"target_slot,string"` + VisiblePeers []uint64 `json:"visible_peers,omitempty"` + }{ + TargetSlot: p.TargetSlot, + VisiblePeers: p.VisiblePeers, + }) + if err != nil { + return nil, errors.Wrap(err, "marshal prepare proposer data") + } + + return b, nil +} + +func (p *PrepareProposerData) UnmarshalJSON(data []byte) error { + var aux struct { + TargetSlot uint64 `json:"target_slot,string"` + VisiblePeers []uint64 `json:"visible_peers,omitempty"` + } + if err := json.Unmarshal(data, &aux); err != nil { + return errors.Wrap(err, "unmarshal prepare proposer data") + } + + p.TargetSlot = aux.TargetSlot + p.VisiblePeers = aux.VisiblePeers + + return nil +} + // AttestationData wraps the eth2 attestation data and adds the original duty. 
// The original duty allows mapping the partial signed response from the VC // back to the validator pubkey via the aggregation bits field. @@ -689,6 +733,13 @@ func unmarshalUnsignedData(typ DutyType, data []byte) (UnsignedData, error) { return nil, errors.Wrap(err, "unmarshal sync contribution") } + return resp, nil + case DutyPrepareProposer: + var resp PrepareProposerData + if err := unmarshal(data, &resp); err != nil { + return nil, errors.Wrap(err, "unmarshal prepare proposer data") + } + return resp, nil default: return nil, errors.New("unsupported unsigned data duty type")
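For reference, a minimal standalone sketch of the participation-aware leader election that the qbft changes above implement. It is not part of the diff: the modulo formula mirrors leader() and leaderWithParticipation() in qbft.go, while the dutyProposer constant and all slot/peer values are illustrative stand-ins rather than the real core.DutyType values.

package main

import "fmt"

// dutyProposer is a stand-in for core.DutyProposer; the real constant value may differ.
const dutyProposer = 1

// standardLeader is the existing election: any of the `nodes` peers can win,
// including peers that are offline or refusing to participate.
func standardLeader(slot, dutyType, round int64, nodes int) int64 {
	return (slot + dutyType + round) % int64(nodes)
}

// participationLeader restricts the election for DutyProposer at `slot` to the
// peers recorded as participants of DutyPrepareProposer at slot-1, falling
// back to the standard election when no participation data is available.
func participationLeader(slot, round int64, nodes int, participation map[int64][]int64) int64 {
	participants := participation[slot-1]
	if len(participants) == 0 {
		return standardLeader(slot, dutyProposer, round, nodes)
	}

	idx := (slot + dutyProposer + round) % int64(len(participants))

	return participants[idx]
}

func main() {
	const nodes = 4

	// Peer 1 was offline during DutyPrepareProposer at slot 9, so only
	// peers 0, 2 and 3 were recorded as participants for that slot.
	participation := map[int64][]int64{9: {0, 2, 3}}

	for round := int64(0); round < 4; round++ {
		fmt.Printf("round %d: standard leader=%d, participation-aware leader=%d\n",
			round,
			standardLeader(10, dutyProposer, round, nodes),       // Round 2 would elect the offline peer 1.
			participationLeader(10, round, nodes, participation)) // Never elects peer 1.
	}
}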
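Similarly, a small sketch (also not part of the diff) of the wire format produced by the PrepareProposerData.MarshalJSON added above; the struct here is a local copy with the same JSON tags, and the field values are made up.

package main

import (
	"encoding/json"
	"fmt"
)

// prepareProposerData mirrors core.PrepareProposerData's JSON tags.
type prepareProposerData struct {
	TargetSlot   uint64   `json:"target_slot,string"`
	VisiblePeers []uint64 `json:"visible_peers,omitempty"`
}

func main() {
	b, err := json.Marshal(prepareProposerData{TargetSlot: 2, VisiblePeers: []uint64{0, 2, 3}})
	if err != nil {
		panic(err)
	}

	// The ",string" option encodes the slot as a JSON string, matching the
	// golden files above; omitempty drops visible_peers when it is empty.
	// Output: {"target_slot":"2","visible_peers":[0,2,3]}
	fmt.Println(string(b))
}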