Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/eth2wrap/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m multi) Headers() map[string]string {
if len(m.clients) == 0 {
return nil
}

return m.clients[0].Headers()
}

Expand Down
20 changes: 15 additions & 5 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/consensus/instance"
Expand Down Expand Up @@ -514,8 +513,14 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
inst.ErrCh <- err // Send resulting error to errCh.
}()

var span trace.Span

ctx, span = core.StartDutyTrace(ctx, duty, "core/qbft.runInstance")

if !c.deadliner.Add(duty) {
span.AddEvent("Expired Duty Skipped")
log.Warn(ctx, "Skipping consensus for expired duty", nil)

return nil
}

Expand All @@ -527,12 +532,8 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
var (
decided bool
nodes = len(c.peers)
span trace.Span
)

_, span = tracer.Start(ctx, "qbft.runInstance")
span.SetAttributes(attribute.String("duty", duty.Type.String()))

defer func() {
if err != nil && !isContextErr(err) {
span.RecordError(err)
Expand Down Expand Up @@ -573,6 +574,15 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err

// Create a new qbft definition for this instance.
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations)
origLogRoundChange := def.LogRoundChange
def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
if origLogRoundChange != nil {
origLogRoundChange(ctx, instance, process, round, newRound, uponRule, msgs)
}

span.AddEvent("Round Changed")
span.SetAttributes(attribute.Int64("new_round", newRound))
}

// Create a new transport that handles sending and receiving for this instance.
t := newTransport(c, c.privkey, inst.ValueCh, make(chan qbft.Msg[core.Duty, [32]byte, proto.Message]), newSniffer(int64(def.Nodes), peerIdx))
Expand Down
5 changes: 5 additions & 0 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
eth2api "github.com/attestantio/go-eth2-client/api"
eth2spec "github.com/attestantio/go-eth2-client/spec"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
Expand Down Expand Up @@ -54,10 +55,14 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat
// Fetch triggers fetching of a proposed duty data set.
func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error {
var (
span trace.Span
unsignedSet core.UnsignedDataSet
err error
)

ctx, span = core.StartDutyTrace(ctx, duty, "core/fetcher.Fetch")
defer span.End()

switch duty.Type {
case core.DutyProposer:
unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet)
Expand Down
17 changes: 9 additions & 8 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,28 +269,29 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) {
Type: dutyType,
}

var span trace.Span

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")

defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
span.End()
// Nothing for this duty.
continue
}

// Trigger duty async
go func() {
defer span.End()

if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}

instrumentDuty(duty, defSet)

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))
if duty.Type == core.DutyProposer {
var span trace.Span

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")
defer span.End()
}

for _, sub := range s.dutySubs {
clone, err := defSet.Clone() // Clone for each subscriber.
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/validatorapi/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,7 @@ func eventsHandler(h Handler) http.HandlerFunc {
if err != nil {
log.Error(ctx, "Failed to parse beacon node address for proxying", err, z.Str("address", beaconNodeAddr))
writeError(ctx, w, "events", err)

return
}

Expand Down
4 changes: 4 additions & 0 deletions core/validatorapi/router_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
func TestProxyShutdown(t *testing.T) {
// Start a server that will block until the request is cancelled.
serving := make(chan struct{})

target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(serving)
<-r.Context().Done()
Expand All @@ -67,16 +68,19 @@ func TestProxyShutdown(t *testing.T) {
if err != nil {
return nil, errors.Wrap(err, "create proxy request")
}

proxyReq.Header = req.Header

client := &http.Client{}

return client.Do(proxyReq)
},
}

// Start a proxy server that will proxy to the target server.
ctx, cancel := context.WithCancel(t.Context())
proxyHTTP := proxy(handler)

proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
proxyHTTP.ServeHTTP(w, r.WithContext(ctx))
}))
Expand Down
62 changes: 60 additions & 2 deletions core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
"github.com/attestantio/go-eth2-client/spec/altair"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
ssz "github.com/ferranbt/fastssz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
Expand All @@ -34,7 +36,6 @@

const (
defaultGasLimit = 30000000
zeroAddress = "0x0000000000000000000000000000000000000000"
)

// SlotFromTimestamp returns the Ethereum slot associated to a timestamp, given the genesis configuration fetched
Expand Down Expand Up @@ -261,6 +262,14 @@

// AttestationData implements the eth2client.AttesterDutiesProvider for the router.
func (c Component) AttestationData(ctx context.Context, opts *eth2api.AttestationDataOpts) (*eth2api.Response[*eth2p0.AttestationData], error) {
var span trace.Span

duty := core.NewAttesterDuty(uint64(opts.Slot))
ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AttestationData")

span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here we expect it to be always 0 if fetch_only_commidx_0 is turned on which is the majority now. It will also be made default in the upcoming months. Not sure if we need that attribute.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shall not hurt, especially if it is not zero, suddenly :) this way we can always confirm it is zero

defer span.End()

att, err := c.awaitAttFunc(ctx, uint64(opts.Slot), uint64(opts.CommitteeIndex))
if err != nil {
return nil, err
Expand All @@ -271,6 +280,13 @@

// SubmitAttestations implements the eth2client.AttestationsSubmitter for the router.
func (c Component) SubmitAttestations(ctx context.Context, attestationOpts *eth2api.SubmitAttestationsOpts) error {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAttestations")

span.SetAttributes(attribute.Int("num_attestations", len(attestationOpts.Attestations)))
defer span.End()

attestations := attestationOpts.Attestations
setsBySlot := make(map[uint64]core.ParSignedDataSet)

Expand Down Expand Up @@ -387,6 +403,13 @@
}

func (c Component) Proposal(ctx context.Context, opts *eth2api.ProposalOpts) (*eth2api.Response[*eth2api.VersionedProposal], error) {
var span trace.Span

duty := core.NewRandaoDuty(uint64(opts.Slot))

ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.Proposal")
defer span.End()

// Get proposer pubkey (this is a blocking query).
pubkey, err := c.getProposerPubkey(ctx, core.NewProposerDuty(uint64(opts.Slot)))
if err != nil {
Expand All @@ -403,7 +426,6 @@
Signature: opts.RandaoReveal,
}

duty := core.NewRandaoDuty(uint64(opts.Slot))
parSig := core.NewPartialSignedRandao(sigEpoch.Epoch, sigEpoch.Signature, c.shareIdx)

// Verify randao signature
Expand Down Expand Up @@ -795,6 +817,14 @@
// AggregateAttestation returns the aggregate attestation for the given attestation root.
// It does a blocking query to DutyAggregator unsigned data from dutyDB.
func (c Component) AggregateAttestation(ctx context.Context, opts *eth2api.AggregateAttestationOpts) (*eth2api.Response[*eth2spec.VersionedAttestation], error) {
var span trace.Span

duty := core.NewAggregatorDuty(uint64(opts.Slot))
ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.AggregateAttestation")

span.SetAttributes(attribute.Int64("committee_index", int64(opts.CommitteeIndex)))
defer span.End()

aggAtt, err := c.awaitAggAttFunc(ctx, uint64(opts.Slot), opts.AttestationDataRoot)
if err != nil {
return nil, err
Expand All @@ -807,6 +837,13 @@
// - It verifies partial signature on AggregateAndProof.
// - It then calls all the subscribers for further steps on partially signed aggregate and proof.
func (c Component) SubmitAggregateAttestations(ctx context.Context, opts *eth2api.SubmitAggregateAttestationsOpts) error {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.SubmitAggregateAttestations")

span.SetAttributes(attribute.Int("num_aggregates", len(opts.SignedAggregateAndProofs)))
defer span.End()

aggsAndProofs := opts.SignedAggregateAndProofs

vals, err := c.eth2Cl.ActiveValidators(ctx)
Expand Down Expand Up @@ -1071,6 +1108,13 @@

// ProposerDuties obtains proposer duties for the given options.
func (c Component) ProposerDuties(ctx context.Context, opts *eth2api.ProposerDutiesOpts) (*eth2api.Response[[]*eth2v1.ProposerDuty], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.ProposerDuties")

span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
defer span.End()

eth2Resp, err := c.eth2Cl.ProposerDuties(ctx, opts)
if err != nil {
return nil, err
Expand All @@ -1097,6 +1141,13 @@
}

func (c Component) AttesterDuties(ctx context.Context, opts *eth2api.AttesterDutiesOpts) (*eth2api.Response[[]*eth2v1.AttesterDuty], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.AttesterDuties")

span.SetAttributes(attribute.Int64("epoch", int64(opts.Epoch)))
defer span.End()

eth2Resp, err := c.eth2Cl.AttesterDuties(ctx, opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1148,6 +1199,13 @@
}

func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts) (*eth2api.Response[map[eth2p0.ValidatorIndex]*eth2v1.Validator], error) {
var span trace.Span

ctx, span = tracer.Start(ctx, "core/validatorapi.Validators")

span.SetAttributes(attribute.String("state", opts.State))
defer span.End()

if len(opts.PubKeys) == 0 && len(opts.Indices) == 0 {
// fetch all validators
eth2Resp, err := c.eth2Cl.Validators(ctx, opts)
Expand Down
Loading