From 5935254458aeaa7517bf996c05a11280ecc3e053 Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 10 Nov 2025 12:33:55 +0000 Subject: [PATCH 1/2] chore: add periodic refresh of last observed value --- pkg/exporters/verifier/verifier.go | 9 ++++++++- pkg/metrics/metrics.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 29a471f..da7f98e 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -2,11 +2,11 @@ package verifier import ( "context" + "github.com/ethereum/go-ethereum/core/types" "github.com/evstack/ev-metrics/internal/clients/celestia" "github.com/evstack/ev-metrics/internal/clients/evm" "github.com/evstack/ev-metrics/internal/clients/evnode" "github.com/evstack/ev-metrics/pkg/metrics" - "github.com/ethereum/go-ethereum/core/types" "github.com/rs/zerolog" "time" ) @@ -53,11 +53,18 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error } defer sub.Unsubscribe() + // ticker to refresh submission duration metric every 10 seconds + refreshTicker := time.NewTicker(10 * time.Second) + defer refreshTicker.Stop() + for { select { case <-ctx.Done(): e.logger.Info().Msg("stopping block verification") return nil + case <-refreshTicker.C: + // ensure that submission duration is always included in the 60 second window. + m.RefreshSubmissionDuration() case header := <-headers: // record block arrival time for millisecond precision arrivalTime := time.Now() diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 0698f73..9bc431c 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -3,6 +3,7 @@ package metrics import ( "fmt" "sort" + "strings" "sync" "time" @@ -54,6 +55,9 @@ type Metrics struct { // internal tracking for block time calculation (uses arrival time for ms precision) lastBlockArrivalTime map[string]time.Time // key: chainID + // lastSubmissionDurations tracks the most recent submission durations. + lastSubmissionDurations map[string]time.Duration // key: chainID:namespace + mu sync.Mutex ranges map[string][]*blockRange // key: blobType -> sorted slice of ranges } @@ -227,8 +231,9 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric }, []string{"chain_id", "endpoint", "error_type"}, ), - ranges: make(map[string][]*blockRange), - lastBlockArrivalTime: make(map[string]time.Time), + ranges: make(map[string][]*blockRange), + lastBlockArrivalTime: make(map[string]time.Time), + lastSubmissionDurations: make(map[string]time.Duration), } return m @@ -460,7 +465,27 @@ func (m *Metrics) RecordBlockHeightDrift(chainID, targetEndpoint string, referen // RecordSubmissionDuration records the da submission duration for a given submission type func (m *Metrics) RecordSubmissionDuration(chainID, submissionType string, duration time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.SubmissionDuration.WithLabelValues(chainID, submissionType).Observe(duration.Seconds()) + + key := fmt.Sprintf("%s:%s", chainID, submissionType) + m.lastSubmissionDurations[key] = duration +} + +// RefreshSubmissionDuration re-observes the last known submission duration to keep the metric alive. +func (m *Metrics) RefreshSubmissionDuration() { + m.mu.Lock() + defer m.mu.Unlock() + + for key, duration := range m.lastSubmissionDurations { + // assuming format "chainID:namespace" + parts := strings.Split(key, ":") + if len(parts) == 2 { + m.SubmissionDuration.WithLabelValues(parts[0], parts[1]).Observe(duration.Seconds()) + } + } } // RecordBlockTime records the time between consecutive blocks using arrival time From 9535af4192f811bb113b05eabd75fb192538b53a Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 10 Nov 2025 12:39:20 +0000 Subject: [PATCH 2/2] chore: adding some unit tests --- pkg/metrics/metrics_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 016a0ba..2b2e061 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -2,6 +2,7 @@ package metrics import ( "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -535,6 +536,33 @@ func TestMetrics_ComplexScenario(t *testing.T) { } } +func TestMetrics_RecordSubmissionDuration(t *testing.T) { + reg := prometheus.NewRegistry() + m := NewWithRegistry("test", reg) + + // record submission durations + m.RecordSubmissionDuration("chain1", "header", 5*time.Second) + // overwrite submission duration + m.RecordSubmissionDuration("chain1", "header", 6*time.Second) + m.RecordSubmissionDuration("chain1", "data", 10*time.Second) + m.RecordSubmissionDuration("chain2", "header", 3*time.Second) + + // verify stored in memory + require.Equal(t, 6*time.Second, m.lastSubmissionDurations["chain1:header"], "last submission duration should be present") + require.Equal(t, 10*time.Second, m.lastSubmissionDurations["chain1:data"]) + require.Equal(t, 3*time.Second, m.lastSubmissionDurations["chain2:header"]) +} + +func TestMetrics_RefreshSubmissionDuration_Empty(t *testing.T) { + reg := prometheus.NewRegistry() + m := NewWithRegistry("test", reg) + + // call refresh without any recorded values - should not panic + require.NotPanics(t, func() { + m.RefreshSubmissionDuration() + }) +} + // helper types for table tests type blockToRecord struct { chain string