Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion networking/ragedisco/discovery_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ func (p *discoveryProtocol) lockedProcessAnnouncement(ann Announcement) error {
}

if p.locked.numGroupsByOracle[pid] == 0 {
return fmt.Errorf("peer %s is not an oracle in any of our jobs; perhaps whoever sent this is running a job that includes us and this peer, but we are not running that job", pid)
return nil

}

err = ann.verify()
Expand Down
3 changes: 3 additions & 0 deletions offchainreporting2plus/internal/ocr3_1/protocol/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() {
chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI])
o.chNetToStatePersistence = chNetToStatePersistence

chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI])
chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI])

chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI])
Expand Down Expand Up @@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() {
chPacemakerToOutcomeGeneration,
chOutcomeGenerationToPacemaker,
chOutcomeGenerationToReportAttestation,
chOutcomeGenerationToStatePersistence,
&blobEndpoint,
o.config,
o.database,
Expand Down Expand Up @@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() {
o.childCtx,

chNetToStatePersistence,
chOutcomeGenerationToStatePersistence,
chReportAttestationToStatePersistence,
o.config,
o.database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any](
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI],
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI],
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI],
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI],
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher,
config ocr3config.SharedConfig,
database Database,
Expand All @@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any](
chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration,
chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker,
chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation,
chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence,
blobBroadcastFetcher: blobBroadcastFetcher,
config: config,
database: database,
Expand All @@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct {
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI]
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI]
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI]
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI]
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher
config ocr3config.SharedConfig
database Database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar

outgen.refreshCommittedSeqNrAndCert()
if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") {
select {
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
msg.EpochStartProof.HighestCertified.SeqNr(),
}:
case <-outgen.ctx.Done():
}
return
}

Expand Down Expand Up @@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
prepareQc.ReportsPlusPrecursor,
reportPlusPrecursorDigest,
}

outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
})
Expand Down Expand Up @@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() {
return
}

if outgen.followerState.openKVTxn == nil {
outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"phase": outgen.followerState.phase,
})
panic("")
}
err := outgen.followerState.openKVTxn.Commit()
outgen.followerState.openKVTxn.Discard()
outgen.followerState.openKVTxn = nil
if err != nil {
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})

{
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
if err != nil {
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})
return
}
if outgen.followerState.openKVTxn != nil {
err := outgen.followerState.openKVTxn.Commit()
outgen.followerState.openKVTxn.Discard()
outgen.followerState.openKVTxn = nil
if err != nil {
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})

if kvSeqNr < outgen.sharedState.seqNr {
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"kvSeqNr": kvSeqNr,
})
return
{
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
if err != nil {
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})
return
}

if kvSeqNr < outgen.sharedState.seqNr {
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"kvSeqNr": kvSeqNr,
})
return
}
}
}
}
Expand Down Expand Up @@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified
}

func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {

preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr

postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
Expand All @@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
})

if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr {
return
} else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr {

logger.Warn("last kv transaction commit failed, requesting state sync", nil)
select {
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
preRefreshCommittedSeqNr,
}:
case <-outgen.ctx.Done():
}

return
} else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr {
logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil)
logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil)
panic("")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func RunStatePersistence[RI any](
ctx context.Context,

chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI],
chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI],
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI],
config ocr3config.SharedConfig,
database Database,
Expand All @@ -32,6 +33,7 @@ func RunStatePersistence[RI any](
defer sched.Close()

newStatePersistenceState(ctx, chNetToStatePersistence,
chOutcomeGenerationToStatePersistence,
chReportAttestationToStatePersistence,
config, database, id, kvStore, logger, netSender, reportingPlugin, sched).run(restoredState)
}
Expand All @@ -42,6 +44,7 @@ type statePersistenceState[RI any] struct {
ctx context.Context

chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI]
chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI]
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI]
tTryReplay <-chan time.Time
config ocr3config.SharedConfig
Expand Down Expand Up @@ -70,6 +73,8 @@ func (state *statePersistenceState[RI]) run(restoredState StatePersistenceState)
select {
case msg := <-state.chNetToStatePersistence:
msg.msg.processStatePersistence(state, msg.sender)
case ev := <-state.chOutcomeGenerationToStatePersistence:
ev.processStatePersistence(state)
case ev := <-state.chReportAttestationToStatePersistence:
ev.processStatePersistence(state)
case ev := <-state.blockSyncState.scheduler.Scheduled():
Expand Down Expand Up @@ -180,6 +185,7 @@ func (state *statePersistenceState[RI]) eventStateSyncRequest(ev EventStateSyncR
state.logger.Debug("received EventStateSyncRequest", commontypes.LogFields{
"heardSeqNr": ev.SeqNr,
})
state.tTryReplay = time.After(0)
state.heardSeqNr(ev.SeqNr)
}

Expand Down Expand Up @@ -319,7 +325,7 @@ func (state *statePersistenceState[RI]) eventReadyToSendNextBlockSyncRequest(ev
func newStatePersistenceState[RI any](
ctx context.Context,
chNetToStatePersistence <-chan MessageToStatePersistenceWithSender[RI],

chOutcomeGenerationToStatePersistence <-chan EventToStatePersistence[RI],
chReportAttestationToStatePersistence <-chan EventToStatePersistence[RI],
config ocr3config.SharedConfig,
database Database,
Expand Down Expand Up @@ -348,6 +354,7 @@ func newStatePersistenceState[RI any](
ctx,

chNetToStatePersistence,
chOutcomeGenerationToStatePersistence,
chReportAttestationToStatePersistence,
tTryReplay,
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,10 @@ func (fpm *fromProtoMessage[RI]) chunkDigests(pbcds [][]byte) ([]protocol.BlobCh
}
cds := make([]protocol.BlobChunkDigest, 0, len(pbcds))
for _, pbcd := range pbcds {
cds = append(cds, protocol.BlobChunkDigest(pbcd))
var blockChunkDigest protocol.BlobChunkDigest
copy(blockChunkDigest[:], pbcd)

cds = append(cds, blockChunkDigest)
}
return cds, nil
}
Expand Down Expand Up @@ -1217,7 +1220,7 @@ func (fpm *fromProtoMessage[RI]) messageBlobChunkResponse(m *MessageBlobChunkRes
copy(blobDigest[:], m.BlobDigest)

return protocol.MessageBlobChunkResponse[RI]{
fpm.requestHandle,
nil, // TODO: consider using a sentinel value here, e.g. "EmptyRequestHandleForInboundResponse"
blobDigest,
m.ChunkIndex,
m.Chunk,
Expand Down