Skip to content

Commit d9638cc

Browse files
committed
Add projectionHint to SG Parquet mode
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 17dd8b1 commit d9638cc

File tree

11 files changed

+561
-14
lines changed

11 files changed

+561
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
1414
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
1515
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
16+
* [FEATURE] StoreGateway: Add a flag `-blocks-storage.bucket-store.honor-projection-hints`. If enabled, Store Gateway in Parquet mode will honor projection hints and only materialize requested labels. #7206
1617
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
1718
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
1819
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186

docs/blocks-storage/querier.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,12 @@ blocks_storage:
18131813
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
18141814
[parquet_shard_cache_ttl: <duration> | default = 24h]
18151815

1816+
# [Experimental] If enabled, Store Gateway will honor projection hints and
1817+
# only materialize requested labels. It only takes effect when
1818+
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
1819+
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
1820+
[honor_projection_hints: <boolean> | default = false]
1821+
18161822
tsdb:
18171823
# Local directory to store TSDBs in the ingesters.
18181824
# CLI flag: -blocks-storage.tsdb.dir

docs/blocks-storage/store-gateway.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1879,6 +1879,12 @@ blocks_storage:
18791879
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
18801880
[parquet_shard_cache_ttl: <duration> | default = 24h]
18811881

1882+
# [Experimental] If enabled, Store Gateway will honor projection hints and
1883+
# only materialize requested labels. It only takes effect when
1884+
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
1885+
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
1886+
[honor_projection_hints: <boolean> | default = false]
1887+
18821888
tsdb:
18831889
# Local directory to store TSDBs in the ingesters.
18841890
# CLI flag: -blocks-storage.tsdb.dir

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2491,6 +2491,12 @@ bucket_store:
24912491
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
24922492
[parquet_shard_cache_ttl: <duration> | default = 24h]
24932493

2494+
# [Experimental] If enabled, Store Gateway will honor projection hints and
2495+
# only materialize requested labels. It only takes effect when
2496+
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
2497+
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
2498+
[honor_projection_hints: <boolean> | default = false]
2499+
24942500
tsdb:
24952501
# Local directory to store TSDBs in the ingesters.
24962502
# CLI flag: -blocks-storage.tsdb.dir

docs/configuration/v1-guarantees.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ Currently experimental features are:
110110
- Store Gateway Zone Stable Shuffle Sharding
111111
- `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag
112112
- `zone_stable_shuffle_sharding` (boolean) field in config file
113+
- Store Gateway HonorProjectionHints in Parquet Mode
114+
- `-blocks-storage.bucket-store.honor-projection-hints` CLI flag
113115
- Basic Lifecycler (Storegateway, Alertmanager, Ruler) Final Sleep on shutdown, which tells the pod wait before shutdown, allowing a delay to propagate ring changes.
114116
- `-ruler.ring.final-sleep` (duration) CLI flag
115117
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
//go:build integration_querier
2+
3+
package integration
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"math/rand"
9+
"path/filepath"
10+
"slices"
11+
"testing"
12+
"time"
13+
14+
"github.com/prometheus/common/model"
15+
"github.com/prometheus/prometheus/model/labels"
16+
"github.com/stretchr/testify/require"
17+
"github.com/thanos-io/thanos/pkg/block"
18+
"github.com/thanos-io/thanos/pkg/block/metadata"
19+
20+
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
21+
22+
"github.com/cortexproject/cortex/integration/e2e"
23+
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
24+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
25+
"github.com/cortexproject/cortex/integration/e2ecortex"
26+
"github.com/cortexproject/cortex/pkg/storage/bucket"
27+
"github.com/cortexproject/cortex/pkg/util/log"
28+
)
29+
30+
func TestParquetBucketStore_ProjectionHint(t *testing.T) {
31+
s, err := e2e.NewScenario(networkName)
32+
require.NoError(t, err)
33+
defer s.Close()
34+
35+
consul := e2edb.NewConsulWithName("consul")
36+
minio := e2edb.NewMinio(9000, bucketName)
37+
memcached := e2ecache.NewMemcached()
38+
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))
39+
40+
// Define configuration flags.
41+
flags := BlocksStorageFlags()
42+
flags = mergeFlags(flags, map[string]string{
43+
// Enable Thanos engine and projection optimization.
44+
"-querier.thanos-engine": "true",
45+
"-querier.optimizers": "projection",
46+
47+
// enable honor-projection-hints on the querier and store gateway
48+
"-querier.honor-projection-hints": "true",
49+
"-blocks-storage.bucket-store.honor-projection-hints": "true",
50+
// enable Store Gateway Parquet mode
51+
"-blocks-storage.bucket-store.bucket-store-type": "parquet",
52+
53+
// Set query-ingesters-within to 1h so queries older than 1h don't hit ingesters
54+
"-querier.query-ingesters-within": "1h",
55+
56+
// Configure Parquet Converter
57+
"-parquet-converter.enabled": "true",
58+
"-parquet-converter.conversion-interval": "1s",
59+
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
60+
"-compactor.block-ranges": "1ms,12h",
61+
// Enable cache
62+
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
63+
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
64+
})
65+
66+
// Store Gateway
67+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
68+
require.NoError(t, s.StartAndWaitReady(storeGateway))
69+
70+
// Parquet Converter
71+
parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
72+
require.NoError(t, s.StartAndWaitReady(parquetConverter))
73+
74+
// Querier
75+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
76+
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
77+
}), "")
78+
require.NoError(t, s.StartAndWaitReady(querier))
79+
80+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
81+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
82+
83+
// Create block
84+
now := time.Now()
85+
// Time range: [Now - 24h] to [Now - 20h]
86+
start := now.Add(-24 * time.Hour)
87+
end := now.Add(-20 * time.Hour)
88+
89+
ctx := context.Background()
90+
91+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
92+
dir := filepath.Join(s.SharedDir(), "data")
93+
scrapeInterval := time.Minute
94+
statusCodes := []string{"200", "400", "404", "500", "502"}
95+
methods := []string{"GET", "POST", "PUT", "DELETE"}
96+
97+
numSeries := 10
98+
numSamples := 100
99+
100+
lbls := make([]labels.Labels, 0, numSeries)
101+
for i := 0; i < numSeries; i++ {
102+
lbls = append(lbls, labels.FromStrings(
103+
labels.MetricName, "http_requests_total",
104+
"job", "api-server",
105+
"instance", fmt.Sprintf("instance-%d", i),
106+
"status_code", statusCodes[i%len(statusCodes)],
107+
"method", methods[i%len(methods)],
108+
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
109+
"cluster", "test-cluster",
110+
))
111+
}
112+
113+
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
114+
require.NoError(t, err)
115+
116+
storage, err := e2ecortex.NewS3ClientForMinio(minio, bucketName)
117+
require.NoError(t, err)
118+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
119+
120+
// Upload TSDB Block
121+
require.NoError(t, block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc))
122+
123+
// Wait until the parquet converter converts the block
124+
require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics))
125+
126+
// Create client
127+
c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
128+
require.NoError(t, err)
129+
130+
cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
131+
labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end)
132+
if err != nil {
133+
t.Logf("Series query failed: %v", err)
134+
return false
135+
}
136+
return len(labelSets) > 0
137+
})
138+
139+
testCases := []struct {
140+
name string
141+
query string
142+
expectedLabels []string // query result should contain these labels
143+
}{
144+
{
145+
name: "vector selector query",
146+
query: `http_requests_total`,
147+
expectedLabels: []string{
148+
"__name__", "job", "instance", "status_code", "method", "path", "cluster",
149+
},
150+
},
151+
{
152+
name: "simple_sum_by_job",
153+
query: `sum by (job) (http_requests_total)`,
154+
expectedLabels: []string{"job"},
155+
},
156+
{
157+
name: "rate_with_aggregation",
158+
query: `sum by (method) (rate(http_requests_total[5m]))`,
159+
expectedLabels: []string{"method"},
160+
},
161+
{
162+
name: "multiple_grouping_labels",
163+
query: `sum by (job, status_code) (http_requests_total)`,
164+
expectedLabels: []string{"job", "status_code"},
165+
},
166+
{
167+
name: "aggregation without query",
168+
query: `sum without (instance, method) (http_requests_total)`,
169+
expectedLabels: []string{"job", "status_code", "path", "cluster"},
170+
},
171+
}
172+
for _, tc := range testCases {
173+
t.Run(tc.name, func(t *testing.T) {
174+
t.Logf("Testing: %s", tc.query)
175+
176+
// Execute instant query
177+
result, err := c.Query(tc.query, end)
178+
require.NoError(t, err)
179+
require.NotNil(t, result)
180+
181+
// Verify we got results
182+
vector, ok := result.(model.Vector)
183+
require.True(t, ok, "result should be a vector")
184+
require.NotEmpty(t, vector, "query should return results")
185+
186+
for _, sample := range vector {
187+
actualLabels := make(map[string]struct{})
188+
for label := range sample.Metric {
189+
actualLabels[string(label)] = struct{}{}
190+
}
191+
192+
// Check that all expected labels are present
193+
for _, expectedLabel := range tc.expectedLabels {
194+
_, ok := actualLabels[expectedLabel]
195+
require.True(t, ok,
196+
"series should have %s label", expectedLabel)
197+
}
198+
199+
// Check that no unexpected labels are present
200+
for lbl := range actualLabels {
201+
if !slices.Contains(tc.expectedLabels, lbl) {
202+
require.Fail(t, "series should not have unexpected label: %s", lbl)
203+
}
204+
}
205+
}
206+
})
207+
}
208+
}

pkg/storage/tsdb/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,8 @@ type BucketStoreConfig struct {
334334
// Token bucket configs
335335
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
336336
// Parquet shard cache config
337-
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
337+
ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
338+
HonorProjectionHints bool `yaml:"honor_projection_hints"`
338339
}
339340

340341
type TokenBucketBytesLimiterConfig struct {
@@ -397,6 +398,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
397398
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
398399
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
399400
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
401+
f.BoolVar(&cfg.HonorProjectionHints, "blocks-storage.bucket-store.honor-projection-hints", false, "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It only takes effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.")
400402
}
401403

402404
// Validate the config.

pkg/storegateway/parquet_bucket_store.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package storegateway
33
import (
44
"context"
55
"fmt"
6+
"slices"
67
"strings"
78

89
"github.com/go-kit/log"
@@ -38,8 +39,9 @@ type parquetBucketStore struct {
3839

3940
chunksDecoder *schema.PrometheusParquetChunksDecoder
4041

41-
matcherCache storecache.MatchersCache
42-
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
42+
matcherCache storecache.MatchersCache
43+
parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
44+
honorProjectionHints bool
4345
}
4446

4547
func (p *parquetBucketStore) Close() error {
@@ -107,6 +109,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
107109
return fmt.Errorf("failed to find parquet shards: %w", err)
108110
}
109111

112+
storageHints := p.buildSelectHints(req.QueryHints, shards, req.MinTime, req.MaxTime)
110113
seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards))
111114
errGroup, ctx := errgroup.WithContext(srv.Context())
112115
errGroup.SetLimit(p.concurrency)
@@ -116,7 +119,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
116119
Id: shard.name,
117120
})
118121
errGroup.Go(func() error {
119-
ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers)
122+
ss, err := shard.Query(ctx, storageHints, req.SkipChunks, matchers)
120123
seriesSet[i] = ss
121124
return err
122125
})
@@ -279,3 +282,42 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label
279282
Hints: anyHints,
280283
}, nil
281284
}
285+
286+
func (p *parquetBucketStore) buildSelectHints(queryHints *storepb.QueryHints, shards []*parquetBlock, minT, maxT int64) *prom_storage.SelectHints {
287+
storageHints := &prom_storage.SelectHints{
288+
Start: minT,
289+
End: maxT,
290+
}
291+
292+
if p.honorProjectionHints && queryHints != nil {
293+
storageHints.ProjectionInclude = queryHints.ProjectionInclude
294+
storageHints.ProjectionLabels = queryHints.ProjectionLabels
295+
296+
if storageHints.ProjectionInclude {
297+
// Reset projection hints if not all parquet shards have the hash column.
298+
if !allParquetBlocksHaveHashColumn(shards) {
299+
storageHints.ProjectionInclude = false
300+
storageHints.ProjectionLabels = nil
301+
}
302+
} else {
303+
// Reset hints for non-include projections to force a full scan, matching querier behavior.
304+
storageHints.ProjectionLabels = nil
305+
}
306+
307+
if storageHints.ProjectionInclude && !slices.Contains(storageHints.ProjectionLabels, schema.SeriesHashColumn) {
308+
storageHints.ProjectionLabels = append(storageHints.ProjectionLabels, schema.SeriesHashColumn)
309+
}
310+
}
311+
312+
return storageHints
313+
}
314+
315+
func allParquetBlocksHaveHashColumn(blocks []*parquetBlock) bool {
316+
// TODO(Sungjin1212): Change it to read marker version
317+
for _, b := range blocks {
318+
if !b.hasHashColumn() {
319+
return false
320+
}
321+
}
322+
return true
323+
}

0 commit comments

Comments
 (0)