diff --git a/CHANGELOG.md b/CHANGELOG.md index e749c3372c8..aea65ad6849 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [FEATURE] Tenant Federation: Add experimental support for partial responses using the `-tenant-federation.allow-partial-data` flag. When enabled, failures from individual tenants during a federated query are treated as warnings, allowing results from successful tenants to be returned. #7232 * [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 41de9d6e294..78c52471362 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -310,6 +310,11 @@ tenant_federation: # CLI flag: -tenant-federation.user-sync-interval [user_sync_interval: | default = 5m] + # [Experimental] If enabled, query errors from individual tenants are treated + # as warnings, allowing partial results to be returned. 
+ # CLI flag: -tenant-federation.allow-partial-data + [allow_partial_data: | default = false] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 2a35b853688..bfd74e6c359 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -68,6 +68,7 @@ Currently experimental features are: - `-tenant-federation.enabled` - `-tenant-federation.regex-matcher-enabled` - `-tenant-federation.user-sync-interval` + - `-tenant-federation.allow-partial-data` - The thanosconvert tool for converting Thanos block metadata to Cortex - HA Tracker: cleanup of old replicas from KV Store. - Instance limits in ingester and distributor diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 9e5aff81081..4bd5d4265f8 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -396,6 +396,80 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon // TODO: check fairness in queryfrontend } +func TestQuerierTenantFederation_PartialData(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + memcached := e2ecache.NewMemcached() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-tenant-federation.enabled": "true", + // Allow query federation partial data + "-tenant-federation.allow-partial-data": "true", + "-querier.max-fetched-series-per-query": "5", // to trigger failure + }) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + 
+ queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint() + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor)) + require.NoError(t, s.WaitReady(queryFrontend)) + + // Wait until distributor and queriers have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + now := time.Now() + + userPassID := "user-pass" + cPass, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userPassID) + require.NoError(t, err) + + seriesPass, expectedVectorPass := generateSeries("series_good", now) + res, err := cPass.Push(seriesPass) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + userFailID := "user-fail" + cFail, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userFailID) + require.NoError(t, err) + + var seriesFail []prompb.TimeSeries + seriesNum := 10 // to trigger fail + for i := range seriesNum { + s, _ := generateSeries(fmt.Sprintf("series_bad_%d", i), now) + seriesFail = append(seriesFail, s...) 
+ } + res, err = cFail.Push(seriesFail) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + multiTenantID := userPassID + "|" + userFailID + cFederated, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", multiTenantID) + require.NoError(t, err) + + result, err := cFederated.Query("series_good", now) + require.NoError(t, err) + expectedResult := mergeResults([]string{userPassID}, []model.Vector{expectedVectorPass}) + + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedResult, result.(model.Vector)) +} + func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vector { var v model.Vector for pos, tenantID := range tenantIDs { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3b56b6a8fb8..d921e4ba6fc 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -288,18 +288,19 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // federation. 
byPassForSingleQuerier := true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) - t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer) - t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer) + reg := prometheus.DefaultRegisterer + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg)) + t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation, reg) + t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation, byPassForSingleQuerier, reg) if t.Cfg.TenantFederation.RegexMatcherEnabled { util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled") bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { - return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, prometheus.DefaultRegisterer) + return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, reg) } - regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, prometheus.DefaultRegisterer, bucketClientFactory, util_log.Logger) + regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, t.Cfg.TenantFederation, reg, bucketClientFactory, util_log.Logger) if err != nil { return nil, fmt.Errorf("failed to initialize regex resolver: %v", err) } diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable.go b/pkg/querier/tenantfederation/exemplar_merge_queryable.go 
index 286c67f363e..654d88e659e 100644 --- a/pkg/querier/tenantfederation/exemplar_merge_queryable.go +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable.go @@ -29,8 +29,15 @@ import ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewExemplarQueryable(upstream storage.ExemplarQueryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { - return NewMergeExemplarQueryable(defaultTenantLabel, maxConcurrent, tenantExemplarQuerierCallback(upstream), byPassWithSingleQuerier, reg) +func NewExemplarQueryable(upstream storage.ExemplarQueryable, cfg Config, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { + return NewMergeExemplarQueryable( + defaultTenantLabel, + cfg.MaxConcurrent, + tenantExemplarQuerierCallback(upstream), + byPassWithSingleQuerier, + cfg.AllowPartialData, + reg, + ) } func tenantExemplarQuerierCallback(exemplarQueryable storage.ExemplarQueryable) MergeExemplarQuerierCallback { @@ -68,10 +75,11 @@ type MergeExemplarQuerierCallback func(ctx context.Context) (ids []string, queri // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. 
-func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { +func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier, allowPartialData bool, reg prometheus.Registerer) storage.ExemplarQueryable { return &mergeExemplarQueryable{ idLabelName: idLabelName, byPassWithSingleQuerier: byPassWithSingleQuerier, + allowPartialData: allowPartialData, callback: callback, maxConcurrent: maxConcurrent, @@ -88,6 +96,7 @@ type mergeExemplarQueryable struct { idLabelName string maxConcurrent int byPassWithSingleQuerier bool + allowPartialData bool callback MergeExemplarQuerierCallback tenantsPerExemplarQuery prometheus.Histogram } @@ -113,6 +122,7 @@ func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.E tenantIds: ids, queriers: queriers, byPassWithSingleQuerier: m.byPassWithSingleQuerier, + allowPartialData: m.allowPartialData, }, nil } @@ -129,6 +139,7 @@ type mergeExemplarQuerier struct { tenantIds []string queriers []storage.ExemplarQuerier byPassWithSingleQuerier bool + allowPartialData bool } type exemplarSelectJob struct { @@ -170,6 +181,9 @@ func (m mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Mat res, err := job.querier.Select(start, end, allUnrelatedMatchers...) 
if err != nil { + if m.allowPartialData { + return nil + } return errors.Wrapf(err, "error exemplars querying %s %s", rewriteLabelName(m.idLabelName), job.id) } diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go index 994d51e51ea..0e261ea3f6c 100644 --- a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go @@ -126,13 +126,14 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) { users.WithDefaultResolver(users.NewMultiResolver()) tests := []struct { - name string - upstream mockExemplarQueryable - matcher [][]*labels.Matcher - orgId string - expectedResult []exemplar.QueryResult - expectedErr error - expectedMetrics string + name string + upstream mockExemplarQueryable + matcher [][]*labels.Matcher + orgId string + allowPartialData bool + expectedResult []exemplar.QueryResult + expectedErr error + expectedMetrics string }{ { name: "should be treated as single tenant", @@ -295,12 +296,42 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) { }, expectedErr: errors.New("some error"), }, + { + name: "get error from one querier (partial data enabled)", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{err: errors.New("some error")}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-1|user-2", + allowPartialData: true, + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, } for _, test := range tests { 
t.Run(test.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, true, reg) + + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + AllowPartialData: test.allowPartialData, + } + exemplarQueryable := NewExemplarQueryable(&test.upstream, cfg, true, reg) ctx := user.InjectOrgID(context.Background(), test.orgId) q, err := exemplarQueryable.ExemplarQuerier(ctx) require.NoError(t, err) @@ -346,13 +377,14 @@ func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) { }) tests := []struct { - name string - upstream mockExemplarQueryable - matcher [][]*labels.Matcher - orgId string - expectedResult []exemplar.QueryResult - expectedErr error - expectedMetrics string + name string + upstream mockExemplarQueryable + matcher [][]*labels.Matcher + orgId string + allowPartialData bool + expectedResult []exemplar.QueryResult + expectedErr error + expectedMetrics string }{ { name: "result labels should contains __tenant_id__ even if one tenant is queried", @@ -412,12 +444,42 @@ func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) { }, expectedMetrics: expectedTwoTenantsExemplarMetrics, }, + { + name: "get error from one querier (partial data enabled)", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{err: errors.New("some error")}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-.+", + allowPartialData: true, + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: 
expectedTwoTenantsExemplarMetrics, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, false, reg) + + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + AllowPartialData: test.allowPartialData, + } + exemplarQueryable := NewExemplarQueryable(&test.upstream, cfg, false, reg) ctx := user.InjectOrgID(context.Background(), test.orgId) q, err := exemplarQueryable.ExemplarQuerier(ctx) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 12359990c72..111627a7478 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -38,8 +38,15 @@ const ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg) +func NewQueryable(upstream storage.Queryable, cfg Config, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { + return NewMergeQueryable( + defaultTenantLabel, + cfg.MaxConcurrent, + tenantQuerierCallback(upstream), + byPassWithSingleQuerier, + cfg.AllowPartialData, + reg, + ) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -81,12 +88,13 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". 
// This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { +func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier, allowPartialData bool, reg prometheus.Registerer) storage.Queryable { return &mergeQueryable{ idLabelName: idLabelName, maxConcurrent: maxConcurrent, callback: callback, byPassWithSingleQuerier: byPassWithSingleQuerier, + allowPartialData: allowPartialData, tenantsPerQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", @@ -101,6 +109,7 @@ type mergeQueryable struct { idLabelName string maxConcurrent int byPassWithSingleQuerier bool + allowPartialData bool callback MergeQuerierCallback tenantsPerQuery prometheus.Histogram } @@ -114,6 +123,7 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error mint: mint, maxt: maxt, byPassWithSingleQuerier: m.byPassWithSingleQuerier, + allowPartialData: m.allowPartialData, callback: m.callback, tenantsPerQuery: m.tenantsPerQuery, }, nil @@ -124,7 +134,7 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error // from. // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". 
-// This behaviour is not implemented recursively +// This behavior is not implemented recursively type mergeQuerier struct { idLabelName string mint, maxt int64 @@ -132,6 +142,7 @@ type mergeQuerier struct { maxConcurrent int byPassWithSingleQuerier bool + allowPartialData bool tenantsPerQuery prometheus.Histogram } @@ -271,6 +282,11 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context, newCtx := user.InjectOrgID(parentCtx, job.id) job.result, job.warnings, err = f(newCtx, job.querier) if err != nil { + if m.allowPartialData { + job.warnings.Add(fmt.Errorf("%w (partial data returned)", err)) + return nil + } + return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(m.idLabelName), job.id) } @@ -363,8 +379,9 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora // Based on parent ctx here as we are using lazy querier. newCtx := user.InjectOrgID(parentCtx, job.id) seriesSets[job.pos] = &addLabelsSeriesSet{ - upstream: job.querier.Select(newCtx, sortSeries, hints, filteredMatchers...), - labels: labels.FromStrings(m.idLabelName, job.id), + upstream: job.querier.Select(newCtx, sortSeries, hints, filteredMatchers...), + labels: labels.FromStrings(m.idLabelName, job.id), + allowPartialData: m.allowPartialData, } return nil } @@ -421,9 +438,10 @@ func filterValuesByMatchers(idLabelName string, ids []string, matchers ...*label } type addLabelsSeriesSet struct { - upstream storage.SeriesSet - labels labels.Labels - currSeries storage.Series + upstream storage.SeriesSet + labels labels.Labels + currSeries storage.Series + allowPartialData bool } func (m *addLabelsSeriesSet) Next() bool { @@ -446,6 +464,9 @@ func (m *addLabelsSeriesSet) At() storage.Series { // The error that iteration as failed with. // When an error occurs, set cannot continue to iterate. 
func (m *addLabelsSeriesSet) Err() error { + if m.allowPartialData { + return nil + } return errors.Wrapf(m.upstream.Err(), "error querying %s", labelsToString(m.labels)) } @@ -457,6 +478,12 @@ func (m *addLabelsSeriesSet) Warnings() annotations.Annotations { for pos := range upstream { warnings[pos] = errors.Wrapf(upstream[pos], "warning querying %s", labelsToString(m.labels)) } + + if m.allowPartialData { + if err := m.upstream.Err(); err != nil { + warnings.Add(errors.Wrapf(err, "failed to query tenant %s (partial data returned)", labelsToString(m.labels))) + } + } return warnings } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index e06d86909c6..f1772150048 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -173,6 +173,14 @@ func (m mockTenantQuerier) Select(ctx context.Context, _ bool, sp *storage.Selec } } + if m.queryErr != nil { + return &mockSeriesSet{ + upstream: storage.ErrSeriesSet(m.queryErr), + warnings: m.warnings, + queryErr: m.queryErr, + } + } + log, _ := spanlogger.New(ctx, "mockTenantQuerier.select") defer log.Finish() var matrix model.Matrix @@ -212,6 +220,10 @@ func (m mockTenantQuerier) LabelValues(ctx context.Context, name string, hints * } } + if m.queryErr != nil { + return nil, nil, m.queryErr + } + if len(matchers) > 0 { m.warnings.Add(errors.New(mockMatchersNotImplemented)) } @@ -259,6 +271,10 @@ func (m mockTenantQuerier) LabelNames(ctx context.Context, hints *storage.LabelH } } + if m.queryErr != nil { + return nil, nil, m.queryErr + } + var results []string if len(matchers) == 1 && matchers[0].Name == seriesWithLabelNames { @@ -302,12 +318,18 @@ type mergeQueryableScenario struct { queryable mockTenantQueryableWithFilter // doNotByPassSingleQuerier determines whether the MergeQueryable is by-passed in favor of a single querier. 
doNotByPassSingleQuerier bool + // allowPartialData determines whether partial data is allowed. + allowPartialData bool } func (s *mergeQueryableScenario) init() (storage.Querier, prometheus.Gatherer, error) { // initialize with default tenant label reg := prometheus.NewPedanticRegistry() - q := NewQueryable(&s.queryable, defaultMaxConcurrency, !s.doNotByPassSingleQuerier, reg) + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + AllowPartialData: s.allowPartialData, + } + q := NewQueryable(&s.queryable, cfg, !s.doNotByPassSingleQuerier, reg) // retrieve querier querier, err := q.Querier(mint, maxt) @@ -389,7 +411,10 @@ func TestMergeQueryable_Querier(t *testing.T) { t.Run("querying without a tenant specified should error", func(t *testing.T) { t.Parallel() queryable := &mockTenantQueryableWithFilter{} - q := NewQueryable(queryable, defaultMaxConcurrency, false /* byPassWithSingleQuerier */, nil) + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + } + q := NewQueryable(queryable, cfg, false /* byPassWithSingleQuerier */, nil) querier, err := q.Querier(mint, maxt) require.NoError(t, err) @@ -445,6 +470,17 @@ var ( }, } + threeTenantsWithErrorAndPartialDataScenario = mergeQueryableScenario{ + name: "three tenants, one erroring with partial data allowed", + tenants: []string{"team-a", "team-b", "team-c"}, + allowPartialData: true, + queryable: mockTenantQueryableWithFilter{ + queryErrByTenant: map[string]error{ + "team-b": errors.New("failure xyz"), + }, + }, + } + expectedSingleTenantsMetrics = ` # HELP cortex_querier_federated_tenants_per_query Number of tenants per query. 
# TYPE cortex_querier_federated_tenants_per_query histogram @@ -652,6 +688,24 @@ func TestMergeQueryable_Select(t *testing.T) { expectedMetrics: expectedThreeTenantsMetrics, }}, }, + { + mergeQueryableScenario: threeTenantsWithErrorAndPartialDataScenario, + selectTestCases: []selectTestCase{{ + name: "should return partial results and warnings instead of error", + expectedSeriesCount: 4, + expectedLabels: []labels.Labels{ + labels.FromStrings("__tenant_id__", "team-a", "instance", "host1", "tenant-team-a", "static"), + labels.FromStrings("__tenant_id__", "team-a", "instance", "host2.team-a"), + labels.FromStrings("__tenant_id__", "team-c", "instance", "host1", "tenant-team-c", "static"), + labels.FromStrings("__tenant_id__", "team-c", "instance", "host2.team-c"), + }, + expectedWarnings: []string{ + "failed to query tenant tenant_id team-b (partial data returned): failure xyz", + }, + expectedQueryErr: nil, + expectedMetrics: expectedThreeTenantsMetrics, + }}, + }, } { t.Run(scenario.name, func(t *testing.T) { for _, useRegexResolver := range []bool{true, false} { @@ -853,6 +907,18 @@ func TestMergeQueryable_LabelNames(t *testing.T) { expectedMetrics: expectedThreeTenantsMetrics, }, }, + { + mergeQueryableScenario: threeTenantsWithErrorAndPartialDataScenario, + labelNamesTestCase: labelNamesTestCase{ + name: "should return partial label names and warnings instead of error", + expectedLabelNames: []string{defaultTenantLabel, "instance", "tenant-team-a", "tenant-team-c"}, + expectedWarnings: []string{ + "warning querying tenant_id team-b: failure xyz (partial data returned)", + }, + expectedQueryErr: nil, + expectedMetrics: expectedThreeTenantsMetrics, + }, + }, } { for _, useRegexResolver := range []bool{true, false} { t.Run(fmt.Sprintf("%s, useRegexResolver: %v", scenario.mergeQueryableScenario.name, useRegexResolver), func(t *testing.T) { @@ -1088,6 +1154,19 @@ func TestMergeQueryable_LabelValues(t *testing.T) { expectedMetrics: expectedThreeTenantsMetrics, }}, }, 
+ { + mergeQueryableScenario: threeTenantsWithErrorAndPartialDataScenario, + labelValuesTestCases: []labelValuesTestCase{{ + name: "should return partial label values and warnings instead of error", + labelName: "instance", + expectedLabelValues: []string{"host1", "host2.team-a", "host2.team-c"}, + expectedWarnings: []string{ + "warning querying tenant_id team-b: failure xyz (partial data returned)", + }, + expectedQueryErr: nil, + expectedMetrics: expectedThreeTenantsMetrics, + }}, + }, } { t.Run(scenario.name, func(t *testing.T) { for _, useRegexResolver := range []bool{true, false} { @@ -1211,7 +1290,10 @@ func TestTracingMergeQueryable(t *testing.T) { // set a multi tenant resolver users.WithDefaultResolver(users.NewMultiResolver()) filter := mockTenantQueryableWithFilter{} - q := NewQueryable(&filter, defaultMaxConcurrency, false, nil) + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + } + q := NewQueryable(&filter, cfg, false, nil) // retrieve querier if set querier, err := q.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/metadata_merge_querier.go b/pkg/querier/tenantfederation/metadata_merge_querier.go index 7c61d587680..c68eada705c 100644 --- a/pkg/querier/tenantfederation/metadata_merge_querier.go +++ b/pkg/querier/tenantfederation/metadata_merge_querier.go @@ -19,10 +19,11 @@ import ( // NewMetadataQuerier returns a MetadataQuerier that merges metric // metadata for multiple tenants. 
-func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier { +func NewMetadataQuerier(upstream querier.MetadataQuerier, cfg Config, reg prometheus.Registerer) querier.MetadataQuerier { return &mergeMetadataQuerier{ - upstream: upstream, - maxConcurrent: maxConcurrent, + upstream: upstream, + maxConcurrent: cfg.MaxConcurrent, + allowPartialData: cfg.AllowPartialData, tenantsPerMetadataQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", @@ -35,6 +36,7 @@ func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg type mergeMetadataQuerier struct { maxConcurrent int + allowPartialData bool tenantsPerMetadataQuery prometheus.Histogram upstream querier.MetadataQuerier } @@ -82,7 +84,10 @@ func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context, req *client. res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id), req) if err != nil { - return errors.Wrapf(err, "error exemplars querying %s %s", job.id, err) + if m.allowPartialData { + return nil + } + return errors.Wrapf(err, "error metadata querying %s %s", job.id, err) } results[job.pos] = res diff --git a/pkg/querier/tenantfederation/metadata_merge_querier_test.go b/pkg/querier/tenantfederation/metadata_merge_querier_test.go index fb4122f782a..6bf9c85c298 100644 --- a/pkg/querier/tenantfederation/metadata_merge_querier_test.go +++ b/pkg/querier/tenantfederation/metadata_merge_querier_test.go @@ -56,6 +56,7 @@ cortex_querier_federated_tenants_per_metadata_query_count 1 type mockMetadataQuerier struct { tenantIdToMetadata map[string][]scrape.MetricMetadata + errorByTenant map[string]error } func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context, _ *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) { @@ -66,6 +67,10 @@ func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context, _ *client.Met } id := ids[0] + if err, ok := m.errorByTenant[id]; 
ok { + return nil, err + } + if res, ok := m.tenantIdToMetadata[id]; !ok { return nil, fmt.Errorf("tenant not found, tenantId: %s", id) } else { @@ -80,9 +85,12 @@ func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { tests := []struct { name string tenantIdToMetadata map[string][]scrape.MetricMetadata + errorByTenant map[string]error orgId string + allowPartialData bool expectedResults []scrape.MetricMetadata expectedMetrics string + expectedErr string }{ { name: "single tenant", @@ -134,6 +142,35 @@ func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { }, expectedMetrics: expectedTwoTenantsMetadataMetrics, }, + { + name: "two tenants, one fails (partial data disabled)", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": {{MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}}, + }, + errorByTenant: map[string]error{ + "user-2": fmt.Errorf("metadata fetch failed"), + }, + orgId: "user-1|user-2", + allowPartialData: false, + expectedErr: "error metadata querying user-2 metadata fetch failed", + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + { + name: "two tenants, one fails (partial data enabled)", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": {{MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}}, + }, + errorByTenant: map[string]error{ + "user-2": fmt.Errorf("metadata fetch failed"), + }, + orgId: "user-1|user-2", + allowPartialData: true, + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + expectedErr: "", // no error + }, } for _, test := range tests { @@ -141,13 +178,24 @@ func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { reg := prometheus.NewPedanticRegistry() upstream := mockMetadataQuerier{ tenantIdToMetadata: test.tenantIdToMetadata, + errorByTenant: test.errorByTenant, + } + + cfg := Config{ + MaxConcurrent: 
defaultMaxConcurrency, + AllowPartialData: test.allowPartialData, } - mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) + mergeMetadataQuerier := NewMetadataQuerier(&upstream, cfg, reg) metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId), &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}) - require.NoError(t, err) + if test.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, test.expectedResults, metadata) + } require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query")) - require.Equal(t, test.expectedResults, metadata) }) } } @@ -182,9 +230,12 @@ func Test_mergeMetadataQuerier_MetricsMetadata_WhenUseRegexResolver(t *testing.T tests := []struct { name string tenantIdToMetadata map[string][]scrape.MetricMetadata + errorByTenant map[string]error orgId string + allowPartialData bool expectedResults []scrape.MetricMetadata expectedMetrics string + expectedErr string }{ { name: "single tenant", @@ -218,6 +269,35 @@ func Test_mergeMetadataQuerier_MetricsMetadata_WhenUseRegexResolver(t *testing.T }, expectedMetrics: expectedTwoTenantsMetadataMetrics, }, + { + name: "regex match two tenants, one fails (partial data disabled)", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": {{MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}}, + }, + errorByTenant: map[string]error{ + "user-2": fmt.Errorf("metadata fetch failed"), + }, + orgId: "user-.+", + allowPartialData: false, + expectedErr: "error metadata querying user-2 metadata fetch failed", + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + { + name: "regex match two tenants, one fails (partial data enabled)", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": 
{{MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}}, + }, + errorByTenant: map[string]error{ + "user-2": fmt.Errorf("metadata fetch failed"), + }, + orgId: "user-.+", + allowPartialData: true, + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge"}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + expectedErr: "", + }, } for _, test := range tests { @@ -225,13 +305,24 @@ func Test_mergeMetadataQuerier_MetricsMetadata_WhenUseRegexResolver(t *testing.T reg = prometheus.NewPedanticRegistry() upstream := mockMetadataQuerier{ tenantIdToMetadata: test.tenantIdToMetadata, + errorByTenant: test.errorByTenant, + } + + cfg := Config{ + MaxConcurrent: defaultMaxConcurrency, + AllowPartialData: test.allowPartialData, } - mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) + mergeMetadataQuerier := NewMetadataQuerier(&upstream, cfg, reg) metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId), &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}) - require.NoError(t, err) + if test.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, test.expectedResults, metadata) + } require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query")) - require.Equal(t, test.expectedResults, metadata) }) } } diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 9023f3b4b1e..d80fa3e8c5a 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -16,6 +16,8 @@ type Config struct { RegexMatcherEnabled bool `yaml:"regex_matcher_enabled"` // UserSyncInterval How frequently to scan users, scanned users are used to 
calculate matched tenantIDs if the regex matcher is enabled. UserSyncInterval time.Duration `yaml:"user_sync_interval"` + // AllowPartialData If true, query errors from individual tenants are treated as warnings, allowing partial results to be returned. + AllowPartialData bool `yaml:"allow_partial_data"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -24,4 +26,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.") f.BoolVar(&cfg.RegexMatcherEnabled, "tenant-federation.regex-matcher-enabled", false, "[Experimental] If enabled, the `X-Scope-OrgID` header value can accept a regex and the matched tenantIDs are automatically involved. The regex matching rule follows the Prometheus, see the detail: https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions. The user discovery is based on scanning block storage, so new users can get queries after uploading a block (generally 2h).") f.DurationVar(&cfg.UserSyncInterval, "tenant-federation.user-sync-interval", time.Minute*5, "[Experimental] If the regex matcher is enabled, it specifies how frequently to scan users. The scanned users are used to calculate matched tenantIDs.
The scanning strategy depends on the `-blocks-storage.users-scanner.strategy`.") + f.BoolVar(&cfg.AllowPartialData, "tenant-federation.allow-partial-data", false, "[Experimental] If enabled, query errors from individual tenants are treated as warnings, allowing partial results to be returned.") } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8e1e48a8f87..e10c3fd0152 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -9048,6 +9048,12 @@ }, "tenant_federation": { "properties": { + "allow_partial_data": { + "default": false, + "description": "[Experimental] If enabled, query errors from individual tenants are treated as warnings, allowing partial results to be returned.", + "type": "boolean", + "x-cli-flag": "tenant-federation.allow-partial-data" + }, "enabled": { "default": false, "description": "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).",