From 88bb5d99bf3f9f215a8926e8af972de0cdd3802e Mon Sep 17 00:00:00 2001 From: kaleofduty <59616916+kaleofduty@users.noreply.github.com> Date: Fri, 5 Sep 2025 13:39:58 +0200 Subject: [PATCH] Improvements: - Support DON-to-DON messaging using PeerGroup - OCR3.1 progress - Fix log messages - Add config digests for DON-to-DON messaging. - Add peeridhelper for safe reuse of PeerID for arbitrary message signing Based on 01cc9044e71de1fb6d75056d532641421a34c779 Co-authored-by: Kostis Karantias Co-authored-by: stchrysa --- networking/peer_group.go | 26 ++- .../internal/managed/track_config.go | 2 +- .../internal/managed/track_config.go | 2 +- .../internal/ocr3_1/protocol/oracle.go | 3 + .../ocr3_1/protocol/outcome_generation.go | 3 + .../protocol/outcome_generation_follower.go | 78 ++++---- .../ocr3_1/protocol/report_attestation.go | 179 +++++++++++------- .../ocr3_1/protocol/state_persistence.go | 9 +- .../ocr3_1/serialization/serialization.go | 7 +- offchainreporting2plus/types/config_digest.go | 8 +- ragep2p/peeridhelper/domain_separation.go | 115 +++++++++++ 11 files changed, 320 insertions(+), 112 deletions(-) create mode 100644 ragep2p/peeridhelper/domain_separation.go diff --git a/networking/peer_group.go b/networking/peer_group.go index db045364..a811be41 100644 --- a/networking/peer_group.go +++ b/networking/peer_group.go @@ -13,12 +13,25 @@ import ( ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" ) -func peerGroupStreamNamePrefix(configDigestPrefix ocr2types.ConfigDigestPrefix) (streamNamePrefix string, ok bool) { +func peerGroupStreamNamePrefix(configDigest ocr2types.ConfigDigest) (streamNamePrefix string, err error) { + configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest) + switch configDigestPrefix { // nolint:exhaustive case ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo: - return "ccip-rmn/", true + return "ccip-rmn/", nil + case ocr2types.ConfigDigestPrefixDONToDONMessagingGroup: + // We include the full config 
digest in the stream name prefix to have clean + // namespacing between different PeerGroups. + return fmt.Sprintf("don-to-don/%s/", configDigest), nil + case ocr2types.ConfigDigestPrefixDONToDONDiscoveryGroup: + // We include the full config digest in the stream name prefix to have clean + // namespacing between different PeerGroups. + // Based on the current don-to-don design, we do not expect to have any streams + // with this config digest prefix, but we nevertheless need to allowlist it + // to enable creation of corresponding PeerGroups. + return fmt.Sprintf("don-to-don-discovery/%s/", configDigest), nil default: - return "", false + return "", fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix) } } @@ -84,10 +97,9 @@ func (f *peerGroupFactory) NewPeerGroup( peerIDs []string, bootstrappers []commontypes.BootstrapperLocator, ) (PeerGroup, error) { - configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest) - streamNamePrefix, ok := peerGroupStreamNamePrefix(configDigestPrefix) - if !ok { - return nil, fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix) + streamNamePrefix, err := peerGroupStreamNamePrefix(configDigest) + if err != nil { + return nil, fmt.Errorf("could not get stream name prefix: %w", err) } decodedv2PeerIDs, err := decodev2PeerIDs(peerIDs) diff --git a/offchainreporting/internal/managed/track_config.go b/offchainreporting/internal/managed/track_config.go index eae8948d..02854990 100644 --- a/offchainreporting/internal/managed/track_config.go +++ b/offchainreporting/internal/managed/track_config.go @@ -147,7 +147,7 @@ func (state *trackConfigState) checkLatestConfigDetails() ( defer configCancel() contractConfig, err := state.configTracker.ConfigFromLogs(configCtx, changedInBlock) if err != nil { - state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", configCtx, commontypes.LogFields{ + state.logger.ErrorIfNotCanceled("TrackConfig: error during 
ConfigFromLogs()", configCtx, commontypes.LogFields{ "error": err, }) return nil, true diff --git a/offchainreporting2plus/internal/managed/track_config.go b/offchainreporting2plus/internal/managed/track_config.go index 6e667d98..d6e26869 100644 --- a/offchainreporting2plus/internal/managed/track_config.go +++ b/offchainreporting2plus/internal/managed/track_config.go @@ -121,7 +121,7 @@ func (state *trackConfigState) checkLatestConfigDetails() ( contractConfig, err := state.configTracker.LatestConfig(ctx, changedInBlock) if err != nil { - state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", ctx, commontypes.LogFields{ + state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfig()", ctx, commontypes.LogFields{ "error": err, }) return nil, true diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go index 9d22b1e0..03e3d625 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go @@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() { chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI]) o.chNetToStatePersistence = chNetToStatePersistence + chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI]) chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI]) chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI]) @@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() { chPacemakerToOutcomeGeneration, chOutcomeGenerationToPacemaker, chOutcomeGenerationToReportAttestation, + chOutcomeGenerationToStatePersistence, &blobEndpoint, o.config, o.database, @@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() { o.childCtx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, o.config, o.database, diff --git 
a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go index 640ff543..dcdc9278 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go @@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any]( chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI], chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI], chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI], + chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI], blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher, config ocr3config.SharedConfig, database Database, @@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any]( chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration, chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker, chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation, + chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence, blobBroadcastFetcher: blobBroadcastFetcher, config: config, database: database, @@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct { chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI] chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI] chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI] + chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI] blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher config ocr3config.SharedConfig database Database diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go index 64b27d7b..d87036b5 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go +++ 
b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go @@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar outgen.refreshCommittedSeqNrAndCert() if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") { + select { + case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{ + msg.EpochStartProof.HighestCertified.SeqNr(), + }: + case <-outgen.ctx.Done(): + } return } @@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar prepareQc.ReportsPlusPrecursor, reportPlusPrecursorDigest, } + outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{ "seqNr": outgen.sharedState.seqNr, }) @@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() { return } - if outgen.followerState.openKVTxn == nil { - outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "phase": outgen.followerState.phase, - }) - panic("") - } - err := outgen.followerState.openKVTxn.Commit() - outgen.followerState.openKVTxn.Discard() - outgen.followerState.openKVTxn = nil - if err != nil { - outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "error": err, - }) - - { - kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() - if err != nil { - outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "error": err, - }) - return - } + if outgen.followerState.openKVTxn != nil { + err := outgen.followerState.openKVTxn.Commit() + outgen.followerState.openKVTxn.Discard() + outgen.followerState.openKVTxn = nil + if err != nil { + outgen.logger.Warn("failed to commit kv 
transaction", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "error": err, + }) - if kvSeqNr < outgen.sharedState.seqNr { - outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "kvSeqNr": kvSeqNr, - }) - return + { + kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() + if err != nil { + outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "error": err, + }) + return + } + + if kvSeqNr < outgen.sharedState.seqNr { + outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "kvSeqNr": kvSeqNr, + }) + return + } } } } @@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified } func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() { + preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() @@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() { }) if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr { + return + } else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr { + + logger.Warn("last kv transaction commit failed, requesting state sync", nil) + select { + case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{ + preRefreshCommittedSeqNr, + }: + case <-outgen.ctx.Done(): + } + return } else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr { - logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil) + logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil) panic("") } diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/report_attestation.go 
b/offchainreporting2plus/internal/ocr3_1/protocol/report_attestation.go index 879088a0..6145077d 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/report_attestation.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/report_attestation.go @@ -3,9 +3,6 @@ package protocol import ( "context" "crypto/rand" - "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/common" - "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "github.com/smartcontractkit/libocr/subprocesses" "math" "math/big" "runtime" @@ -15,10 +12,13 @@ import ( "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/internal/loghelper" + "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/common" "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/common/scheduler" "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/config/ocr3config" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/libocr/subprocesses" ) func RunReportAttestation[RI any]( @@ -70,12 +70,14 @@ type reportAttestationState[RI any] struct { scheduler *scheduler.Scheduler[EventMissingOutcome[RI]] chLocalEvent chan EventComputedReports[RI] - // reap() is used to prevent unbounded state growth of rounds + // reap() is used to prevent unbounded state growth of rounds. + rounds map[uint64]*round[RI] - // highest sequence number for which we have attested reports - highestAttestedSeqNr uint64 - // highest sequence number for which we have received report signatures - // from each oracle + + // Highest sequence number for which we know a certified commit exists. + // This is used for determining the window of rounds we keep in memory. + // Computed as select_largest(f+1, highestReportSignaturesSeqNr). 
+ highWaterMark uint64 highestReportSignaturesSeqNr []uint64 } @@ -136,22 +138,21 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures( "sender": sender, "msgSeqNr": msg.SeqNr, }) + + repatt.tryReap(msg.SeqNr, sender) + if repatt.isBeyondExpiry(msg.SeqNr) { - repatt.logger.Debug("ignoring MessageReportSignatures for expired seqNr", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "sender": sender, + repatt.logger.Debug("dropping MessageReportSignatures for expired seqNr", commontypes.LogFields{ + "msgSeqNr": msg.SeqNr, + "sender": sender, }) return } - if repatt.highestReportSignaturesSeqNr[sender] < msg.SeqNr { - repatt.highestReportSignaturesSeqNr[sender] = msg.SeqNr - } - if repatt.isBeyondLookahead(msg.SeqNr) { - repatt.logger.Debug("ignoring MessageReportSignatures for seqNr beyond lookahead", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "sender": sender, + repatt.logger.Debug("dropping MessageReportSignatures for seqNr beyond lookahead", commontypes.LogFields{ + "msgSeqNr": msg.SeqNr, + "sender": sender, }) return } @@ -170,9 +171,9 @@ func (repatt *reportAttestationState[RI]) messageReportSignatures( } if repatt.rounds[msg.SeqNr].oracles[sender].sentSignatures { - repatt.logger.Debug("ignoring MessageReportSignatures with duplicate signature", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "sender": sender, + repatt.logger.Debug("dropping MessageReportSignatures with duplicate signature", commontypes.LogFields{ + "msgSeqNr": msg.SeqNr, + "sender": sender, }) return } @@ -187,9 +188,19 @@ func (repatt *reportAttestationState[RI]) eventMissingOutcome(ev EventMissingOut repatt.logger.Debug("received EventMissingOutcome", commontypes.LogFields{ "msgSeqNr": ev.SeqNr, }) + + if repatt.rounds[ev.SeqNr] == nil { + repatt.logger.Debug("dropping EventMissingOutcome for unknown seqNr", commontypes.LogFields{ + "evSeqNr": ev.SeqNr, + "highWaterMark": repatt.highWaterMark, + "expiryRounds": repatt.expiryRounds(), + }) + return + } + if 
repatt.rounds[ev.SeqNr].verifiedCertifiedCommit != nil { repatt.logger.Debug("dropping EventMissingOutcome, already have Outcome", commontypes.LogFields{ - "seqNr": ev.SeqNr, + "evSeqNr": ev.SeqNr, }) return } @@ -202,18 +213,29 @@ func (repatt *reportAttestationState[RI]) messageCertifiedCommitRequest(msg Mess "sender": sender, "msgSeqNr": msg.SeqNr, }) - if repatt.rounds[msg.SeqNr] == nil || repatt.rounds[msg.SeqNr].verifiedCertifiedCommit == nil { + + if repatt.rounds[msg.SeqNr] == nil { + repatt.logger.Debug("dropping MessageCertifiedCommitRequest for unknown seqNr", commontypes.LogFields{ + "msgSeqNr": msg.SeqNr, + "sender": sender, + "highWaterMark": repatt.highWaterMark, + "expiryRounds": repatt.expiryRounds(), + }) + return + } + + if repatt.rounds[msg.SeqNr].verifiedCertifiedCommit == nil { repatt.logger.Debug("dropping MessageCertifiedCommitRequest for outcome with unknown certified commit", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "sender": sender, + "msgSeqNr": msg.SeqNr, + "sender": sender, }) return } if repatt.rounds[msg.SeqNr].oracles[sender].weServiced { repatt.logger.Warn("dropping duplicate MessageCertifiedCommitRequest", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "sender": sender, + "msgSeqNr": msg.SeqNr, + "sender": sender, }) return } @@ -221,8 +243,8 @@ func (repatt *reportAttestationState[RI]) messageCertifiedCommitRequest(msg Mess repatt.rounds[msg.SeqNr].oracles[sender].weServiced = true repatt.logger.Debug("sending MessageCertifiedCommit", commontypes.LogFields{ - "seqNr": msg.SeqNr, - "to": sender, + "msgSeqNr": msg.SeqNr, + "to": sender, }) repatt.netSender.SendTo(MessageCertifiedCommit[RI]{*repatt.rounds[msg.SeqNr].verifiedCertifiedCommit}, sender) } @@ -234,8 +256,10 @@ func (repatt *reportAttestationState[RI]) messageCertifiedCommit(msg MessageCert }) if repatt.rounds[msg.CertifiedCommittedReports.SeqNr] == nil { repatt.logger.Warn("dropping MessageCertifiedCommit for unknown seqNr", commontypes.LogFields{ - "seqNr": 
msg.CertifiedCommittedReports.SeqNr, - "sender": sender, + "msgSeqNr": msg.CertifiedCommittedReports.SeqNr, + "sender": sender, + "highWaterMark": repatt.highWaterMark, + "expiryRounds": repatt.expiryRounds(), }) return } @@ -243,7 +267,7 @@ func (repatt *reportAttestationState[RI]) messageCertifiedCommit(msg MessageCert oracle := &repatt.rounds[msg.CertifiedCommittedReports.SeqNr].oracles[sender] if !(oracle.weRequested && !oracle.theyServiced) { repatt.logger.Warn("dropping unexpected MessageCertifiedCommit", commontypes.LogFields{ - "seqNr": msg.CertifiedCommittedReports.SeqNr, + "msgSeqNr": msg.CertifiedCommittedReports.SeqNr, "sender": sender, "weRequested": oracle.weRequested, "theyServiced": oracle.theyServiced, @@ -255,24 +279,23 @@ func (repatt *reportAttestationState[RI]) messageCertifiedCommit(msg MessageCert if repatt.rounds[msg.CertifiedCommittedReports.SeqNr].verifiedCertifiedCommit != nil { repatt.logger.Debug("dropping redundant MessageCertifiedCommit", commontypes.LogFields{ - "seqNr": msg.CertifiedCommittedReports.SeqNr, - "sender": sender, + "msgSeqNr": msg.CertifiedCommittedReports.SeqNr, + "sender": sender, }) return } if err := msg.CertifiedCommittedReports.Verify(repatt.config.ConfigDigest, repatt.config.OracleIdentities, repatt.config.ByzQuorumSize()); err != nil { repatt.logger.Warn("dropping MessageCertifiedCommit with invalid certified commit", commontypes.LogFields{ - "seqNr": msg.CertifiedCommittedReports.SeqNr, - "sender": sender, - "err": err, + "msgSeqNr": msg.CertifiedCommittedReports.SeqNr, + "sender": sender, }) return } repatt.logger.Debug("received valid MessageCertifiedCommit", commontypes.LogFields{ - "seqNr": msg.CertifiedCommittedReports.SeqNr, - "sender": sender, + "msgSeqNr": msg.CertifiedCommittedReports.SeqNr, + "sender": sender, }) repatt.receivedVerifiedCertifiedCommit(msg.CertifiedCommittedReports) @@ -338,9 +361,9 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { if oraclesThatSentSignatures <= 
repatt.config.F { repatt.logger.Debug("cannot complete, missing CertifiedCommit and signatures", commontypes.LogFields{ - "oraclesThatSentNonemptySignatures": oraclesThatSentSignatures, - "seqNr": seqNr, - "threshold": repatt.config.F + 1, + "oraclesThatSentSignatures": oraclesThatSentSignatures, + "seqNr": seqNr, + "threshold": repatt.config.F + 1, }) } else if !repatt.rounds[seqNr].startedFetch { repatt.logger.Debug("we have received f+1 MessageReportSignatures messages but we are still missing CertifiedCommit", commontypes.LogFields{ @@ -355,6 +378,7 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { } return } + if repatt.rounds[seqNr].reportsPlus == nil { repatt.logger.Debug("cannot complete, reportsPlus not computed yet", commontypes.LogFields{ "seqNr": seqNr, @@ -413,10 +437,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { return } - if repatt.highestAttestedSeqNr < seqNr { - repatt.highestAttestedSeqNr = seqNr - } - repatt.rounds[seqNr].complete = true repatt.logger.Debug("sending attested reports to transmission protocol", commontypes.LogFields{ @@ -438,8 +458,6 @@ func (repatt *reportAttestationState[RI]) tryComplete(seqNr uint64) { case <-repatt.ctx.Done(): } } - - repatt.reap() } func (repatt *reportAttestationState[RI]) verifySignatures(publicKey types.OnchainPublicKey, seqNr uint64, reportsPlus []ocr3types.ReportPlus[RI], signatures [][]byte) bool { @@ -549,9 +567,10 @@ func (repatt *reportAttestationState[RI]) backgroundComputeReports(ctx context.C func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedReports[RI]) { if repatt.rounds[ev.SeqNr] == nil { - repatt.logger.Debug("discarding EventComputedReports from old round", commontypes.LogFields{ - "evSeqNr": ev.SeqNr, - "highestAttestedSeqNr": repatt.highestAttestedSeqNr, + repatt.logger.Debug("dropping EventComputedReports for unknown seqNr", commontypes.LogFields{ + "evSeqNr": ev.SeqNr, + "highWaterMark": repatt.highWaterMark, 
+ "expiryRounds": repatt.expiryRounds(), }) return } @@ -569,9 +588,9 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR sig, err := repatt.onchainKeyring.Sign(repatt.config.ConfigDigest, ev.SeqNr, reportPlus.ReportWithInfo) if err != nil { repatt.logger.Error("error while signing report", commontypes.LogFields{ - "seqNr": ev.SeqNr, - "index": i, - "error": err, + "evSeqNr": ev.SeqNr, + "index": i, + "error": err, }) return } @@ -579,7 +598,7 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR } repatt.logger.Debug("broadcasting MessageReportSignatures", commontypes.LogFields{ - "seqNr": ev.SeqNr, + "evSeqNr": ev.SeqNr, }) repatt.netSender.Broadcast(MessageReportSignatures[RI]{ @@ -590,35 +609,57 @@ func (repatt *reportAttestationState[RI]) eventComputedReports(ev EventComputedR // no need to call tryComplete since receipt of our own MessageReportSignatures will do so } +// reap expired rounds if there is a new high water mark +func (repatt *reportAttestationState[RI]) tryReap(seqNr uint64, sender commontypes.OracleID) { + if repatt.highestReportSignaturesSeqNr[sender] >= seqNr { + return + } + + repatt.highestReportSignaturesSeqNr[sender] = seqNr + + var newHighWaterMark uint64 + { + highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...) 
+ sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool { + return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j] + }) + newHighWaterMark = highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr + } + + if repatt.highWaterMark >= newHighWaterMark { + return + } + + repatt.highWaterMark = newHighWaterMark // (f+1)th largest seqNr + repatt.reap() +} + func (repatt *reportAttestationState[RI]) isBeyondExpiry(seqNr uint64) bool { - highest := repatt.highestAttestedSeqNr expiry := uint64(repatt.expiryRounds()) - if highest <= expiry { + if repatt.highWaterMark <= expiry { return false } - return seqNr < highest-expiry + return seqNr < repatt.highWaterMark-expiry } func (repatt *reportAttestationState[RI]) isBeyondLookahead(seqNr uint64) bool { - highestReportSignaturesSeqNr := append([]uint64{}, repatt.highestReportSignaturesSeqNr...) - sort.Slice(highestReportSignaturesSeqNr, func(i, j int) bool { - return highestReportSignaturesSeqNr[i] > highestReportSignaturesSeqNr[j] - }) - highest := highestReportSignaturesSeqNr[repatt.config.F] // (f+1)th largest seqNr lookahead := uint64(repatt.lookaheadRounds()) if seqNr <= lookahead { return false } - return highest < seqNr-lookahead + return repatt.highWaterMark < seqNr-lookahead } -// reap expired entries from repatt.finalized to prevent unbounded state growth +// reap expired entries from repatt.rounds to prevent unbounded state growth func (repatt *reportAttestationState[RI]) reap() { maxActiveRoundCount := repatt.expiryRounds() + repatt.lookaheadRounds() - // only reap if more than ~ a third of the rounds can be discarded + // only reap if more than ~ a third of the rounds can potentially be discarded if 3*len(repatt.rounds) <= 4*maxActiveRoundCount { return } + + beforeRounds := len(repatt.rounds) + // A long time ago in a galaxy far, far away, Go used to leak memory when // repeatedly adding and deleting from the same map without ever exceeding // some maximum length. 
Fortunately, this is no longer the case @@ -629,6 +670,12 @@ func (repatt *reportAttestationState[RI]) reap() { delete(repatt.rounds, seqNr) } } + + repatt.logger.Debug("reaped expired rounds", commontypes.LogFields{ + "before": beforeRounds, + "after": len(repatt.rounds), + "highWaterMark": repatt.highWaterMark, + }) } // The age (denoted in rounds) after which a report is considered expired and diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go b/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go index a8710e53..72f23228 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go @@ -17,6 +17,7 @@ func RunStatePersistence[RI any]( ctx context.Context, chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI], + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI], chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI], config ocr3config.SharedConfig, database Database, @@ -32,6 +33,7 @@ func RunStatePersistence[RI any]( defer sched.Close() newStatePersistenceState(ctx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, config, database, id, kvStore, logger, netSender, reportingPlugin, sched).run(restoredState) } @@ -42,6 +44,7 @@ type statePersistenceState[RI any] struct { ctx context.Context chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI] + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI] chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI] tTryReplay <-chan time.Time config ocr3config.SharedConfig @@ -70,6 +73,8 @@ func (state *statePersistenceState[RI]) run(restoredState StatePersistenceState) select { case msg := <-state.chNetToStatePersistence: msg.msg.processStatePersistence(state, msg.sender) + case ev := 
<-state.chOutcomeGenerationToStatePersistence: + ev.processStatePersistence(state) case ev := <-state.chReportAttestationToStatePersistence: ev.processStatePersistence(state) case ev := <-state.blockSyncState.scheduler.Scheduled(): @@ -180,6 +185,7 @@ func (state *statePersistenceState[RI]) eventStateSyncRequest(ev EventStateSyncR state.logger.Debug("received EventStateSyncRequest", commontypes.LogFields{ "heardSeqNr": ev.SeqNr, }) + state.tTryReplay = time.After(0) state.heardSeqNr(ev.SeqNr) } @@ -319,7 +325,7 @@ func (state *statePersistenceState[RI]) eventReadyToSendNextBlockSyncRequest(ev func newStatePersistenceState[RI any]( ctx context.Context, chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI], - + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI], chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI], config ocr3config.SharedConfig, database Database, @@ -348,6 +354,7 @@ func newStatePersistenceState[RI any]( ctx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, tTryReplay, config, diff --git a/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go b/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go index d778e102..43d964ad 100644 --- a/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go +++ b/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go @@ -1188,7 +1188,10 @@ func (fpm *fromProtoMessage[RI]) chunkDigests(pbcds [][]byte) ([]protocol.BlobCh } cds := make([]protocol.BlobChunkDigest, 0, len(pbcds)) for _, pbcd := range pbcds { - cds = append(cds, protocol.BlobChunkDigest(pbcd)) + var blockChunkDigest protocol.BlobChunkDigest + copy(blockChunkDigest[:], pbcd) + + cds = append(cds, blockChunkDigest) } return cds, nil } @@ -1217,7 +1220,7 @@ func (fpm *fromProtoMessage[RI]) messageBlobChunkResponse(m *MessageBlobChunkRes copy(blobDigest[:], m.BlobDigest) return 
protocol.MessageBlobChunkResponse[RI]{ - fpm.requestHandle, + nil, // TODO: consider using a sentinel value here, e.g. "EmptyRequestHandleForInboundResponse" blobDigest, m.ChunkIndex, m.Chunk, diff --git a/offchainreporting2plus/types/config_digest.go b/offchainreporting2plus/types/config_digest.go index af0f6c41..d9225d39 100644 --- a/offchainreporting2plus/types/config_digest.go +++ b/offchainreporting2plus/types/config_digest.go @@ -26,7 +26,7 @@ const ( ConfigDigestPrefixTerra ConfigDigestPrefix = 0x0002 ConfigDigestPrefixSolana ConfigDigestPrefix = 0x0003 ConfigDigestPrefixStarknet ConfigDigestPrefix = 0x0004 - _ = 0x0005 // reserved, not sure for what + _ ConfigDigestPrefix = 0x0005 // reserved, not sure for what ConfigDigestPrefixMercuryV02 ConfigDigestPrefix = 0x0006 // Mercury v0.2 and v0.3 ConfigDigestPrefixEVMThresholdDecryption ConfigDigestPrefix = 0x0007 // Run Threshold/S4 plugins as part of another product under one contract. ConfigDigestPrefixEVMS4 ConfigDigestPrefix = 0x0008 // Run Threshold/S4 plugins as part of another product under one contract. 
@@ -34,8 +34,12 @@ const ( ConfigDigestPrefixCCIPMultiRole ConfigDigestPrefix = 0x000a // CCIP multi role ConfigDigestPrefixCCIPMultiRoleRMN ConfigDigestPrefix = 0x000b // CCIP multi role RMN ConfigDigestPrefixCCIPMultiRoleRMNCombo ConfigDigestPrefix = 0x000c // CCIP multi role & RMN combined - _ = 0x000d // reserved + _ ConfigDigestPrefix = 0x000d // reserved ConfigDigestPrefixKeystoneOCR3Capability ConfigDigestPrefix = 0x000e + ConfigDigestPrefixDONToDONDiscoveryGroup ConfigDigestPrefix = 0x000f // DON-to-DON Discovery Group + ConfigDigestPrefixDONToDONMessagingGroup ConfigDigestPrefix = 0x0010 // DON-to-DON Messaging Group + + _ ConfigDigestPrefix = 0x0013 // reserved ConfigDigestPrefixOCR1 ConfigDigestPrefix = 0xEEEE // we translate ocr1 config digest to ocr2 config digests in the networking layer _ ConfigDigestPrefix = 0xFFFF // reserved for future use diff --git a/ragep2p/peeridhelper/domain_separation.go b/ragep2p/peeridhelper/domain_separation.go new file mode 100644 index 00000000..2ba0a0b3 --- /dev/null +++ b/ragep2p/peeridhelper/domain_separation.go @@ -0,0 +1,115 @@ +package peeridhelper + +import ( + "crypto/ed25519" + "crypto/sha256" + "encoding/binary" + + "github.com/smartcontractkit/libocr/ragep2p/types" +) + +// context string acting as domain separator according to RFC8446 +const context = "ragep2p arbitrary message signing" + +// We hash messages longer than shortMessageMax to avoid copying large messages (and allocating lots of memory in the process) +const shortMessageMax = 1024 + +// MakePeerIDSignatureDomainSeparatedPayload provides a safe way to generate signature payloads for +// signing by the keypair associated with a PeerID. The payloads are guaranteed to not be confused +// with messages signed within ragep2p. +// +// Be sure to use domain separators that are unique to your application and do not +// collide with other applications! 
+// +// The output of this function must be directly used as the payload for the underlying ed25519.Sign function. +func MakePeerIDSignatureDomainSeparatedPayload(domainSeparator string, message []byte) []byte { + // We use the same domain separation scheme as TLS 1.3, but with a different context string. Details below. + // RFC8446 specifies the following domain separation scheme for CertificateVerify messages: + // The digital signature is then computed over the concatenation of: + // - A string that consists of octet 32 (0x20) repeated 64 times + // - The context string + // - A single 0 byte which serves as the separator + // - The content to be signed + // We use the same domain separation scheme, but use a different context string + // to ensure domain separation with TLS. + + capacity := 64 + // 64 times 0x20 from RFC8446 + len(context) + // context from RFC8446 + 1 + // 0x0 byte from RFC8446 + 8 + // length of domainSeparator + len(domainSeparator) // domainSeparator + if len(message) <= shortMessageMax { + capacity += 1 + // 0x0 to indicate message is "short" + 8 + // length of message + len(message) // message + } else { + capacity += 1 + // 0x1 to indicate message is "long" + sha256.Size // sha256 hash of message + } + + // Follow RFC8446 ... + payload := make([]byte, 0, capacity) + for range 64 { + payload = append(payload, 0x20) + } + payload = append(payload, []byte(context)...) + payload = append(payload, 0x0) + + // ... and now that we have domain separation from all things TLS, + // we perform further domain separation between different users of + // MakePeerIDSignatureDomainSeparatedPayload ... + payload = binary.BigEndian.AppendUint64(payload, uint64(len(domainSeparator))) + payload = append(payload, []byte(domainSeparator)...) + // ...
and finally append the message (or a hash of it, if the message is too long) + if len(message) <= shortMessageMax { + payload = append(payload, 0x0) + + payload = binary.BigEndian.AppendUint64(payload, uint64(len(message))) + payload = append(payload, message...) + } else { + payload = append(payload, 0x1) + + h := sha256.New() + h.Write(message) + payload = h.Sum(payload) + } + + return payload +} + +// DomainSeparatedPeerKeyring is a wrapper around a PeerKeyring that ensures +// messages signed with it are domain separated from messages signed within ragep2p. +// +// Be sure to use domain separators that are unique to your application and do not +// collide with other applications! +// +// We intentionally do not implement the PeerKeyring interface, because we don't +// want to encourage nesting of DomainSeparatedPeerKeyring. +type DomainSeparatedPeerKeyring struct { + domainSeparator string + peerKeyring types.PeerKeyring +} + +func NewDomainSeparatedPeerKeyring(domainSeparator string, peerKeyring types.PeerKeyring) *DomainSeparatedPeerKeyring { + return &DomainSeparatedPeerKeyring{ + domainSeparator, + peerKeyring, + } +} + +func (dk *DomainSeparatedPeerKeyring) Sign(message []byte) (signature []byte, err error) { + return dk.peerKeyring.Sign(MakePeerIDSignatureDomainSeparatedPayload(dk.domainSeparator, message)) +} + +func (dk *DomainSeparatedPeerKeyring) Verify(publicKey types.PeerPublicKey, message []byte, signature []byte) bool { + // not needed strictly speaking, added for defense in depth + if len(publicKey) != ed25519.PublicKeySize { + return false + } + + return ed25519.Verify(ed25519.PublicKey(publicKey[:]), MakePeerIDSignatureDomainSeparatedPayload(dk.domainSeparator, message), signature) +} + +func (dk *DomainSeparatedPeerKeyring) DomainSeparatorAndPublicKey() (string, types.PeerPublicKey) { + return dk.domainSeparator, dk.peerKeyring.PublicKey() +}