From 059995dad1dff53f82cde8d0e029b5bd4b9961a9 Mon Sep 17 00:00:00 2001 From: kaleofduty <59616916+kaleofduty@users.noreply.github.com> Date: Fri, 15 Aug 2025 12:50:12 +0200 Subject: [PATCH] Skip noisy warnings "Failed to process announcement from reconcile" / "peer is not an oracle in any of our jobs" OCR3.1 polishing --- networking/ragedisco/discovery_protocol.go | 3 +- .../internal/ocr3_1/protocol/oracle.go | 3 + .../ocr3_1/protocol/outcome_generation.go | 3 + .../protocol/outcome_generation_follower.go | 78 +++++++++++-------- .../ocr3_1/protocol/state_persistence.go | 9 ++- .../ocr3_1/serialization/serialization.go | 7 +- 6 files changed, 67 insertions(+), 36 deletions(-) diff --git a/networking/ragedisco/discovery_protocol.go b/networking/ragedisco/discovery_protocol.go index 0218834b..c56ec215 100644 --- a/networking/ragedisco/discovery_protocol.go +++ b/networking/ragedisco/discovery_protocol.go @@ -542,7 +542,8 @@ func (p *discoveryProtocol) lockedProcessAnnouncement(ann Announcement) error { } if p.locked.numGroupsByOracle[pid] == 0 { - return fmt.Errorf("peer %s is not an oracle in any of our jobs; perhaps whoever sent this is running a job that includes us and this peer, but we are not running that job", pid) + return nil + } err = ann.verify() diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go index 9d22b1e0..03e3d625 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go @@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() { chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI]) o.chNetToStatePersistence = chNetToStatePersistence + chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI]) chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI]) chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI]) @@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() { chPacemakerToOutcomeGeneration, chOutcomeGenerationToPacemaker, chOutcomeGenerationToReportAttestation, + chOutcomeGenerationToStatePersistence, &blobEndpoint, o.config, o.database, @@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() { o.childCtx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, o.config, o.database, diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go index 640ff543..dcdc9278 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation.go @@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any]( chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI], chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI], chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI], + chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI], blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher, config ocr3config.SharedConfig, database Database, @@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any]( chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration, chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker, chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation, + chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence, blobBroadcastFetcher: blobBroadcastFetcher, config: config, database: database, @@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct { chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI] chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI] chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI] + chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI] blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher config ocr3config.SharedConfig database Database diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go index 64b27d7b..d87036b5 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/outcome_generation_follower.go @@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar outgen.refreshCommittedSeqNrAndCert() if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") { + select { + case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{ + msg.EpochStartProof.HighestCertified.SeqNr(), + }: + case <-outgen.ctx.Done(): + } return } @@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar prepareQc.ReportsPlusPrecursor, reportPlusPrecursorDigest, } + outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{ "seqNr": outgen.sharedState.seqNr, }) @@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() { return } - if outgen.followerState.openKVTxn == nil { - outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "phase": outgen.followerState.phase, - }) - panic("") - } - err := outgen.followerState.openKVTxn.Commit() - outgen.followerState.openKVTxn.Discard() - outgen.followerState.openKVTxn = nil - if err != nil { - outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "error": err, - }) - - { - kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() - if err != nil { - outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "error": err, - }) - return - } + if outgen.followerState.openKVTxn != nil { + err := outgen.followerState.openKVTxn.Commit() + outgen.followerState.openKVTxn.Discard() + outgen.followerState.openKVTxn = nil + if err != nil { + outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "error": err, + }) - if kvSeqNr < outgen.sharedState.seqNr { - outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{ - "seqNr": outgen.sharedState.seqNr, - "kvSeqNr": kvSeqNr, - }) - return + { + kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() + if err != nil { + outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "error": err, + }) + return + } + + if kvSeqNr < outgen.sharedState.seqNr { + outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{ + "seqNr": outgen.sharedState.seqNr, + "kvSeqNr": kvSeqNr, + }) + return + } } } } @@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified } func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() { + preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr() @@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() { }) if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr { + return + } else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr { + + logger.Warn("last kv transaction commit failed, requesting state sync", nil) + select { + case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{ + preRefreshCommittedSeqNr, + }: + case <-outgen.ctx.Done(): + } + return } else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr { - logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil) + logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil) panic("") } diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go b/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go index a8710e53..72f23228 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/state_persistence.go @@ -17,6 +17,7 @@ func RunStatePersistence[RI any]( ctx context.Context, chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI], + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI], chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI], config ocr3config.SharedConfig, database Database, @@ -32,6 +33,7 @@ func RunStatePersistence[RI any]( defer sched.Close() newStatePersistenceState(ctx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, config, database, id, kvStore, logger, netSender, reportingPlugin, sched).run(restoredState) } @@ -42,6 +44,7 @@ type statePersistenceState[RI any] struct { ctx context.Context chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI] + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI] chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI] tTryReplay <-chan time.Time config ocr3config.SharedConfig @@ -70,6 +73,8 @@ func (state *statePersistenceState[RI]) run(restoredState StatePersistenceState) select { case msg := <-state.chNetToStatePersistence: msg.msg.processStatePersistence(state, msg.sender) + case ev := <-state.chOutcomeGenerationToStatePersistence: + ev.processStatePersistence(state) case ev := <-state.chReportAttestationToStatePersistence: ev.processStatePersistence(state) case ev := <-state.blockSyncState.scheduler.Scheduled(): @@ -180,6 +185,7 @@ func (state *statePersistenceState[RI]) eventStateSyncRequest(ev EventStateSyncR state.logger.Debug("received EventStateSyncRequest", commontypes.LogFields{ "heardSeqNr": ev.SeqNr, }) + state.tTryReplay = time.After(0) state.heardSeqNr(ev.SeqNr) } @@ -319,7 +325,7 @@ func (state *statePersistenceState[RI]) eventReadyToSendNextBlockSyncRequest(ev func newStatePersistenceState[RI any]( ctx context.Context, chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI], - + chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI], chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI], config ocr3config.SharedConfig, database Database, @@ -348,6 +354,7 @@ func newStatePersistenceState[RI any]( ctx, chNetToStatePersistence, + chOutcomeGenerationToStatePersistence, chReportAttestationToStatePersistence, tTryReplay, config, diff --git a/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go b/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go index d778e102..43d964ad 100644 --- a/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go +++ b/offchainreporting2plus/internal/ocr3_1/serialization/serialization.go @@ -1188,7 +1188,10 @@ func (fpm *fromProtoMessage[RI]) chunkDigests(pbcds [][]byte) ([]protocol.BlobCh } cds := make([]protocol.BlobChunkDigest, 0, len(pbcds)) for _, pbcd := range pbcds { - cds = append(cds, protocol.BlobChunkDigest(pbcd)) + var blockChunkDigest protocol.BlobChunkDigest + copy(blockChunkDigest[:], pbcd) + + cds = append(cds, blockChunkDigest) } return cds, nil } @@ -1217,7 +1220,7 @@ func (fpm *fromProtoMessage[RI]) messageBlobChunkResponse(m *MessageBlobChunkRes copy(blobDigest[:], m.BlobDigest) return protocol.MessageBlobChunkResponse[RI]{ - fpm.requestHandle, + nil, // TODO: consider using a sentinel value here, e.g. "EmptyRequestHandleForInboundResponse" blobDigest, m.ChunkIndex, m.Chunk,