From d9516bcf13a482db13e13300bdb10c8472403e1c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 19:46:20 +0100 Subject: [PATCH] perf: reduce allocations on hot paths - Cache Prometheus WithLabelValues at init (metrics.go): eliminates per-height map lookups and slice allocs for sync state, backfill stages, and store operations. - Move transient error needles to package-level var (celestia_node.go): avoids per-retry []string heap allocation. - Cache bearer auth metadata map at construction (celestia_app.go): avoids map+string alloc per gRPC call. - Replace map[string]any with struct in MarshalBlob (service.go): ~200 bytes less per blob marshal. - Preallocate blob slices in GetBlobs, BlobGetAll, gRPC GetAll: reduces append doublings on typical result sets. - Optimize notifier filterEvent: skip allocation when all/no blobs match; use exact-capacity slice for partial matches. - Skip namespace map allocation for header-only subscriptions. - Use slice indexing in httpToWS instead of TrimPrefix. Co-Authored-By: Claude Opus 4.6 --- pkg/api/grpc/blob_service.go | 2 +- pkg/api/notifier.go | 28 ++++-- pkg/api/service.go | 28 ++++-- pkg/fetch/celestia_app.go | 10 ++- pkg/fetch/celestia_node.go | 31 ++++--- pkg/metrics/metrics.go | 170 ++++++++++++++++++++++------------- pkg/store/sqlite.go | 2 +- 7 files changed, 177 insertions(+), 94 deletions(-) diff --git a/pkg/api/grpc/blob_service.go b/pkg/api/grpc/blob_service.go index 0988c17..0783b3d 100644 --- a/pkg/api/grpc/blob_service.go +++ b/pkg/api/grpc/blob_service.go @@ -73,7 +73,7 @@ func (s *BlobServiceServer) GetAll(ctx context.Context, req *pb.GetAllRequest) ( nsList[i] = ns } - var allBlobs []types.Blob + allBlobs := make([]types.Blob, 0, len(nsList)*8) for _, ns := range nsList { blobs, err := s.svc.Store().GetBlobs(ctx, ns, req.Height, req.Height, 0, 0) if err != nil { diff --git a/pkg/api/notifier.go b/pkg/api/notifier.go index e9f4f91..69d0485 100644 --- a/pkg/api/notifier.go +++ b/pkg/api/notifier.go @@ -77,9 +77,13 @@ func (n *Notifier) Subscribe(namespaces []types.Namespace) (*Subscription, error } id := n.nextID.Add(1) - nsSet := make(map[types.Namespace]struct{}, len(namespaces)) - for _, ns := range namespaces { - nsSet[ns] = struct{}{} + + var nsSet map[types.Namespace]struct{} + if len(namespaces) > 0 { + nsSet = make(map[types.Namespace]struct{}, len(namespaces)) + for _, ns := range namespaces { + nsSet[ns] = struct{}{} + } } sub := &Subscription{ @@ -161,13 +165,27 @@ func (n *Notifier) Publish(event HeightEvent) { // filterEvent returns an event with blobs filtered to the subscriber's // namespace set. If the subscriber watches all namespaces, the event is -// returned as-is. +// returned as-is. Avoids allocation when no blobs match. func (n *Notifier) filterEvent(event HeightEvent, sub *Subscription) HeightEvent { if len(sub.namespaces) == 0 { return event } - filtered := make([]types.Blob, 0, len(event.Blobs)) + // Count matches first to avoid allocating when nothing matches. + count := 0 + for i := range event.Blobs { + if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok { + count++ + } + } + if count == len(event.Blobs) { + return event // all match — no copy needed + } + if count == 0 { + return HeightEvent{Height: event.Height, Header: event.Header} + } + + filtered := make([]types.Blob, 0, count) for i := range event.Blobs { if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok { filtered = append(filtered, event.Blobs[i]) diff --git a/pkg/api/service.go b/pkg/api/service.go index 5cc4324..758b568 100644 --- a/pkg/api/service.go +++ b/pkg/api/service.go @@ -70,7 +70,7 @@ func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (j // limit=0 means no limit; offset=0 means no offset. // Pagination is applied to the aggregate result across all namespaces. func (s *Service) BlobGetAll(ctx context.Context, height uint64, namespaces []types.Namespace, limit, offset int) (json.RawMessage, error) { - var allBlobs []types.Blob + allBlobs := make([]types.Blob, 0, len(namespaces)*8) // preallocate for typical workload for _, ns := range namespaces { blobs, err := s.store.GetBlobs(ctx, ns, height, height, 0, 0) if err != nil { @@ -179,15 +179,25 @@ func (s *Service) Fetcher() fetch.DataFetcher { return s.fetcher } +// blobJSON is a struct-based representation for celestia-node compatible JSON. +// Using a struct avoids the per-call map[string]any allocation that json.Marshal +// requires for maps. +type blobJSON struct { + Namespace []byte `json:"namespace"` + Data []byte `json:"data"` + ShareVersion uint32 `json:"share_version"` + Commitment []byte `json:"commitment"` + Index int `json:"index"` +} + // MarshalBlob converts a stored blob into celestia-node compatible JSON. func MarshalBlob(b *types.Blob) json.RawMessage { - m := map[string]any{ - "namespace": b.Namespace[:], - "data": b.Data, - "share_version": b.ShareVersion, - "commitment": b.Commitment, - "index": b.Index, - } - raw, _ := json.Marshal(m) //nolint:errcheck + raw, _ := json.Marshal(blobJSON{ //nolint:errcheck + Namespace: b.Namespace[:], + Data: b.Data, + ShareVersion: b.ShareVersion, + Commitment: b.Commitment, + Index: b.Index, + }) return raw } diff --git a/pkg/fetch/celestia_app.go b/pkg/fetch/celestia_app.go index b1b7de5..3fc6517 100644 --- a/pkg/fetch/celestia_app.go +++ b/pkg/fetch/celestia_app.go @@ -29,11 +29,15 @@ type CelestiaAppFetcher struct { // bearerCreds implements grpc.PerRPCCredentials for bearer token auth. type bearerCreds struct { - token string + metadata map[string]string // cached; avoids allocation per RPC +} + +func newBearerCreds(token string) bearerCreds { + return bearerCreds{metadata: map[string]string{"authorization": "Bearer " + token}} } func (b bearerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) { - return map[string]string{"authorization": "Bearer " + b.token}, nil + return b.metadata, nil } func (b bearerCreds) RequireTransportSecurity() bool { return false } @@ -45,7 +49,7 @@ func NewCelestiaAppFetcher(grpcAddr, authToken string, log zerolog.Logger) (*Cel grpc.WithTransportCredentials(insecure.NewCredentials()), } if authToken != "" { - opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken})) + opts = append(opts, grpc.WithPerRPCCredentials(newBearerCreds(authToken))) } conn, err := grpc.NewClient(grpcAddr, opts...) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index f7035bd..a66b8cc 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -91,9 +91,9 @@ func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog func httpToWS(addr string) string { switch { case strings.HasPrefix(addr, "http://"): - return "ws://" + strings.TrimPrefix(addr, "http://") + return "ws://" + addr[len("http://"):] case strings.HasPrefix(addr, "https://"): - return "wss://" + strings.TrimPrefix(addr, "https://") + return "wss://" + addr[len("https://"):] default: return addr } @@ -474,18 +474,7 @@ func isTransientRPCError(err error) bool { return true } msg := strings.ToLower(err.Error()) - for _, needle := range []string{ - "eof", - "connection reset by peer", - "broken pipe", - "i/o timeout", - "timeout", - "temporarily unavailable", - "connection refused", - "503", - "504", - "502", - } { + for _, needle := range transientNeedles { if strings.Contains(msg, needle) { return true } @@ -493,6 +482,20 @@ func isTransientRPCError(err error) bool { return false } +// transientNeedles is allocated once at package init to avoid per-call slice allocation. +var transientNeedles = []string{ + "eof", + "connection reset by peer", + "broken pipe", + "i/o timeout", + "timeout", + "temporarily unavailable", + "connection refused", + "503", + "504", + "502", +} + // jsonInt64 handles CometBFT's int64 fields encoded as JSON strings. type jsonInt64 int64 diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 23ba57b..bb6ca03 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -55,23 +55,39 @@ func (nopRecorder) ObserveStoreQueryDuration(string, time.Duration) {} func (nopRecorder) SetActiveSubscriptions(int) {} func (nopRecorder) IncEventsDropped() {} +// syncStateLabels lists the known sync states. Index must match the +// syncStateGauges array in PromRecorder. +var syncStateLabels = [3]string{"initializing", "backfilling", "streaming"} + // PromRecorder implements Recorder using Prometheus metrics. +// +// All *Vec metrics are pre-resolved at construction time via WithLabelValues +// for known label sets. This eliminates per-call map lookups and slice +// allocations on hot paths. type PromRecorder struct { - syncState *prometheus.GaugeVec latestHeight prometheus.Gauge networkHeight prometheus.Gauge syncLag prometheus.Gauge blobsProcessed prometheus.Counter headersProcessed prometheus.Counter batchDuration prometheus.Histogram - backfillStageDur *prometheus.HistogramVec - backfillStageErr *prometheus.CounterVec - apiRequests *prometheus.CounterVec - apiDuration *prometheus.HistogramVec - storeQueryDur *prometheus.HistogramVec activeSubs prometheus.Gauge eventsDropped prometheus.Counter - info *prometheus.GaugeVec + + // Pre-resolved label combinations. + syncStateGauges [3]prometheus.Gauge // indexed by syncStateLabels + + // Vec metrics kept for dynamic labels not known at init. + backfillStageDurVec *prometheus.HistogramVec + backfillStageErrVec *prometheus.CounterVec + apiRequestsVec *prometheus.CounterVec + apiDurationVec *prometheus.HistogramVec + storeQueryDurVec *prometheus.HistogramVec + + // Pre-resolved observers/counters for known label values. + backfillStageDurOb map[string]prometheus.Observer + backfillStageErrCt map[string]prometheus.Counter + storeQueryDurOb map[string]prometheus.Observer // cached for lag calculation lastLatest atomic.Uint64 @@ -86,98 +102,118 @@ func NewPromRecorder(reg prometheus.Registerer, version string) *PromRecorder { } factory := promauto.With(reg) - r := &PromRecorder{ - syncState: factory.NewGaugeVec(prometheus.GaugeOpts{ - Name: "apex_sync_state", - Help: "Current sync state (1 = active for the labeled state).", - }, []string{"state"}), + syncStateVec := factory.NewGaugeVec(prometheus.GaugeOpts{ + Name: "apex_sync_state", + Help: "Current sync state (1 = active for the labeled state).", + }, []string{"state"}) + + backfillStageDurVec := factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_backfill_stage_duration_seconds", + Help: "Per-stage duration during backfill height processing.", + Buckets: prometheus.DefBuckets, + }, []string{"stage"}) + + backfillStageErrVec := factory.NewCounterVec(prometheus.CounterOpts{ + Name: "apex_backfill_stage_errors_total", + Help: "Total backfill stage errors by stage.", + }, []string{"stage"}) + + storeQueryDurVec := factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_store_query_duration_seconds", + Help: "Store query duration by operation.", + Buckets: prometheus.DefBuckets, + }, []string{"operation"}) + + apiRequestsVec := factory.NewCounterVec(prometheus.CounterOpts{ + Name: "apex_api_requests_total", + Help: "Total API requests by method and status.", + }, []string{"method", "status"}) + + apiDurationVec := factory.NewHistogramVec(prometheus.HistogramOpts{ + Name: "apex_api_request_duration_seconds", + Help: "API request duration by method.", + Buckets: prometheus.DefBuckets, + }, []string{"method"}) + r := &PromRecorder{ latestHeight: factory.NewGauge(prometheus.GaugeOpts{ Name: "apex_sync_latest_height", Help: "Latest locally synced block height.", }), - networkHeight: factory.NewGauge(prometheus.GaugeOpts{ Name: "apex_sync_network_height", Help: "Latest known network block height.", }), - syncLag: factory.NewGauge(prometheus.GaugeOpts{ Name: "apex_sync_lag", Help: "Difference between network height and latest synced height.", }), - blobsProcessed: factory.NewCounter(prometheus.CounterOpts{ Name: "apex_sync_blobs_processed_total", Help: "Total number of blobs processed.", }), - headersProcessed: factory.NewCounter(prometheus.CounterOpts{ Name: "apex_sync_headers_processed_total", Help: "Total number of headers processed.", }), - batchDuration: factory.NewHistogram(prometheus.HistogramOpts{ Name: "apex_sync_batch_duration_seconds", Help: "Duration of backfill batch processing.", Buckets: prometheus.DefBuckets, }), - - backfillStageDur: factory.NewHistogramVec(prometheus.HistogramOpts{ - Name: "apex_backfill_stage_duration_seconds", - Help: "Per-stage duration during backfill height processing.", - Buckets: prometheus.DefBuckets, - }, []string{"stage"}), - - backfillStageErr: factory.NewCounterVec(prometheus.CounterOpts{ - Name: "apex_backfill_stage_errors_total", - Help: "Total backfill stage errors by stage.", - }, []string{"stage"}), - - apiRequests: factory.NewCounterVec(prometheus.CounterOpts{ - Name: "apex_api_requests_total", - Help: "Total API requests by method and status.", - }, []string{"method", "status"}), - - apiDuration: factory.NewHistogramVec(prometheus.HistogramOpts{ - Name: "apex_api_request_duration_seconds", - Help: "API request duration by method.", - Buckets: prometheus.DefBuckets, - }, []string{"method"}), - - storeQueryDur: factory.NewHistogramVec(prometheus.HistogramOpts{ - Name: "apex_store_query_duration_seconds", - Help: "Store query duration by operation.", - Buckets: prometheus.DefBuckets, - }, []string{"operation"}), - activeSubs: factory.NewGauge(prometheus.GaugeOpts{ Name: "apex_subscriptions_active", Help: "Number of active event subscriptions.", }), - eventsDropped: factory.NewCounter(prometheus.CounterOpts{ Name: "apex_subscriptions_events_dropped_total", Help: "Total number of subscription events dropped.", }), - info: factory.NewGaugeVec(prometheus.GaugeOpts{ - Name: "apex_info", - Help: "Build information.", - }, []string{"version", "go_version"}), + backfillStageDurVec: backfillStageDurVec, + backfillStageErrVec: backfillStageErrVec, + apiRequestsVec: apiRequestsVec, + apiDurationVec: apiDurationVec, + storeQueryDurVec: storeQueryDurVec, } - r.info.WithLabelValues(version, runtime.Version()).Set(1) + // Pre-resolve sync state gauges. + for i, label := range syncStateLabels { + r.syncStateGauges[i] = syncStateVec.WithLabelValues(label) + } + + // Pre-resolve known backfill stage labels. + knownStages := []string{"fetch_height", "store_header", "store_blobs", "observer"} + r.backfillStageDurOb = make(map[string]prometheus.Observer, len(knownStages)) + r.backfillStageErrCt = make(map[string]prometheus.Counter, len(knownStages)) + for _, s := range knownStages { + r.backfillStageDurOb[s] = backfillStageDurVec.WithLabelValues(s) + r.backfillStageErrCt[s] = backfillStageErrVec.WithLabelValues(s) + } + + // Pre-resolve known store operations. + knownOps := []string{"PutBlobs", "GetBlobs", "GetBlobByCommitment", "PutHeader", "GetHeader", "GetSyncState"} + r.storeQueryDurOb = make(map[string]prometheus.Observer, len(knownOps)) + for _, op := range knownOps { + r.storeQueryDurOb[op] = storeQueryDurVec.WithLabelValues(op) + } + + // Info metric (set once). + infoVec := factory.NewGaugeVec(prometheus.GaugeOpts{ + Name: "apex_info", + Help: "Build information.", + }, []string{"version", "go_version"}) + infoVec.WithLabelValues(version, runtime.Version()).Set(1) return r } func (r *PromRecorder) SetSyncState(state string) { - for _, s := range []string{"initializing", "backfilling", "streaming"} { - if s == state { - r.syncState.WithLabelValues(s).Set(1) + for i, label := range syncStateLabels { + if label == state { + r.syncStateGauges[i].Set(1) } else { - r.syncState.WithLabelValues(s).Set(0) + r.syncStateGauges[i].Set(0) } } } @@ -219,23 +255,35 @@ func (r *PromRecorder) ObserveBatchDuration(d time.Duration) { } func (r *PromRecorder) ObserveBackfillStageDuration(stage string, d time.Duration) { - r.backfillStageDur.WithLabelValues(stage).Observe(d.Seconds()) + if ob, ok := r.backfillStageDurOb[stage]; ok { + ob.Observe(d.Seconds()) + return + } + r.backfillStageDurVec.WithLabelValues(stage).Observe(d.Seconds()) } func (r *PromRecorder) IncBackfillStageErrors(stage string) { - r.backfillStageErr.WithLabelValues(stage).Inc() + if ct, ok := r.backfillStageErrCt[stage]; ok { + ct.Inc() + return + } + r.backfillStageErrVec.WithLabelValues(stage).Inc() } func (r *PromRecorder) IncAPIRequest(method, status string) { - r.apiRequests.WithLabelValues(method, status).Inc() + r.apiRequestsVec.WithLabelValues(method, status).Inc() } func (r *PromRecorder) ObserveAPIRequestDuration(method string, d time.Duration) { - r.apiDuration.WithLabelValues(method).Observe(d.Seconds()) + r.apiDurationVec.WithLabelValues(method).Observe(d.Seconds()) } func (r *PromRecorder) ObserveStoreQueryDuration(op string, d time.Duration) { - r.storeQueryDur.WithLabelValues(op).Observe(d.Seconds()) + if ob, ok := r.storeQueryDurOb[op]; ok { + ob.Observe(d.Seconds()) + return + } + r.storeQueryDurVec.WithLabelValues(op).Observe(d.Seconds()) } func (r *PromRecorder) SetActiveSubscriptions(n int) { diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index 9579353..4acba92 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -204,7 +204,7 @@ func (s *SQLiteStore) GetBlobs(ctx context.Context, ns types.Namespace, startHei } defer rows.Close() //nolint:errcheck - var blobs []types.Blob + blobs := make([]types.Blob, 0, 64) // preallocate to reduce append reallocations for rows.Next() { b, err := scanBlobRow(rows) if err != nil {