Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/exporters/verifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package verifier

import (
"context"
"github.com/ethereum/go-ethereum/core/types"
"github.com/evstack/ev-metrics/internal/clients/celestia"
"github.com/evstack/ev-metrics/internal/clients/evm"
"github.com/evstack/ev-metrics/internal/clients/evnode"
"github.com/evstack/ev-metrics/pkg/metrics"
"github.com/ethereum/go-ethereum/core/types"
"github.com/rs/zerolog"
"time"
)
Expand Down Expand Up @@ -53,11 +53,18 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
}
defer sub.Unsubscribe()

// ticker to refresh submission duration metric every 10 seconds
refreshTicker := time.NewTicker(10 * time.Second)
defer refreshTicker.Stop()

for {
select {
case <-ctx.Done():
e.logger.Info().Msg("stopping block verification")
return nil
case <-refreshTicker.C:
// ensure that submission duration is always included in the 60 second window.
m.RefreshSubmissionDuration()
case header := <-headers:
// record block arrival time for millisecond precision
arrivalTime := time.Now()
Expand Down
29 changes: 27 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -60,6 +61,9 @@ type Metrics struct {
// internal tracking for block time calculation (uses arrival time for ms precision)
lastBlockArrivalTime map[string]time.Time // key: chainID

// lastSubmissionDurations tracks the most recent submission durations.
lastSubmissionDurations map[string]time.Duration // key: chainID:namespace

mu sync.Mutex
ranges map[string][]*blockRange // key: blobType -> sorted slice of ranges
}
Expand Down Expand Up @@ -257,8 +261,9 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric
},
[]string{"chain_id", "endpoint", "error_type"},
),
ranges: make(map[string][]*blockRange),
lastBlockArrivalTime: make(map[string]time.Time),
ranges: make(map[string][]*blockRange),
lastBlockArrivalTime: make(map[string]time.Time),
lastSubmissionDurations: make(map[string]time.Duration),
}

return m
Expand Down Expand Up @@ -490,7 +495,27 @@ func (m *Metrics) RecordBlockHeightDrift(chainID, targetEndpoint string, referen

// RecordSubmissionDuration observes a DA submission duration for the given
// chain and submission type, and remembers it as the most recent value for
// that pair. The whole operation runs under m.mu.
func (m *Metrics) RecordSubmissionDuration(chainID, submissionType string, duration time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// remember the latest duration, keyed as "chainID:submissionType"
	seriesKey := chainID + ":" + submissionType
	observer := m.SubmissionDuration.WithLabelValues(chainID, submissionType)

	observer.Observe(duration.Seconds())
	m.lastSubmissionDurations[seriesKey] = duration
}

// RefreshSubmissionDuration re-observes the last recorded submission duration
// of every tracked series so the metric keeps receiving samples between
// submissions.
//
// Keys of lastSubmissionDurations have the form "chainID:submissionType". The
// key is split at the LAST colon so that a chainID which itself contains a
// colon is still refreshed instead of being silently skipped. This assumes the
// submission type never contains a colon.
func (m *Metrics) RefreshSubmissionDuration() {
	m.mu.Lock()
	defer m.mu.Unlock()

	for key, duration := range m.lastSubmissionDurations {
		sep := strings.LastIndex(key, ":")
		if sep < 0 {
			// no separator at all: not a key produced by RecordSubmissionDuration
			continue
		}
		// label values mirror what RecordSubmissionDuration observed, including
		// empty halves, so every recorded series is refreshed
		m.SubmissionDuration.WithLabelValues(key[:sep], key[sep+1:]).Observe(duration.Seconds())
	}
}
Comment on lines +508 to 519
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current method of parsing the metric key using strings.Split(key, ":") is not robust. If a chainID contains a colon, the key will be split into more than two parts, causing the len(parts) == 2 check to fail. This would result in the metric for that series not being refreshed, without any error or warning. A more robust approach is to split the key at the last colon, which correctly separates the chainID from the submissionType, assuming the submissionType itself does not contain a colon.

Suggested change
// RefreshSubmissionDuration re-observes every stored submission duration on the
// SubmissionDuration metric while holding m.mu.
// NOTE(review): this is the pre-suggestion version; a key that splits into more
// or fewer than two ":"-separated parts is skipped without any log or error.
func (m *Metrics) RefreshSubmissionDuration() {
m.mu.Lock()
defer m.mu.Unlock()
for key, duration := range m.lastSubmissionDurations {
// keys are expected to look like "chainID:submissionType"
parts := strings.Split(key, ":")
// only refresh when the key yields exactly a chain ID and a submission type
if len(parts) == 2 {
m.SubmissionDuration.WithLabelValues(parts[0], parts[1]).Observe(duration.Seconds())
}
}
}
// RefreshSubmissionDuration re-observes every stored submission duration on the
// SubmissionDuration metric while holding m.mu. The key is split at its last
// colon, so a chain ID containing colons is still handled.
func (m *Metrics) RefreshSubmissionDuration() {
m.mu.Lock()
defer m.mu.Unlock()
for key, duration := range m.lastSubmissionDurations {
// everything before the last colon is the chain ID, everything after it the submission type
lastColonIdx := strings.LastIndex(key, ":")
// Ensure the key is in the expected "chainID:submissionType" format and both parts are non-empty.
if lastColonIdx > 0 && lastColonIdx < len(key)-1 {
chainID := key[:lastColonIdx]
submissionType := key[lastColonIdx+1:]
m.SubmissionDuration.WithLabelValues(chainID, submissionType).Observe(duration.Seconds())
}
}
}


// RecordBlockTime records the time between consecutive blocks using arrival time
Expand Down
28 changes: 28 additions & 0 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -535,6 +536,33 @@ func TestMetrics_ComplexScenario(t *testing.T) {
}
}

func TestMetrics_RecordSubmissionDuration(t *testing.T) {
reg := prometheus.NewRegistry()
m := NewWithRegistry("test", reg)

// record submission durations
m.RecordSubmissionDuration("chain1", "header", 5*time.Second)
// overwrite submission duration
m.RecordSubmissionDuration("chain1", "header", 6*time.Second)
m.RecordSubmissionDuration("chain1", "data", 10*time.Second)
m.RecordSubmissionDuration("chain2", "header", 3*time.Second)

// verify stored in memory
require.Equal(t, 6*time.Second, m.lastSubmissionDurations["chain1:header"], "last submission duration should be present")
require.Equal(t, 10*time.Second, m.lastSubmissionDurations["chain1:data"])
require.Equal(t, 3*time.Second, m.lastSubmissionDurations["chain2:header"])
}

func TestMetrics_RefreshSubmissionDuration_Empty(t *testing.T) {
reg := prometheus.NewRegistry()
m := NewWithRegistry("test", reg)

// call refresh without any recorded values - should not panic
require.NotPanics(t, func() {
m.RefreshSubmissionDuration()
})
}
Comment on lines +556 to +564
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test correctly ensures that RefreshSubmissionDuration doesn't panic when there are no durations to refresh. However, there is no test case for the "happy path" where it successfully refreshes a metric. It would be beneficial to add a test that:

  1. Records a submission duration.
  2. Calls RefreshSubmissionDuration.
  3. Verifies that the Prometheus summary metric has been updated accordingly.

This might require a helper function to inspect summary metrics, as the existing getMetricValue is designed for gauges. You could, for example, gather metrics and check the _count or _sum for the summary.


// helper types for table tests
type blockToRecord struct {
chain string
Expand Down