Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions networking/peer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,25 @@ import (
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

func peerGroupStreamNamePrefix(configDigestPrefix ocr2types.ConfigDigestPrefix) (streamNamePrefix string, ok bool) {
func peerGroupStreamNamePrefix(configDigest ocr2types.ConfigDigest) (streamNamePrefix string, err error) {
configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest)

switch configDigestPrefix { // nolint:exhaustive
case ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo:
return "ccip-rmn/", true
return "ccip-rmn/", nil
case ocr2types.ConfigDigestPrefixDONToDONMessagingGroup:
// We include the full config digest in the stream name prefix to have clean
// namespacing between different PeerGroups.
return fmt.Sprintf("don-to-don/%s/", configDigest), nil
case ocr2types.ConfigDigestPrefixDONToDONDiscoveryGroup:
// We include the full config digest in the stream name prefix to have clean
// namespacing between different PeerGroups.
// Based on the current don-to-don design, we do not expect to have any streams
// with this config digest prefix, but we nevertheless need to allowlist it
// to enable creation of corresponding PeerGroups.
return fmt.Sprintf("don-to-don-discovery/%s/", configDigest), nil
default:
return "", false
return "", fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix)
}
}

Expand Down Expand Up @@ -84,10 +97,9 @@ func (f *peerGroupFactory) NewPeerGroup(
peerIDs []string,
bootstrappers []commontypes.BootstrapperLocator,
) (PeerGroup, error) {
configDigestPrefix := ocr2types.ConfigDigestPrefixFromConfigDigest(configDigest)
streamNamePrefix, ok := peerGroupStreamNamePrefix(configDigestPrefix)
if !ok {
return nil, fmt.Errorf("config digest prefix %s is not allowed", configDigestPrefix)
streamNamePrefix, err := peerGroupStreamNamePrefix(configDigest)
if err != nil {
return nil, fmt.Errorf("could not get stream name prefix: %w", err)
}

decodedv2PeerIDs, err := decodev2PeerIDs(peerIDs)
Expand Down
2 changes: 1 addition & 1 deletion offchainreporting/internal/managed/track_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (state *trackConfigState) checkLatestConfigDetails() (
defer configCancel()
contractConfig, err := state.configTracker.ConfigFromLogs(configCtx, changedInBlock)
if err != nil {
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", configCtx, commontypes.LogFields{
state.logger.ErrorIfNotCanceled("TrackConfig: error during ConfigFromLogs()", configCtx, commontypes.LogFields{
"error": err,
})
return nil, true
Expand Down
2 changes: 1 addition & 1 deletion offchainreporting2plus/internal/managed/track_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (state *trackConfigState) checkLatestConfigDetails() (

contractConfig, err := state.configTracker.LatestConfig(ctx, changedInBlock)
if err != nil {
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfigDetails()", ctx, commontypes.LogFields{
state.logger.ErrorIfNotCanceled("TrackConfig: error during LatestConfig()", ctx, commontypes.LogFields{
"error": err,
})
return nil, true
Expand Down
3 changes: 3 additions & 0 deletions offchainreporting2plus/internal/ocr3_1/protocol/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (o *oracleState[RI]) run() {
chNetToStatePersistence := make(chan MessageToStatePersistenceWithSender[RI])
o.chNetToStatePersistence = chNetToStatePersistence

chOutcomeGenerationToStatePersistence := make(chan EventToStatePersistence[RI])
chReportAttestationToStatePersistence := make(chan EventToStatePersistence[RI])

chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI])
Expand Down Expand Up @@ -243,6 +244,7 @@ func (o *oracleState[RI]) run() {
chPacemakerToOutcomeGeneration,
chOutcomeGenerationToPacemaker,
chOutcomeGenerationToReportAttestation,
chOutcomeGenerationToStatePersistence,
&blobEndpoint,
o.config,
o.database,
Expand Down Expand Up @@ -282,6 +284,7 @@ func (o *oracleState[RI]) run() {
o.childCtx,

chNetToStatePersistence,
chOutcomeGenerationToStatePersistence,
chReportAttestationToStatePersistence,
o.config,
o.database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func RunOutcomeGeneration[RI any](
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI],
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI],
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI],
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI],
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher,
config ocr3config.SharedConfig,
database Database,
Expand All @@ -58,6 +59,7 @@ func RunOutcomeGeneration[RI any](
chPacemakerToOutcomeGeneration: chPacemakerToOutcomeGeneration,
chOutcomeGenerationToPacemaker: chOutcomeGenerationToPacemaker,
chOutcomeGenerationToReportAttestation: chOutcomeGenerationToReportAttestation,
chOutcomeGenerationToStatePersistence: chOutcomeGenerationToStatePersistence,
blobBroadcastFetcher: blobBroadcastFetcher,
config: config,
database: database,
Expand All @@ -83,6 +85,7 @@ type outcomeGenerationState[RI any] struct {
chPacemakerToOutcomeGeneration <-chan EventToOutcomeGeneration[RI]
chOutcomeGenerationToPacemaker chan<- EventToPacemaker[RI]
chOutcomeGenerationToReportAttestation chan<- EventToReportAttestation[RI]
chOutcomeGenerationToStatePersistence chan<- EventToStatePersistence[RI]
blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher
config ocr3config.SharedConfig
database Database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar

outgen.refreshCommittedSeqNrAndCert()
if !outgen.ensureHighestCertifiedIsCompatible(msg.EpochStartProof.HighestCertified, "MessageEpochStart") {
select {
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
msg.EpochStartProof.HighestCertified.SeqNr(),
}:
case <-outgen.ctx.Done():
}
return
}

Expand Down Expand Up @@ -154,6 +160,7 @@ func (outgen *outcomeGenerationState[RI]) messageEpochStart(msg MessageEpochStar
prepareQc.ReportsPlusPrecursor,
reportPlusPrecursorDigest,
}

outgen.logger.Debug("broadcasting MessagePrepare (reproposal where prepareQcSeqNr == committedSeqNr)", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
})
Expand Down Expand Up @@ -914,38 +921,33 @@ func (outgen *outcomeGenerationState[RI]) tryProcessCommitPool() {
return
}

if outgen.followerState.openKVTxn == nil {
outgen.logger.Critical("assumption violation, open kv transaction must exist in this phase", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"phase": outgen.followerState.phase,
})
panic("")
}
err := outgen.followerState.openKVTxn.Commit()
outgen.followerState.openKVTxn.Discard()
outgen.followerState.openKVTxn = nil
if err != nil {
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})

{
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
if err != nil {
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})
return
}
if outgen.followerState.openKVTxn != nil {
err := outgen.followerState.openKVTxn.Commit()
outgen.followerState.openKVTxn.Discard()
outgen.followerState.openKVTxn = nil
if err != nil {
outgen.logger.Warn("failed to commit kv transaction", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})

if kvSeqNr < outgen.sharedState.seqNr {
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"kvSeqNr": kvSeqNr,
})
return
{
kvSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
if err != nil {
outgen.logger.Error("failed to validate kv commit post-condition, upon kv commit failure", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"error": err,
})
return
}

if kvSeqNr < outgen.sharedState.seqNr {
outgen.logger.Error("kv commit failed and post-condition (seqNr <= kvSeqNr) is not satisfied", commontypes.LogFields{
"seqNr": outgen.sharedState.seqNr,
"kvSeqNr": kvSeqNr,
})
return
}
}
}
}
Expand Down Expand Up @@ -1252,6 +1254,7 @@ func (outgen *outcomeGenerationState[RI]) persistCommitAsBlock(commit *Certified
}

func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {

preRefreshCommittedSeqNr := outgen.sharedState.committedSeqNr

postRefreshCommittedSeqNr, err := outgen.kvStore.HighestCommittedSeqNr()
Expand All @@ -1269,9 +1272,20 @@ func (outgen *outcomeGenerationState[RI]) refreshCommittedSeqNrAndCert() {
})

if postRefreshCommittedSeqNr == preRefreshCommittedSeqNr {
return
} else if postRefreshCommittedSeqNr+1 == preRefreshCommittedSeqNr {

logger.Warn("last kv transaction commit failed, requesting state sync", nil)
select {
case outgen.chOutcomeGenerationToStatePersistence <- EventStateSyncRequest[RI]{
preRefreshCommittedSeqNr,
}:
case <-outgen.ctx.Done():
}

return
} else if postRefreshCommittedSeqNr < preRefreshCommittedSeqNr {
logger.Critical("assumption violation, kv is behind what outgen knows as committed", nil)
logger.Critical("assumption violation, kv is way behind what outgen knows as committed", nil)
panic("")
}

Expand Down
Loading
Loading