Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions networking/ocr_endpoint_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/libocr/subprocesses"

"github.com/smartcontractkit/libocr/internal/loghelper"
"github.com/smartcontractkit/libocr/internal/util"
)

var (
Expand All @@ -22,8 +23,8 @@ var (
// ocrEndpointV3 represents a member of a particular feed oracle group
type ocrEndpointV3 struct {
// configuration and settings
defaultPriorityConfig ocr2types.BinaryNetworkEndpoint2Config
lowPriorityConfig ocr2types.BinaryNetworkEndpoint2Config
defaultPriorityConfig binaryNetworkEndpoint2ConfigNoNils
lowPriorityConfig binaryNetworkEndpoint2ConfigNoNils
peerMapping map[commontypes.OracleID]ragetypes.PeerID
host *ragep2pnew.Host
configDigest ocr2types.ConfigDigest
Expand All @@ -50,6 +51,21 @@ type priorityStreamGroup struct {
Default ragep2pnew.Stream2
}

// copy of ocr2types.BinaryNetworkEndpoint2Config without nils
type binaryNetworkEndpoint2ConfigNoNils struct {
ocr2types.BinaryNetworkEndpointLimits
IncomingMessageBufferSize int
OutgoingMessageBufferSize int
}

func binaryNetworkEndpoint2ConfigNilCoalesceWithPeerConfig(config ocr2types.BinaryNetworkEndpoint2Config, peer *concretePeerV2) binaryNetworkEndpoint2ConfigNoNils {
return binaryNetworkEndpoint2ConfigNoNils{
config.BinaryNetworkEndpointLimits,
util.NilCoalesce(config.OverrideIncomingMessageBufferSize, peer.endpointConfig.IncomingMessageBufferSize),
util.NilCoalesce(config.OverrideOutgoingMessageBufferSize, peer.endpointConfig.OutgoingMessageBufferSize),
}
}

//nolint:unused
func newOCREndpointV3(
logger loghelper.LoggerWithContext,
Expand Down Expand Up @@ -94,8 +110,8 @@ func newOCREndpointV3(
}

o := &ocrEndpointV3{
defaultPriorityConfig,
lowPriorityConfig,
binaryNetworkEndpoint2ConfigNilCoalesceWithPeerConfig(defaultPriorityConfig, peer),
binaryNetworkEndpoint2ConfigNilCoalesceWithPeerConfig(lowPriorityConfig, peer),
peerMapping,
host,
configDigest,
Expand Down Expand Up @@ -141,8 +157,8 @@ func (o *ocrEndpointV3) start() error {
pid,
streamNameFromConfigDigestAndPriority(o.configDigest, ragep2pnew.StreamPriorityLow),
ragep2pnew.StreamPriorityLow,
ragep2pnew.Stream2Limits{o.lowPriorityConfig.OverrideOutgoingMessageBufferSize,
o.lowPriorityConfig.OverrideIncomingMessageBufferSize,
ragep2pnew.Stream2Limits{o.lowPriorityConfig.OutgoingMessageBufferSize,
o.lowPriorityConfig.IncomingMessageBufferSize,
o.lowPriorityConfig.MaxMessageLength,
ragetypes.TokenBucketParams{
o.lowPriorityConfig.MessagesRatePerOracle,
Expand All @@ -163,8 +179,8 @@ func (o *ocrEndpointV3) start() error {
streamNameFromConfigDigestAndPriority(o.configDigest, ragep2pnew.StreamPriorityDefault),
ragep2pnew.StreamPriorityDefault,
ragep2pnew.Stream2Limits{
o.defaultPriorityConfig.OverrideOutgoingMessageBufferSize,
o.defaultPriorityConfig.OverrideIncomingMessageBufferSize,
o.defaultPriorityConfig.OutgoingMessageBufferSize,
o.defaultPriorityConfig.IncomingMessageBufferSize,
o.defaultPriorityConfig.MaxMessageLength,
ragetypes.TokenBucketParams{
o.defaultPriorityConfig.MessagesRatePerOracle,
Expand Down
17 changes: 5 additions & 12 deletions offchainreporting2plus/internal/managed/limits/ocr3_1_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report
const sigOverhead = 10
const overhead = 256

maxLenStateTransitionOutputs := add(
maxLenStateWriteSet := add(
pluginLimits.MaxKeyValueModifiedKeysPlusValuesBytes,
mul(
pluginLimits.MaxKeyValueModifiedKeys,
repeatedOverhead,
),
)
maxLenCertifiedPrepareOrCommit := add(mul(ed25519.SignatureSize+sigOverhead, cfg.ByzQuorumSize()),
sha256.Size*3,
sha256.Size*5,
overhead)
maxLenMsgNewEpoch := overhead
maxLenMsgEpochStartRequest := add(maxLenCertifiedPrepareOrCommit, overhead)
Expand Down Expand Up @@ -132,7 +132,7 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report

// block sync messages
maxLenMsgBlockSyncRequest := overhead
maxLenAttestedStateTransitionBlock := add(maxLenCertifiedPrepareOrCommit, maxLenStateTransitionOutputs)
maxLenAttestedStateTransitionBlock := add(maxLenCertifiedPrepareOrCommit, maxLenStateWriteSet)
maxLenMsgBlockSyncResponse := add(mul(cfg.GetMaxBlocksPerBlockSyncResponse(), maxLenAttestedStateTransitionBlock), overhead)

// blob exchange messages
Expand All @@ -151,13 +151,11 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report
maxLenMsgEpochStartRequest,
maxLenMsgEpochStart,
maxLenMsgRoundStart,
maxLenMsgObservation,
maxLenMsgProposal,
maxLenMsgPrepare,
maxLenMsgCommit,
maxLenMsgReportSignatures,
maxLenMsgReportsPlusPrecursorRequest,
maxLenMsgReportsPlusPrecursor,
maxLenMsgBlobOffer,
maxLenMsgBlobChunkRequest,
)
Expand All @@ -174,15 +172,15 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report

defaultPriorityMessagesRate := (1.0*float64(time.Second)/float64(cfg.GetDeltaResend()) +
3.0*float64(time.Second)/minEpochInterval +
8.0*float64(time.Second)/float64(minRoundInterval) +
6.0*float64(time.Second)/float64(minRoundInterval) +
2.0*float64(time.Second)/float64(cfg.GetDeltaBlobOfferMinRequestToSameOracleInterval()) +
1.0*float64(time.Second)/float64(cfg.GetDeltaBlobChunkMinRequestToSameOracleInterval())) * 1.2

lowPriorityMessagesRate := (1.0*float64(time.Second)/float64(cfg.GetDeltaBlockSyncMinRequestToSameOracleInterval()) +
1.0*float64(time.Second)/float64(cfg.GetDeltaTreeSyncMinRequestToSameOracleInterval()) +
1.0*float64(time.Second)/float64(cfg.GetDeltaStateSyncSummaryInterval())) * 1.2

defaultPriorityMessagesCapacity := mul(15, 3)
defaultPriorityMessagesCapacity := mul(13, 3)
lowPriorityMessagesCapacity := mul(3, 3)

// we don't multiply bytesRate by a safetyMargin since we already have a generous overhead on each message
Expand All @@ -196,9 +194,7 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report
float64(time.Second)/float64(minRoundInterval)*float64(maxLenMsgRoundStart) +
float64(time.Second)/float64(minRoundInterval)*float64(maxLenMsgProposal) +
float64(time.Second)/float64(minEpochInterval)*float64(maxLenMsgEpochStartRequest) +
float64(time.Second)/float64(minRoundInterval)*float64(maxLenMsgObservation) +
float64(time.Second)/float64(minRoundInterval)*float64(maxLenMsgReportsPlusPrecursorRequest) +
float64(time.Second)/float64(minRoundInterval)*float64(maxLenMsgReportsPlusPrecursor) +
float64(time.Second)/float64(cfg.GetDeltaBlobOfferMinRequestToSameOracleInterval())*float64(maxLenMsgBlobOffer) + // blob-related messages
float64(time.Second)/float64(cfg.GetDeltaBlobChunkMinRequestToSameOracleInterval())*float64(maxLenMsgBlobChunkRequest)

Expand All @@ -212,16 +208,13 @@ func OCR3_1Limits(cfg ocr3_1config.PublicConfig, pluginLimits ocr3_1types.Report
maxLenMsgEpochStartRequest,
maxLenMsgEpochStart,
maxLenMsgRoundStart,
maxLenMsgObservation,
maxLenMsgProposal,
maxLenMsgPrepare,
maxLenMsgCommit,
maxLenMsgReportSignatures,
maxLenMsgReportsPlusPrecursorRequest,
maxLenMsgReportsPlusPrecursor,
maxLenMsgBlobOffer,
maxLenMsgBlobChunkRequest,
maxLenMsgBlobOfferResponse,
), 3)

lowPriorityBytesCapacity := mul(add(
Expand Down
32 changes: 20 additions & 12 deletions offchainreporting2plus/internal/managed/managed_ocr3_1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ import (
"github.com/smartcontractkit/libocr/subprocesses"
)

const (
defaultIncomingMessageBufferSize = 10
defaultOutgoingMessageBufferSize = 10
lowPriorityIncomingMessageBufferSize = 10
lowPriorityOutgoingMessageBufferSize = 10
)

// RunManagedOCR3_1Oracle runs a "managed" version of protocol.RunOracle. It handles
// setting up telemetry, garbage collection, configuration updates, translating
// from types.BinaryNetworkEndpoint2 to protocol.NetworkEndpoint, and
Expand Down Expand Up @@ -175,15 +168,16 @@ func RunManagedOCR3_1Oracle[RI any](
})
return fmt.Errorf("ManagedOCR3_1Oracle: error during limits"), false
}

defaultPriorityConfig := types.BinaryNetworkEndpoint2Config{
defaultLims,
defaultIncomingMessageBufferSize,
defaultOutgoingMessageBufferSize,
nil,
nil,
}
lowPriorityConfig := types.BinaryNetworkEndpoint2Config{
lowPriorityLimits,
lowPriorityIncomingMessageBufferSize,
lowPriorityOutgoingMessageBufferSize,
nil,
nil,
}

binNetEndpoint, err := messageNetEndpointFactory.NewEndpoint(
Expand Down Expand Up @@ -237,7 +231,21 @@ func RunManagedOCR3_1Oracle[RI any](
logger,
"ManagedOCR3_1Oracle: error during keyValueDatabase.Close()",
)
semanticOCR3_1KeyValueDatabase := shim.NewSemanticOCR3_1KeyValueDatabase(keyValueDatabase, reportingPluginInfo.Limits, sharedConfig.PublicConfig, logger, metricsRegisterer)
keyValueDatabaseWithMetrics := shim.NewKeyValueDatabaseWithMetrics(keyValueDatabase, metricsRegisterer, logger)
defer loghelper.CloseLogError(
keyValueDatabaseWithMetrics,
logger,
"ManagedOCR3_1Oracle: error during keyValueDatabaseWithMetrics.Close()",
)
semanticOCR3_1KeyValueDatabase, err := shim.NewSemanticOCR3_1KeyValueDatabase(keyValueDatabaseWithMetrics, reportingPluginInfo.Limits, sharedConfig.PublicConfig, logger, metricsRegisterer)
if err != nil {
return fmt.Errorf("ManagedOCR3_1Oracle: error during NewSemanticOCR3_1KeyValueDatabase: %w", err), false
}
defer loghelper.CloseLogError(
semanticOCR3_1KeyValueDatabase,
logger,
"ManagedOCR3_1Oracle: error during semanticOCR3_1KeyValueDatabase.Close()",
)

protocol.RunOracle[RI](
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const (
MaxMaxEpochStartBytes = 4825
MaxMaxReportsPlusPrecursorRequestBytes = 18
MaxMaxReportsPlusPrecursorBytes = 6815772
MaxMaxBlockSyncRequestBytes = 32
MaxMaxBlockSyncRequestBytes = 40
MaxMaxBlockSyncResponseBytes = 27371305
MaxMaxTreeSyncChunkRequestBytes = 106
MaxMaxTreeSyncChunkResponseBytes = 65049225
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1815,9 +1815,9 @@ func (bex *blobExchangeState[RI]) verifyCert(cert *LightCertifiedBlob) error {
}

func staleBlob(expirySeqNr uint64, blobDigest BlobDigest) StaleBlob {
return StaleBlob{expirySeqNr + 1, blobDigest}
return StaleBlob{expirySeqNr, blobDigest}
}

func hasBlobExpired(expirySeqNr uint64, committedSeqNr uint64) bool {
return expirySeqNr < committedSeqNr
return expirySeqNr <= committedSeqNr
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
blobReapInterval = 10 * time.Second
blobReapInterval = 3 * time.Second
maxBlobsToReapInSingleTransaction = 100
)

Expand Down
1 change: 1 addition & 0 deletions offchainreporting2plus/internal/ocr3_1/protocol/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (ev EventComputedObservationQuorumSuccess[RI]) processOutcomeGeneration(out
type EventComputedObservation[RI any] struct {
Epoch uint64
SeqNr uint64
RequestHandle types.RequestHandle
AttributedQuery types.AttributedQuery
Observation types.Observation
}
Expand Down
1 change: 1 addition & 0 deletions offchainreporting2plus/internal/ocr3_1/protocol/kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type KeyValueDatabaseSemanticRead interface {
toSeqNr uint64,
startIndex jmt.Digest,
requestEndInclIndex jmt.Digest,
maxCumulativeKeysPlusValuesBytes int,
) (
endInclIndex jmt.Digest,
boundingLeaves []jmt.BoundingLeaf,
Expand Down
29 changes: 18 additions & 11 deletions offchainreporting2plus/internal/ocr3_1/protocol/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ func (msg MessageEpochStart[RI]) epoch() uint64 {
}

type MessageRoundStart[RI any] struct {
Epoch uint64
SeqNr uint64
Query types.Query
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
Epoch uint64
SeqNr uint64
Query types.Query
}

var _ MessageToOutcomeGeneration[struct{}] = (*MessageRoundStart[struct{}])(nil)
Expand All @@ -206,6 +207,7 @@ func (msg MessageRoundStart[RI]) epoch() uint64 {
}

type MessageObservation[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
Epoch uint64
SeqNr uint64
SignedObservation SignedObservation
Expand Down Expand Up @@ -354,7 +356,8 @@ func (msg MessageReportSignatures[RI]) processReportAttestation(repatt *reportAt
}

type MessageReportsPlusPrecursorRequest[RI any] struct {
SeqNr uint64
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
SeqNr uint64
}

var _ MessageToReportAttestation[struct{}] = MessageReportsPlusPrecursorRequest[struct{}]{}
Expand All @@ -372,6 +375,7 @@ func (msg MessageReportsPlusPrecursorRequest[RI]) processReportAttestation(repat
}

type MessageReportsPlusPrecursor[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
SeqNr uint64
ReportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor
}
Expand All @@ -394,10 +398,11 @@ func (msg MessageReportsPlusPrecursor[RI]) processReportAttestation(repatt *repo
}

type MessageBlockSyncRequest[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
RequestInfo *types.RequestInfo
StartSeqNr uint64 // a successful response must contain at least the block with this sequence number
EndExclSeqNr uint64 // the response may only contain sequence numbers less than this
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
RequestInfo *types.RequestInfo
StartSeqNr uint64 // a successful response must contain at least the block with this sequence number
EndExclSeqNr uint64 // the response may only contain sequence numbers less than this
MaxCumulativeWriteSetBytes int
}

var _ MessageToStateSync[struct{}] = MessageBlockSyncRequest[struct{}]{}
Expand Down Expand Up @@ -463,11 +468,13 @@ func (msg MessageBlockSyncResponse[RI]) processStateSync(stasy *stateSyncState[R
}

type MessageTreeSyncChunkRequest[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
RequestInfo *types.RequestInfo
ToSeqNr uint64
StartIndex jmt.Digest
EndInclIndex jmt.Digest

MaxCumulativeKeysPlusValuesBytes int
}

var _ MessageToStateSync[struct{}] = MessageTreeSyncChunkRequest[struct{}]{}
Expand Down Expand Up @@ -534,7 +541,7 @@ func (msg MessageTreeSyncChunkResponse[RI]) processStateSync(stasy *stateSyncSta
}

type MessageBlobOffer[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
RequestInfo *types.RequestInfo
ChunkDigestsRoot mt.Digest
PayloadLength uint64
Expand Down Expand Up @@ -581,7 +588,7 @@ func (msg MessageBlobOfferResponse[RI]) processBlobExchange(bex *blobExchangeSta
}

type MessageBlobChunkRequest[RI any] struct {
RequestHandle types.RequestHandle // actual handle for outbound message, sentinel for inbound
RequestHandle types.RequestHandle // actual handle for inbound message, sentinel for outbound
RequestInfo *types.RequestInfo
BlobDigest BlobDigest
ChunkIndex uint64
Expand Down
Loading