diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go index 7590ff30b..f7081cd85 100644 --- a/app/eth2wrap/multi.go +++ b/app/eth2wrap/multi.go @@ -63,6 +63,7 @@ func (m multi) Headers() map[string]string { if len(m.clients) == 0 { return nil } + return m.clients[0].Headers() } diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index bd9b78be4..a2e74c9dd 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -23,7 +23,6 @@ import ( "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/featureset" "github.com/obolnetwork/charon/app/log" - "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/consensus/instance" @@ -514,8 +513,14 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err inst.ErrCh <- err // Send resulting error to errCh. }() + var span trace.Span + + ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance") + if !c.deadliner.Add(duty) { + span.AddEvent("Expired Duty Skipped") log.Warn(ctx, "Skipping consensus for expired duty", nil) + return nil } @@ -527,12 +532,8 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err var ( decided bool nodes = len(c.peers) - span trace.Span ) - _, span = tracer.Start(ctx, "qbft.runInstance") - span.SetAttributes(attribute.String("duty", duty.Type.String())) - defer func() { if err != nil && !isContextErr(err) { span.RecordError(err) @@ -573,6 +574,15 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err // Create a new qbft definition for this instance. def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations) + origLogRoundChange := def.LogRoundChange + def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) { + if origLogRoundChange != nil { + origLogRoundChange(ctx, instance, process, round, newRound, uponRule, msgs) + } + + span.AddEvent("Round Changed") + span.SetAttributes(attribute.Int64("new_round", newRound)) + } // Create a new transport that handles sending and receiving for this instance. t := newTransport(c, c.privkey, inst.ValueCh, make(chan qbft.Msg[core.Duty, [32]byte, proto.Message]), newSniffer(int64(def.Nodes), peerIdx)) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index a0f5562df..c326e5043 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -11,6 +11,7 @@ import ( eth2api "github.com/attestantio/go-eth2-client/api" eth2spec "github.com/attestantio/go-eth2-client/spec" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/eth2wrap" @@ -54,10 +55,14 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat // Fetch triggers fetching of a proposed duty data set. func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error { var ( + span trace.Span unsignedSet core.UnsignedDataSet err error ) + ctx, span = core.StartDutyTrace(ctx, duty, "core/fetcher.Fetch") + defer span.End() + switch duty.Type { case core.DutyProposer: unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet) @@ -305,10 +310,10 @@ func (f *Fetcher) fetchProposerData(ctx context.Context, slot uint64, defSet cor return nil, errors.Wrap(err, "new proposal") } - // Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 0) - var blinded float64 + // Track whether the fetched proposal is blinded (built by MEV builder, 1) or local (built by beacon node, 2) + blinded := 2.0 if proposal.Blinded { - blinded = 1 + blinded = 1.0 } proposalBlindedGauge.Set(blinded) diff --git a/core/fetcher/metrics.go b/core/fetcher/metrics.go index 6f0f3235d..4c0552c3e 100644 --- a/core/fetcher/metrics.go +++ b/core/fetcher/metrics.go @@ -12,5 +12,5 @@ var proposalBlindedGauge = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "core", Subsystem: "fetcher", Name: "proposal_blinded", - Help: "Whether the fetched proposal was blinded (1) or local (0)", + Help: "Whether the fetched proposal was blinded (1) or local (2)", }) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 91cb96bc7..d44d4f2ad 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -269,28 +269,29 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { Type: dutyType, } + var span trace.Span + + dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) + + dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") + defSet, ok := s.getDutyDefinitionSet(duty) if !ok { + span.End() // Nothing for this duty. continue } // Trigger duty async go func() { + defer span.End() + if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { return // context cancelled } instrumentDuty(duty, defSet) - dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) - if duty.Type == core.DutyProposer { - var span trace.Span - - dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") - defer span.End() - } - for _, sub := range s.dutySubs { clone, err := defSet.Clone() // Clone for each subscriber. if err != nil { diff --git a/core/validatorapi/router.go b/core/validatorapi/router.go index fab7e4626..f16506717 100644 --- a/core/validatorapi/router.go +++ b/core/validatorapi/router.go @@ -1648,6 +1648,7 @@ func eventsHandler(h Handler) http.HandlerFunc { if err != nil { log.Error(ctx, "Failed to parse beacon node address for proxying", err, z.Str("address", beaconNodeAddr)) writeError(ctx, w, "events", err) + return } diff --git a/core/validatorapi/router_internal_test.go b/core/validatorapi/router_internal_test.go index 716d42142..c8f76e226 100644 --- a/core/validatorapi/router_internal_test.go +++ b/core/validatorapi/router_internal_test.go @@ -52,6 +52,7 @@ const ( func TestProxyShutdown(t *testing.T) { // Start a server that will block until the request is cancelled. serving := make(chan struct{}) + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { close(serving) <-r.Context().Done() @@ -67,9 +68,11 @@ func TestProxyShutdown(t *testing.T) { if err != nil { return nil, errors.Wrap(err, "create proxy request") } + proxyReq.Header = req.Header client := &http.Client{} + return client.Do(proxyReq) }, } @@ -77,6 +80,7 @@ func TestProxyShutdown(t *testing.T) { // Start a proxy server that will proxy to the target server. ctx, cancel := context.WithCancel(t.Context()) proxyHTTP := proxy(handler) + proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { proxyHTTP.ServeHTTP(w, r.WithContext(ctx)) })) diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index 89d228749..fc28381aa 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -18,11 +18,13 @@ import ( "github.com/attestantio/go-eth2-client/spec/altair" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" ssz "github.com/ferranbt/fastssz" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/app/version" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" @@ -34,7 +36,6 @@ import ( const ( defaultGasLimit = 30000000 - zeroAddress = "0x0000000000000000000000000000000000000000" ) // SlotFromTimestamp returns the Ethereum slot associated to a timestamp, given the genesis configuration fetched @@ -261,6 +262,14 @@ func (c *Component) Subscribe(fn func(context.Context, core.Duty, core.ParSigned // AttestationData implements the eth2client.AttesterDutiesProvider for the router. func (c Component) AttestationData(ctx context.Context, opts *eth2api.AttestationDataOpts) (*eth2api.Response[*eth2p0.AttestationData], error) { + var span trace.Span + + duty := core.NewAttesterDuty(uint64(opts.Slot)) + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AttestationData") + + span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex))) + defer span.End() + att, err := c.awaitAttFunc(ctx, uint64(opts.Slot), uint64(opts.CommitteeIndex)) if err != nil { return nil, err @@ -271,6 +280,13 @@ func (c Component) AttestationData(ctx context.Context, opts *eth2api.Attestatio // SubmitAttestations implements the eth2client.AttestationsSubmitter for the router. func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2api.SubmitAttestationsOpts) error { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAttestations") + + span.SetAttributes(attribute.Int("num_attestations", len(attestationOpts.Attestations))) + defer span.End() + attestations := attestationOpts.Attestations setsBySlot := make(map[uint64]core.ParSignedDataSet) @@ -387,6 +403,13 @@ func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2 } func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.Response[*eth2api.VersionedProposal], error) { + var span trace.Span + + duty := core.NewRandaoDuty(uint64(opts.Slot)) + + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.Proposal") + defer span.End() + // Get proposer pubkey (this is a blocking query). pubkey, err := c.getProposerPubkey(ctx, core.NewProposerDuty(uint64(opts.Slot))) if err != nil { @@ -403,7 +426,6 @@ func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*e Signature: opts.RandaoReveal, } - duty := core.NewRandaoDuty(uint64(opts.Slot)) parSig := core.NewPartialSignedRandao(sigEpoch.Epoch, sigEpoch.Signature, c.shareIdx) // Verify randao signature @@ -795,6 +817,14 @@ func (c Component) BeaconCommitteeSelections(ctx context.Context, opts *eth2api. // AggregateAttestation returns the aggregate attestation for the given attestation root. // It does a blocking query to DutyAggregator unsigned data from dutyDB. func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.AggregateAttestationOpts) (*eth2api.Response[*eth2spec.VersionedAttestation], error) { + var span trace.Span + + duty := core.NewAggregatorDuty(uint64(opts.Slot)) + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AggregateAttestation") + + span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex))) + defer span.End() + aggAtt, err := c.awaitAggAttFunc(ctx, uint64(opts.Slot), opts.AttestationDataRoot) if err != nil { return nil, err @@ -807,6 +837,13 @@ func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.Aggre // - It verifies partial signature on AggregateAndProof. // - It then calls all the subscribers for further steps on partially signed aggregate and proof. func (c Component) SubmitAggregateAttestations(ctx context.Context, opts *eth2api.SubmitAggregateAttestationsOpts) error { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAggregateAttestations") + + span.SetAttributes(attribute.Int("num_aggregates", len(opts.SignedAggregateAndProofs))) + defer span.End() + aggsAndProofs := opts.SignedAggregateAndProofs vals, err := c.eth2Cl.ActiveValidators(ctx) @@ -1071,6 +1108,13 @@ func (c Component) SyncCommitteeSelections(ctx context.Context, opts *eth2api.Sy // ProposerDuties obtains proposer duties for the given options. func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDutiesOpts) (*eth2api.Response[[]*eth2v1.ProposerDuty], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.ProposerDuties") + + span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch))) + defer span.End() + eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts) if err != nil { return nil, err @@ -1097,6 +1141,13 @@ func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDut } func (c Component) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOpts) (*eth2api.Response[[]*eth2v1.AttesterDuty], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.AttesterDuties") + + span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch))) + defer span.End() + eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts) if err != nil { return nil, err @@ -1148,6 +1199,13 @@ func (c Component) SyncCommitteeDuties(ctx context.Context, opts *eth2api.SyncCo } func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts) (*eth2api.Response[map[eth2p0.ValidatorIndex]*eth2v1.Validator], error) { + var span trace.Span + + ctx, span = tracer.Start(ctx, "core/validatorapi.Validators") + + span.SetAttributes(attribute.String("state", opts.State)) + defer span.End() + if len(opts.PubKeys) == 0 && len(opts.Indices) == 0 { // fetch all validators eth2Resp, err := c.eth2Cl.Validators(ctx, opts) diff --git a/docs/metrics.md b/docs/metrics.md index fa33695a3..4dcd87fda 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -54,7 +54,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `core_consensus_duration_seconds` | Histogram | Duration of the consensus process by protocol, duty, and timer | `protocol, duty, timer` | | `core_consensus_error_total` | Counter | Total count of consensus errors by protocol | `protocol` | | `core_consensus_timeout_total` | Counter | Total count of consensus timeouts by protocol, duty, and timer | `protocol, duty, timer` | -| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (0) | | +| `core_fetcher_proposal_blinded` | Gauge | Whether the fetched proposal was blinded (1) or local (2) | | | `core_parsigdb_exit_total` | Counter | Total number of partially signed voluntary exits per public key | `pubkey` | | `core_scheduler_current_epoch` | Gauge | The current epoch | | | `core_scheduler_current_slot` | Gauge | The current slot | |