From fd14cf0cfa899553221f4efd1ee16840ab475bf9 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 14 May 2026 14:18:58 -0700 Subject: [PATCH 1/2] reuse grpc buffer in querier's store-gate way stream Signed-off-by: Essam Eldaly --- integration/grpc_store_gateway_bench_test.go | 144 +++++++++++++ pkg/querier/blocks_store_queryable.go | 26 +++ .../series_response_free_tracking_test.go | 193 ++++++++++++++++++ pkg/querier/series_response_release_test.go | 131 ++++++++++++ .../thanos/pkg/store/storepb/buf_ref.go | 32 +++ .../thanos/pkg/store/storepb/rpc.pb.go | 50 ++++- .../thanos/pkg/store/storepb/rpc.proto | 4 + 7 files changed, 579 insertions(+), 1 deletion(-) create mode 100644 integration/grpc_store_gateway_bench_test.go create mode 100644 pkg/querier/series_response_free_tracking_test.go create mode 100644 pkg/querier/series_response_release_test.go create mode 100644 vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go diff --git a/integration/grpc_store_gateway_bench_test.go b/integration/grpc_store_gateway_bench_test.go new file mode 100644 index 00000000000..af28f2afed5 --- /dev/null +++ b/integration/grpc_store_gateway_bench_test.go @@ -0,0 +1,144 @@ +//go:build requires_docker + +package integration + +import ( + "context" + "fmt" + "io" + "net" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + // Import cortexpb to register the cortexCodec (buffer pooling). + _ "github.com/cortexproject/cortex/pkg/cortexpb" +) + +// mockStoreGatewayServer implements storepb.StoreServer and streams +// pre-built SeriesResponse messages for benchmarking. +type mockStoreGatewayServer struct { + storepb.UnimplementedStoreServer + responses []*storepb.SeriesResponse +} + +func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + for _, resp := range m.responses { + if err := srv.Send(resp); err != nil { + return err + } + } + return nil +} + +// BenchmarkGrpcStoreGatewayCalls benchmarks the full gRPC path for store gateway +// Series streaming with the cortexCodec and compression enabled. +// This is the store-gateway equivalent of BenchmarkGrpcCalls (which tests the ingester path). +// +// With SeriesResponse implementing ReleasableMessage, calling Free() after each Recv() +// returns the unmarshal buffer to the pool, reducing per-message allocations by ~32KB. +func BenchmarkGrpcStoreGatewayCalls(b *testing.B) { + // Build realistic SeriesResponse messages (large enough to trigger buffer pooling). + responses := make([]*storepb.SeriesResponse, 10) + for i := range responses { + responses[i] = createStoreGatewayBenchResponse(i) + } + + mock := &mockStoreGatewayServer{responses: responses} + + // Start gRPC server. + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(b, err) + + gRPCServer := grpc.NewServer() + storepb.RegisterStoreServer(gRPCServer, mock) + + go func() { + if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { + b.Error(err) + } + }() + defer gRPCServer.Stop() + + // Connect client with compression (zstd via cortexCodec default call options). + conn, err := grpc.NewClient( + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(b, err) + defer conn.Close() + + client := storepb.NewStoreClient(conn) + + // freeable checks if the response supports Free() (i.e., has MessageWithBufRef embedded). + // This allows the benchmark to compile and run on both old builds (without Free) + // and new builds (with Free), so you can compare results via benchstat. + type freeable interface { + Free() + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + stream, err := client.Series(context.Background(), &storepb.SeriesRequest{}) + require.NoError(b, err) + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + if f, ok := interface{}(resp).(freeable); ok { + f.Free() + } + } + } +} + +// createStoreGatewayBenchResponse creates a realistic SeriesResponse with chunk data +// large enough to exceed the buffer pooling threshold (~1KB). +func createStoreGatewayBenchResponse(n int) *storepb.SeriesResponse { + lbls := labels.FromStrings( + "__name__", fmt.Sprintf("http_requests_total_%d", n), + "cluster", "us-east-1", + "namespace", "production", + "pod", fmt.Sprintf("web-server-deployment-7f8b9c6d4f-abc%02d", n), + "container", "nginx", + "instance", fmt.Sprintf("10.0.%d.%d:8080", n, n+1), + "job", "kubernetes-pods", + ) + + // Create chunk data (~4KB per chunk, simulating real store gateway responses). + chunkData := make([]byte, 4096) + for i := range chunkData { + chunkData[i] = byte((i + n) % 256) + } + + numChunks := 5 + n + chunks := make([]storepb.AggrChunk, numChunks) + for i := 0; i < numChunks; i++ { + chunks[i] = storepb.AggrChunk{ + MinTime: int64(i * 7200000), + MaxTime: int64((i + 1) * 7200000), + Raw: &storepb.Chunk{ + Type: storepb.Chunk_XOR, + Data: chunkData, + }, + } + } + + return &storepb.SeriesResponse{ + Result: &storepb.SeriesResponse_Series{ + Series: &storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(lbls), + Chunks: chunks, + }, + }, + } +} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index f9deca3b9bf..a89cd7e63cc 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/pool" thanosquery "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/store/hintspb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "go.uber.org/atomic" @@ -67,6 +68,10 @@ var ( errNoStoreGatewayAddress = errors.New("no store-gateway address configured") errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" defaultAggrs = []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} + + // Compile-time check: SeriesResponse must satisfy cortexpb.ReleasableMessage + // so that cortexCodec registers unmarshal buffers for explicit lifecycle management. + _ cortexpb.ReleasableMessage = &storepb.SeriesResponse{} ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -675,6 +680,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( myQueriedBlocks := []ulid.ULID(nil) processSeries := func(s *storepb.Series) error { + // Detach series data from the gRPC unmarshal buffer so that + // resp.Free() can safely return the buffer to the pool. + detachSeriesFromBuffer(s) mySeries = append(mySeries, s) // Add series fingerprint to query limiter; will return error if we are over the limit @@ -746,6 +754,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( // Response may either contain series, batch, warning or hints. if s := resp.GetSeries(); s != nil { if err := processSeries(s); err != nil { + resp.Free() return err } } @@ -753,6 +762,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if b := resp.GetBatch(); b != nil { for _, s := range b.Series { if err := processSeries(s); err != nil { + resp.Free() return err } } @@ -765,11 +775,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { + resp.Free() return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) if err != nil { + resp.Free() return errors.Wrapf(err, "failed to parse queried block IDs from received hints") } @@ -778,6 +790,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( seriesQueryStats.Merge(hints.QueryStats) } } + + resp.Free() } numSeries := len(mySeries) @@ -1189,6 +1203,18 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { return res, nil } +// detachSeriesFromBuffer re-allocates label strings and chunk data byte slices +// so that the series no longer references the gRPC unmarshal buffer. This allows +// resp.Free() to safely return the buffer to the pool without causing use-after-free. +func detachSeriesFromBuffer(s *storepb.Series) { + labelpb.ReAllocZLabelsStrings(&s.Labels, true) + for i := range s.Chunks { + if s.Chunks[i].Raw != nil && len(s.Chunks[i].Raw.Data) > 0 { + s.Chunks[i].Raw.Data = append([]byte(nil), s.Chunks[i].Raw.Data...) + } + } +} + // countChunkBytes returns the size of the chunks making up the provided series in bytes func countChunkBytes(series ...*storepb.Series) (count int) { for _, s := range series { diff --git a/pkg/querier/series_response_free_tracking_test.go b/pkg/querier/series_response_free_tracking_test.go new file mode 100644 index 00000000000..c5ce4e4866a --- /dev/null +++ b/pkg/querier/series_response_free_tracking_test.go @@ -0,0 +1,193 @@ +package querier + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/user" + "google.golang.org/grpc/mem" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/util/limiter" +) + +type freeTracker struct { + freed atomic.Bool +} + +type singleFreePool struct { + tracker *freeTracker +} + +func (p *singleFreePool) Get(length int) *[]byte { + b := make([]byte, length) + return &b +} + +func (p *singleFreePool) Put(_ *[]byte) { + p.tracker.freed.Store(true) +} + +func registerTrackingBuffer(resp *storepb.SeriesResponse, tracker *freeTracker) { + data := make([]byte, 2048) + resp.RegisterBuffer(mem.NewBuffer(&data, &singleFreePool{tracker: tracker})) +} + +func TestFreeCalledOnAllResponses(t *testing.T) { + t.Parallel() + + const ( + metricName = "test_metric" + minT = int64(10) + maxT = int64(20) + ) + + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + + t.Run("success path", func(t *testing.T) { + t.Parallel() + + resp1 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "1"), + []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, + ) + resp2 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "2"), + []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, + ) + resp3 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "3"), + []cortexpb.Sample{{Value: 3, TimestampMs: minT}}, nil, nil, + ) + hintsResp := mockHintsResponse(block1, block2) + + tracker1 := &freeTracker{} + tracker2 := &freeTracker{} + tracker3 := &freeTracker{} + trackerHints := &freeTracker{} + registerTrackingBuffer(resp1, tracker1) + registerTrackingBuffer(resp2, tracker2) + registerTrackingBuffer(resp3, tracker3) + registerTrackingBuffer(hintsResp, trackerHints) + + ctx := user.InjectOrgID(context.Background(), "user-1") + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, 0)) + + stores := &blocksStoreSetMock{mockedResponses: []any{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2, resp3, hintsResp}, + }: {block1, block2}, + }, + }} + + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). + Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) + require.NoError(t, set.Err()) + for set.Next() { + _ = set.At() + } + + assert.True(t, tracker1.freed.Load(), "resp1 should be freed") + assert.True(t, tracker2.freed.Load(), "resp2 should be freed") + assert.True(t, tracker3.freed.Load(), "resp3 should be freed") + assert.True(t, trackerHints.freed.Load(), "hints resp should be freed") + }) + + t.Run("error path - series limit exceeded", func(t *testing.T) { + t.Parallel() + + resp1 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "1"), + []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, + ) + resp2 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "2"), + []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, + ) + + tracker1 := &freeTracker{} + tracker2 := &freeTracker{} + registerTrackingBuffer(resp1, tracker1) + registerTrackingBuffer(resp2, tracker2) + + // Limit to 1 series so the second triggers an error. + ctx := user.InjectOrgID(context.Background(), "user-1") + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(1, 0, 0, 0)) + + stores := &blocksStoreSetMock{mockedResponses: []any{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2}, + }: {block1, block2}, + }, + }} + + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). + Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) + require.Error(t, set.Err()) + + assert.True(t, tracker1.freed.Load(), "resp1 should be freed") + assert.True(t, tracker2.freed.Load(), "resp2 should be freed (caused the error)") + }) +} diff --git a/pkg/querier/series_response_release_test.go b/pkg/querier/series_response_release_test.go new file mode 100644 index 00000000000..afb484a1a31 --- /dev/null +++ b/pkg/querier/series_response_release_test.go @@ -0,0 +1,131 @@ +package querier + +import ( + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc/mem" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func TestSeriesResponseImplementsReleasableMessage(t *testing.T) { + var resp storepb.SeriesResponse + var _ cortexpb.ReleasableMessage = &resp + + assert.NotPanics(t, func() { resp.Free() }) +} + +func TestSeriesResponseFreeIdempotence(t *testing.T) { + t.Run("zero value", func(t *testing.T) { + var resp storepb.SeriesResponse + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) + + t.Run("with registered buffer", func(t *testing.T) { + var resp storepb.SeriesResponse + buf := mem.SliceBuffer(make([]byte, 64)) + resp.RegisterBuffer(buf) + + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) + + t.Run("with pooled buffer", func(t *testing.T) { + var resp storepb.SeriesResponse + b := make([]byte, 128) + resp.RegisterBuffer(mem.NewBuffer(&b, mem.NopBufferPool{})) + + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) +} + +func TestDetachSeriesFromBuffer_NoUseAfterFree(t *testing.T) { + t.Run("labels survive buffer overwrite", func(t *testing.T) { + // ZLabel strings are unsafe casts into the unmarshal buffer. + bufData := []byte("__name__\x00http_requests_total\x00cluster\x00us-east-1\x00") + series := &storepb.Series{ + Labels: []storepb.Label{ + {Name: unsafeString(bufData[0:8]), Value: unsafeString(bufData[9:28])}, + {Name: unsafeString(bufData[29:36]), Value: unsafeString(bufData[37:46])}, + }, + } + + detachSeriesFromBuffer(series) + + // Overwrite original buffer (simulates pool reuse). + for i := range bufData { + bufData[i] = 0xFF + } + + assert.Equal(t, "__name__", series.Labels[0].Name) + assert.Equal(t, "http_requests_total", series.Labels[0].Value) + assert.Equal(t, "cluster", series.Labels[1].Name) + assert.Equal(t, "us-east-1", series.Labels[1].Value) + }) + + t.Run("chunk data survives buffer overwrite", func(t *testing.T) { + chunkBuf := make([]byte, 4096) + for i := range chunkBuf { + chunkBuf[i] = byte(i % 256) + } + expected := make([]byte, len(chunkBuf)) + copy(expected, chunkBuf) + + series := &storepb.Series{ + Chunks: []storepb.AggrChunk{ + {MinTime: 1000, MaxTime: 2000, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkBuf}}, + }, + } + + detachSeriesFromBuffer(series) + + for i := range chunkBuf { + chunkBuf[i] = 0xFF + } + + assert.Equal(t, expected, series.Chunks[0].Raw.Data) + }) + + t.Run("end-to-end with Free and buffer overwrite", func(t *testing.T) { + chunkData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + series := &storepb.Series{ + Labels: []storepb.Label{{Name: "job", Value: "prometheus"}}, + Chunks: []storepb.AggrChunk{ + {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkData}}, + }, + } + resp := &storepb.SeriesResponse{ + Result: &storepb.SeriesResponse_Series{Series: series}, + } + + poolBuf := make([]byte, 32768) + resp.RegisterBuffer(mem.NewBuffer(&poolBuf, mem.NopBufferPool{})) + + s := resp.GetSeries() + detachSeriesFromBuffer(s) + resp.Free() + + // Overwrite both the pool buffer and original chunk slice. + for i := range poolBuf { + poolBuf[i] = 0xDE + } + for i := range chunkData { + chunkData[i] = 0xAB + } + + assert.Equal(t, "job", s.Labels[0].Name) + assert.Equal(t, "prometheus", s.Labels[0].Value) + assert.Equal(t, []byte{0x01, 0x02, 0x03, 0x04, 0x05}, s.Chunks[0].Raw.Data) + }) +} + +// unsafeString creates a string sharing memory with the byte slice, +// simulating protobuf's zero-copy string unmarshal. +func unsafeString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go new file mode 100644 index 00000000000..926099ae326 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go @@ -0,0 +1,32 @@ +package storepb + +import ( + "google.golang.org/grpc/mem" +) + +// MessageWithBufRef holds a reference to gRPC unmarshal buffers for explicit lifecycle management. +// It satisfies cortexpb.ReleasableMessage via structural typing. +type MessageWithBufRef struct { + bs mem.BufferSlice +} + +func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) { + m.bs = append(m.bs, buffer) +} + +// Free releases all registered buffers. Idempotent and safe on zero-value. +func (m *MessageWithBufRef) Free() { + m.bs.Free() + m.bs = m.bs[:0] +} + +// Proto serialization no-ops (MessageWithBufRef has no wire representation). + +func (m *MessageWithBufRef) Size() int { return 0 } +func (m *MessageWithBufRef) Marshal() ([]byte, error) { return nil, nil } +func (m *MessageWithBufRef) MarshalTo(dAtA []byte) (int, error) { return 0, nil } +func (m *MessageWithBufRef) MarshalToSizedBuffer(dAtA []byte) (int, error) { + return 0, nil +} +func (m *MessageWithBufRef) Unmarshal(dAtA []byte) error { return nil } +func (m MessageWithBufRef) Equal(that MessageWithBufRef) bool { return true } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go index 0406eb990c4..d871f238617 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go @@ -440,7 +440,8 @@ type SeriesResponse struct { // *SeriesResponse_Warning // *SeriesResponse_Hints // *SeriesResponse_Batch - Result isSeriesResponse_Result `protobuf_oneof:"result"` + Result isSeriesResponse_Result `protobuf_oneof:"result"` + MessageWithBufRef `protobuf:"bytes,1001,opt,name=Ref,proto3,embedded=Ref" json:"Ref"` } func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } @@ -1602,6 +1603,18 @@ func (m *SeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + { + size := m.MessageWithBufRef.Size() + i -= size + if _, err := m.MessageWithBufRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x3e + i-- + dAtA[i] = 0xca if m.Result != nil { { size := m.Result.Size() @@ -2201,6 +2214,8 @@ func (m *SeriesResponse) Size() (n int) { if m.Result != nil { n += m.Result.Size() } + l = m.MessageWithBufRef.Size() + n += 2 + l + sovRpc(uint64(l)) return n } @@ -3848,6 +3863,39 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Batch{v} iNdEx = postIndex + case 1001: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MessageWithBufRef", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.MessageWithBufRef.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto index afc9a6ec382..20624e258b5 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto @@ -8,6 +8,7 @@ import "store/storepb/types.proto"; import "gogoproto/gogo.proto"; import "store/storepb/prompb/types.proto"; import "google/protobuf/any.proto"; +import "github.com/cortexproject/cortex/pkg/cortexpb/cortex.proto"; option go_package = "storepb"; @@ -208,6 +209,9 @@ message SeriesResponse { /// batch is an array of series so more than 1 series can be included in the response SeriesBatch batch = 4; } + + // Buffer reference for explicit memory management via cortexCodec. + cortexpb.MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.MessageWithBufRef", (gogoproto.nullable) = false]; } message LabelNamesRequest { From 532b96f82719ebc77d06eb2d98023a6740ee4c45 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 14 May 2026 14:32:44 -0700 Subject: [PATCH 2/2] lint Signed-off-by: Essam Eldaly --- pkg/querier/series_response_free_tracking_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/series_response_free_tracking_test.go b/pkg/querier/series_response_free_tracking_test.go index c5ce4e4866a..ea35f6a9fdc 100644 --- a/pkg/querier/series_response_free_tracking_test.go +++ b/pkg/querier/series_response_free_tracking_test.go @@ -2,7 +2,6 @@ package querier import ( "context" - "sync/atomic" "testing" "github.com/go-kit/log" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "google.golang.org/grpc/mem" "github.com/cortexproject/cortex/pkg/cortexpb"