From 0fd44c66418b8e171c65839c32b72a3b0dd48b9b Mon Sep 17 00:00:00 2001 From: Diogo Santos <59585571+DiogoSantoss@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:46:50 +0000 Subject: [PATCH 1/2] core/fetcher: update blinded gauge metric values (#4170) Previously `core_fetcher_proposal_blinded` had values 0 (local) or 1 (blinded). This is problematic because the default value for the gauge is also zero so when a node starts/restarts it reports a 0 value before fetching any block which makes monitoring more complicated. This updates the values to be 2 (local) and 1 (blinded) category: bug ticket: none --- core/fetcher/fetcher.go | 6 +++--- core/fetcher/metrics.go | 14 ++++++-------- docs/metrics.md | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 2a82bd247..327935701 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -302,10 +302,10 @@ func (f *Fetcher) fetchProposerData(ctx context.Context, slot uint64, defSet cor return nil, errors.Wrap(err, "new proposal") } - // Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 0) - var blinded float64 + // Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 2) + blinded := 2.0 if proposal.Blinded { - blinded = 1 + blinded = 1.0 } proposalBlindedGauge.Set(blinded) diff --git a/core/fetcher/metrics.go b/core/fetcher/metrics.go index 9f2d9d7d6..4c0552c3e 100644 --- a/core/fetcher/metrics.go +++ b/core/fetcher/metrics.go @@ -8,11 +8,9 @@ import ( "github.com/obolnetwork/charon/app/promauto" ) -var ( - proposalBlindedGauge = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "core", - Subsystem: "fetcher", - Name: "proposal_blinded", - Help: "Whether the fetched proposal was blinded (1) or local (0)", - }) -) +var proposalBlindedGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "core", + Subsystem: "fetcher", + Name: "proposal_blinded", + Help: "Whether the fetched proposal was blinded (1) or local (2)", +}) diff --git a/docs/metrics.md b/docs/metrics.md index 8f824d28d..1ef7ab271 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -57,7 +57,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `core_consensus_duration_seconds` | Histogram | Duration of the consensus process by protocol, duty, and timer | `protocol, duty, timer` | | `core_consensus_error_total` | Counter | Total count of consensus errors by protocol | `protocol` | | `core_consensus_timeout_total` | Counter | Total count of consensus timeouts by protocol, duty, and timer | `protocol, duty, timer` | -| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (0) | | +| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (2) | | | `core_parsigdb_exit_total` | Counter | Total number of partially signed voluntary exits per public key | `pubkey` | | `core_scheduler_current_epoch` | Gauge | The current epoch | | | `core_scheduler_current_slot` | Gauge | The current slot | | From ca8470cb970aa43f529cf77bf7a2a42499453d37 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Fri, 12 Dec 2025 15:51:43 +0300 Subject: [PATCH 2/2] *: improved tracing (#4169) Fixed and improving of tracing data production. category: refactor ticket: none --- core/consensus/qbft/qbft.go | 20 ++++++-- core/fetcher/fetcher.go | 5 ++ core/scheduler/scheduler.go | 17 ++++--- core/validatorapi/router_internal_test.go | 1 + core/validatorapi/validatorapi.go | 62 ++++++++++++++++++++++- 5 files changed, 90 insertions(+), 15 deletions(-) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index c4b2f9782..186b2f794 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -23,7 +23,6 @@ import ( "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/featureset" "github.com/obolnetwork/charon/app/log" - "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/consensus/instance" @@ -514,8 +513,14 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err inst.ErrCh <- err // Send resulting error to errCh. }() + var span trace.Span + + ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance") + if !c.deadliner.Add(duty) { + span.AddEvent("Expired Duty Skipped") log.Warn(ctx, "Skipping consensus for expired duty", nil) + return nil } @@ -527,12 +532,8 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err var ( decided bool nodes = len(c.peers) - span trace.Span ) - _, span = tracer.Start(ctx, "qbft.runInstance") - span.SetAttributes(attribute.String("duty", duty.Type.String())) - defer func() { if err != nil && !isContextErr(err) { span.RecordError(err) @@ -573,6 +574,15 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err // Create a new qbft definition for this instance. def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations) + origLogRoundChange := def.LogRoundChange + def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) { + if origLogRoundChange != nil { + origLogRoundChange(ctx, instance, process, round, newRound, uponRule, msgs) + } + + span.AddEvent("Round Changed") + span.SetAttributes(attribute.Int64("new_round", newRound)) + } // Create a new transport that handles sending and receiving for this instance. t := newTransport(c, c.privkey, inst.ValueCh, make(chan qbft.Msg[core.Duty, [32]byte, proto.Message]), newSniffer(int64(def.Nodes), peerIdx)) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 327935701..0ca189a2b 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -11,6 +11,7 @@ import ( eth2api "github.com/attestantio/go-eth2-client/api" eth2spec "github.com/attestantio/go-eth2-client/spec" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/eth2wrap" @@ -54,10 +55,14 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat // Fetch triggers fetching of a proposed duty data set. func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error { var ( + span trace.Span unsignedSet core.UnsignedDataSet err error ) + ctx, span = core.StartDutyTrace(ctx, duty, "core/fetcher.Fetch") + defer span.End() + switch duty.Type { case core.DutyProposer: unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 6cdbf639e..5fe85774e 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -230,28 +230,29 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { Type: dutyType, } + var span trace.Span + + dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) + + dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") + defSet, ok := s.getDutyDefinitionSet(duty) if !ok { + span.End() // Nothing for this duty. continue } // Trigger duty async go func() { + defer span.End() + if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { return // context cancelled } instrumentDuty(duty, defSet) - dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) - if duty.Type == core.DutyProposer { - var span trace.Span - - dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") - defer span.End() - } - for _, sub := range s.dutySubs { clone, err := defSet.Clone() // Clone for each subscriber. if err != nil { diff --git a/core/validatorapi/router_internal_test.go b/core/validatorapi/router_internal_test.go index 27fb9fff2..0e68f120b 100644 --- a/core/validatorapi/router_internal_test.go +++ b/core/validatorapi/router_internal_test.go @@ -58,6 +58,7 @@ func (a addr) Address() string { func TestProxyShutdown(t *testing.T) { // Start a server that will block until the request is cancelled. serving := make(chan struct{}) + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { close(serving) <-r.Context().Done() diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index cca4b37a7..6b150ab83 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -17,11 +17,13 @@ import ( "github.com/attestantio/go-eth2-client/spec/altair" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" ssz "github.com/ferranbt/fastssz" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/app/version" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" @@ -33,7 +35,6 @@ import ( const ( defaultGasLimit = 30000000 - zeroAddress = "0x0000000000000000000000000000000000000000" ) // SlotFromTimestamp returns the Ethereum slot associated to a timestamp, given the genesis configuration fetched @@ -260,6 +261,14 @@ func (c *Component) Subscribe(fn func(context.Context, core.Duty, core.ParSigned // AttestationData implements the eth2client.AttesterDutiesProvider for the router. func (c Component) AttestationData(ctx context.Context, opts *eth2api.AttestationDataOpts) (*eth2api.Response[*eth2p0.AttestationData], error) { + var span trace.Span + + duty := core.NewAttesterDuty(uint64(opts.Slot)) + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AttestationData") + + span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex))) + defer span.End() + att, err := c.awaitAttFunc(ctx, uint64(opts.Slot), uint64(opts.CommitteeIndex)) if err != nil { return nil, err @@ -270,6 +279,13 @@ func (c Component) AttestationData(ctx context.Context, opts *eth2api.Attestatio // SubmitAttestations implements the eth2client.AttestationsSubmitter for the router. func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2api.SubmitAttestationsOpts) error { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAttestations") + + span.SetAttributes(attribute.Int("num_attestations", len(attestationOpts.Attestations))) + defer span.End() + attestations := attestationOpts.Attestations setsBySlot := make(map[uint64]core.ParSignedDataSet) @@ -386,6 +402,13 @@ func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2 } func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.Response[*eth2api.VersionedProposal], error) { + var span trace.Span + + duty := core.NewRandaoDuty(uint64(opts.Slot)) + + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.Proposal") + defer span.End() + // Get proposer pubkey (this is a blocking query). pubkey, err := c.getProposerPubkey(ctx, core.NewProposerDuty(uint64(opts.Slot))) if err != nil { @@ -402,7 +425,6 @@ func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*e Signature: opts.RandaoReveal, } - duty := core.NewRandaoDuty(uint64(opts.Slot)) parSig := core.NewPartialSignedRandao(sigEpoch.Epoch, sigEpoch.Signature, c.shareIdx) // Verify randao signature @@ -866,6 +888,14 @@ func (c Component) BeaconCommitteeSelections(ctx context.Context, opts *eth2api. // AggregateAttestation returns the aggregate attestation for the given attestation root. // It does a blocking query to DutyAggregator unsigned data from dutyDB. func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.AggregateAttestationOpts) (*eth2api.Response[*eth2spec.VersionedAttestation], error) { + var span trace.Span + + duty := core.NewAggregatorDuty(uint64(opts.Slot)) + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AggregateAttestation") + + span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex))) + defer span.End() + aggAtt, err := c.awaitAggAttFunc(ctx, uint64(opts.Slot), opts.AttestationDataRoot) if err != nil { return nil, err @@ -878,6 +908,13 @@ func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.Aggre // - It verifies partial signature on AggregateAndProof. // - It then calls all the subscribers for further steps on partially signed aggregate and proof. func (c Component) SubmitAggregateAttestations(ctx context.Context, opts *eth2api.SubmitAggregateAttestationsOpts) error { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAggregateAttestations") + + span.SetAttributes(attribute.Int("num_aggregates", len(opts.SignedAggregateAndProofs))) + defer span.End() + aggsAndProofs := opts.SignedAggregateAndProofs vals, err := c.eth2Cl.ActiveValidators(ctx) @@ -1139,6 +1176,13 @@ func (c Component) SyncCommitteeSelections(ctx context.Context, opts *eth2api.Sy // ProposerDuties obtains proposer duties for the given options. func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDutiesOpts) (*eth2api.Response[[]*eth2v1.ProposerDuty], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.ProposerDuties") + + span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch))) + defer span.End() + eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts) if err != nil { return nil, err @@ -1165,6 +1209,13 @@ func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDut } func (c Component) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOpts) (*eth2api.Response[[]*eth2v1.AttesterDuty], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.AttesterDuties") + + span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch))) + defer span.End() + eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts) if err != nil { return nil, err @@ -1216,6 +1267,13 @@ func (c Component) SyncCommitteeDuties(ctx context.Context, opts *eth2api.SyncCo } func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts) (*eth2api.Response[map[eth2p0.ValidatorIndex]*eth2v1.Validator], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.Validators") + + span.SetAttributes(attribute.String("state", opts.State)) + defer span.End() + if len(opts.PubKeys) == 0 && len(opts.Indices) == 0 { // fetch all validators eth2Resp, err := c.eth2Cl.Validators(ctx, opts)