diff --git a/CHANGELOG.md b/CHANGELOG.md index efc085818a..b2f2ea4b79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,9 @@ # Changelog ## master / unreleased - * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 +* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 +* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 ## 1.21.0 in progress diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b2bd78549c..7c1cd7265d 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -310,6 +310,12 @@ tenant_federation: # CLI flag: -tenant-federation.user-sync-interval [user_sync_interval: <duration> | default = 5m] + # [Experimental] Cache size of regex match results used by regex resolver. + # Each cache entry includes both regex patterns (e.g. `user-.+`) and resolved + # tenant IDs (e.g. `user-1`). Set to 0 or less to disable caching. + # CLI flag: -tenant-federation.regex-cache-size + [regex_cache_size: <int> | default = 1000] + + # [Experimental] If enabled, query errors from individual tenants are treated # as warnings, allowing partial results to be returned. 
# CLI flag: -tenant-federation.allow-partial-data diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 0508abb6e9..4bb632023e 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -55,6 +55,7 @@ Currently experimental features are: - Blocks storage user index - Querier: tenant federation - `-tenant-federation.regex-matcher-enabled` + - `-tenant-federation.regex-cache-size` - `-tenant-federation.user-sync-interval` - `-tenant-federation.allow-partial-data` - The thanosconvert tool for converting Thanos block metadata to Cortex diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 4bd5d4265f..0ee5024be8 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -145,7 +145,7 @@ func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTe "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), "-tenant-federation.enabled": "true", "-tenant-federation.regex-matcher-enabled": "true", - "-tenant-federation.user-sync-interval": "1s", + "-tenant-federation.user-sync-interval": "5s", // to upload block quickly "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), @@ -255,6 +255,19 @@ func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTe assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector)) + // Verify regex resolver cache entries. 
+ querierSum, err := querier.SumMetrics([]string{"cortex_regex_resolver_matched_cache_size"}, e2e.SkipMissingMetrics) + require.NoError(t, err) + + totalCacheSize := querierSum[0] + if cfg.shuffleShardingEnabled { + querier2Sum, err := querier2.SumMetrics([]string{"cortex_regex_resolver_matched_cache_size"}, e2e.SkipMissingMetrics) + require.NoError(t, err) + totalCacheSize += querier2Sum[0] + } + + require.Equal(t, float64(numUsers+1), totalCacheSize) + // ensure a push to multiple tenants is failing series, _ := generateSeries("series_1", now) res, err := c.Push(series) diff --git a/pkg/querier/tenantfederation/regex_resolver.go b/pkg/querier/tenantfederation/regex_resolver.go index cab49303f5..7016c2ae0e 100644 --- a/pkg/querier/tenantfederation/regex_resolver.go +++ b/pkg/querier/tenantfederation/regex_resolver.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + lru "github.com/hashicorp/golang-lru/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -27,6 +28,8 @@ var ( errInvalidRegex = errors.New("invalid regex present") ErrTooManyTenants = "too many tenants, max: %d, actual: %d" + + defaultRegexCacheSize = 1000 ) // RegexResolver resolves tenantIDs matched given regex. 
@@ -38,12 +41,20 @@ type RegexResolver struct { maxTenant int userScanner users.Scanner logger log.Logger - sync.Mutex + sync.RWMutex + + // matchedCache stores the results of regex matching + matchedCache *lru.Cache[string, []string] // lastUpdateUserRun stores the timestamps of the latest update user loop run lastUpdateUserRun prometheus.Gauge // discoveredUsers stores the number of discovered user discoveredUsers prometheus.Gauge + // matchedCacheSize stores the size of the matchedCache + matchedCacheSize prometheus.Gauge + + matchedCacheHits prometheus.Counter + matchedCacheMisses prometheus.Counter } func NewRegexResolver(cfg users.UsersScannerConfig, tenantFederationCfg Config, reg prometheus.Registerer, bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error), logger log.Logger) (*RegexResolver, error) { @@ -64,6 +75,27 @@ func NewRegexResolver(cfg users.UsersScannerConfig, tenantFederationCfg Config, logger: logger, } + if tenantFederationCfg.RegexCacheSize > 0 { + matchedCache, err := lru.New[string, []string](tenantFederationCfg.RegexCacheSize) + if err != nil { + return nil, errors.Wrap(err, "failed to create regex cache") + } + r.matchedCache = matchedCache + + r.matchedCacheSize = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_regex_resolver_matched_cache_size", + Help: "Number of entries stored in the matched cache.", + }) + r.matchedCacheHits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_regex_resolver_matched_cache_hits_total", + Help: "Total number of successful cache lookups for the regex matched.", + }) + r.matchedCacheMisses = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_regex_resolver_matched_cache_misses_total", + Help: "Total number of cache misses for the regex matched.", + }) + } + r.lastUpdateUserRun = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "cortex_regex_resolver_last_update_run_timestamp_seconds", Help: "Unix timestamp of the 
last successful regex resolver update user run.", @@ -88,8 +120,7 @@ func (r *RegexResolver) running(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - // active and deleting users are considered - // The store-gateway can query for deleting users. + // Active and deleting users are considered. The store-gateway can query for deleting users. active, deleting, _, err := r.userScanner.ScanUsers(ctx) if err != nil { level.Error(r.logger).Log("msg", "failed to discover users from bucket", "err", err) @@ -99,6 +130,12 @@ func (r *RegexResolver) running(ctx context.Context) error { r.knownUsers = append(active, deleting...) // We keep it sort sort.Strings(r.knownUsers) + + // Reset the cache because the set of available users has changed. + if r.matchedCache != nil { + r.matchedCache.Purge() + r.matchedCacheSize.Set(0) + } r.Unlock() r.lastUpdateUserRun.SetToCurrentTime() r.discoveredUsers.Set(float64(len(active) + len(deleting))) @@ -126,16 +163,17 @@ func (r *RegexResolver) TenantIDs(ctx context.Context) ([]string, error) { return nil, err } - orgIDs, err := r.getRegexMatchedOrgIds(orgID) - if err != nil { - return nil, err - } - - return users.ValidateOrgIDs(orgIDs) + return r.getRegexMatchedOrgIds(orgID) } func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) { - var matched []string + if r.matchedCache != nil { + if cachedMatched, ok := r.matchedCache.Get(orgID); ok { + r.matchedCacheHits.Inc() + return r.validateAndReturnMatched(orgID, cachedMatched) + } + r.matchedCacheMisses.Inc() + } // Use the Prometheus FastRegexMatcher m, err := labels.NewFastRegexMatcher(orgID) @@ -143,14 +181,30 @@ func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) { return nil, errInvalidRegex } - r.Lock() - defer r.Unlock() + var matched []string + + r.RLock() for _, id := range r.knownUsers { if m.MatchString(id) { matched = append(matched, id) } } + r.RUnlock() + + validatedMatched, err := users.ValidateOrgIDs(matched) + if err != nil { + return nil, 
err + } + + if r.matchedCache != nil { + r.matchedCache.Add(orgID, validatedMatched) + r.matchedCacheSize.Set(float64(r.matchedCache.Len())) + } + + return r.validateAndReturnMatched(orgID, validatedMatched) +} +func (r *RegexResolver) validateAndReturnMatched(orgID string, matched []string) ([]string, error) { if len(matched) == 0 { if err := users.ValidTenantID(orgID); err == nil { // when querying for a newly created orgID, the query may not @@ -165,6 +219,7 @@ func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) { return []string{"fake"}, nil } + // Enforce the maximum number of tenants allowed in a federated query. if r.maxTenant > 0 && len(matched) > r.maxTenant { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", fmt.Errorf(ErrTooManyTenants, r.maxTenant, len(matched)).Error()) } diff --git a/pkg/querier/tenantfederation/regex_resolver_test.go b/pkg/querier/tenantfederation/regex_resolver_test.go index 8e9d274979..84970f47ea 100644 --- a/pkg/querier/tenantfederation/regex_resolver_test.go +++ b/pkg/querier/tenantfederation/regex_resolver_test.go @@ -3,6 +3,7 @@ package tenantfederation import ( "context" "errors" + "fmt" "strings" "testing" "time" @@ -167,3 +168,274 @@ func Test_RegexValidator(t *testing.T) { }) } } + +func Test_RegexResolver_Cache(t *testing.T) { + tests := []struct { + description string + existingTenants []string + firstOrgID string + secondOrgID string + expectedFirst []string + expectedSecond []string + }{ + { + description: "cache hit with same regex", + existingTenants: []string{"user-1", "user-2", "user-3"}, + firstOrgID: "user-.+", + secondOrgID: "user-.+", + expectedFirst: []string{"user-1", "user-2", "user-3"}, + expectedSecond: []string{"user-1", "user-2", "user-3"}, + }, + { + description: "cache miss with different regex", + existingTenants: []string{"user-1", "user-2", "admin-1"}, + firstOrgID: "user-.+", + secondOrgID: "admin-.+", + expectedFirst: []string{"user-1", "user-2"}, + expectedSecond: 
[]string{"admin-1"}, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", tc.existingTenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + for _, existingTenant := range tc.existingTenants { + bucketClient.MockExists(users.GetGlobalDeletionMarkPath(existingTenant), false, nil) + bucketClient.MockExists(users.GetLocalDeletionMarkPath(existingTenant), false, nil) + } + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + tenantFederationConfig := Config{UserSyncInterval: time.Second, MaxTenant: 0, RegexCacheSize: 10} + regexResolver, err := NewRegexResolver(usersScannerConfig, tenantFederationConfig, reg, bucketClientFactory, log.NewNopLogger()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() any { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == float64(len(tc.existingTenants)) + }) + + checkMetrics := func(expectedHits, expectedMisses float64) { + t.Helper() + require.Equal(t, expectedHits, testutil.ToFloat64(regexResolver.matchedCacheHits)) + require.Equal(t, expectedMisses, testutil.ToFloat64(regexResolver.matchedCacheMisses)) + } + + // First query - cache miss + ctx1 := user.InjectOrgID(context.Background(), tc.firstOrgID) + orgIDs1, err := regexResolver.TenantIDs(ctx1) + require.NoError(t, err) + require.Equal(t, tc.expectedFirst, orgIDs1) + checkMetrics(0, 1) + + // Verify cache was populated + regexResolver.RLock() + cached, exists := regexResolver.matchedCache.Get(tc.firstOrgID) + regexResolver.RUnlock() + require.True(t, exists, "cache should 
contain the first query result") + require.Equal(t, tc.expectedFirst, cached) + + // Second query - should use cache + ctx2 := user.InjectOrgID(context.Background(), tc.secondOrgID) + orgIDs2, err := regexResolver.TenantIDs(ctx2) + require.NoError(t, err) + require.Equal(t, tc.expectedSecond, orgIDs2) + + // Verify cache behavior + if tc.firstOrgID == tc.secondOrgID { + // Same query should hit cache + regexResolver.RLock() + cached2, exists2 := regexResolver.matchedCache.Get(tc.secondOrgID) + regexResolver.RUnlock() + require.True(t, exists2) + require.Equal(t, tc.expectedSecond, cached2) + checkMetrics(1, 1) + } else { + // Different query should miss cache + checkMetrics(0, 2) + } + }) + } +} + +func Test_RegexResolver_CacheInvalidation(t *testing.T) { + reg := prometheus.NewRegistry() + initialTenants := []string{"user-1", "user-2"} + bucketClient := &bucket.ClientMock{} + + // Initial mock setup + bucketClient.MockIter("", initialTenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + for _, tenant := range initialTenants { + bucketClient.MockExists(users.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(users.GetLocalDeletionMarkPath(tenant), false, nil) + } + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + tenantFederationConfig := Config{UserSyncInterval: time.Hour, MaxTenant: 0, RegexCacheSize: 10} + regexResolver, err := NewRegexResolver(usersScannerConfig, tenantFederationConfig, reg, bucketClientFactory, log.NewNopLogger()) + require.NoError(t, err) + + // Avoid background ticker races by setting known users directly for this unit test. + regexResolver.Lock() + regexResolver.knownUsers = append([]string(nil), initialTenants...) 
+ regexResolver.discoveredUsers.Set(float64(len(initialTenants))) + regexResolver.Unlock() + + // First query - populate cache + ctx := user.InjectOrgID(context.Background(), "user-.+") + orgIDs, err := regexResolver.TenantIDs(ctx) + require.NoError(t, err) + require.Equal(t, []string{"user-1", "user-2"}, orgIDs) + + // Verify cache is populated + regexResolver.RLock() + require.Equal(t, 1, regexResolver.matchedCache.Len()) + require.Equal(t, float64(1), testutil.ToFloat64(regexResolver.matchedCacheSize)) + regexResolver.RUnlock() + + // Simulate user-sync invalidation by clearing cache under resolver lock. + regexResolver.Lock() + regexResolver.matchedCache.Purge() + regexResolver.matchedCacheSize.Set(0) + regexResolver.Unlock() + + regexResolver.RLock() + require.Equal(t, 0, regexResolver.matchedCache.Len()) + require.Equal(t, float64(0), testutil.ToFloat64(regexResolver.matchedCacheSize)) + regexResolver.RUnlock() + + // Query again - cache should be repopulated + orgIDs, err = regexResolver.TenantIDs(ctx) + require.NoError(t, err) + require.Equal(t, []string{"user-1", "user-2"}, orgIDs) + require.Equal(t, 1, regexResolver.matchedCache.Len()) + require.Equal(t, float64(1), testutil.ToFloat64(regexResolver.matchedCacheSize)) + +} + +func Test_RegexResolver_UsesConfiguredCacheSize(t *testing.T) { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + tenantFederationConfig := Config{ + UserSyncInterval: time.Hour, + MaxTenant: 0, + RegexCacheSize: 2, + } + + regexResolver, err := NewRegexResolver(usersScannerConfig, tenantFederationConfig, reg, bucketClientFactory, log.NewNopLogger()) + require.NoError(t, err) + + regexResolver.Lock() + 
regexResolver.knownUsers = []string{"user-1", "user-2", "user-3"} + regexResolver.Unlock() + + for _, orgID := range []string{"user-1", "user-2", "user-3"} { + ctx := user.InjectOrgID(context.Background(), orgID) + _, err := regexResolver.TenantIDs(ctx) + require.NoError(t, err) + } + + regexResolver.RLock() + require.Equal(t, 2, regexResolver.matchedCache.Len()) + require.Equal(t, float64(2), testutil.ToFloat64(regexResolver.matchedCacheSize)) + regexResolver.RUnlock() +} + +func BenchmarkRegexResolver_TenantIDs(b *testing.B) { + numUsers := 1000 + existingTenants := make([]string, numUsers) + for i := range numUsers { + existingTenants[i] = fmt.Sprintf("user-%d", i) + } + + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", existingTenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + for _, tenant := range existingTenants { + bucketClient.MockExists(users.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(users.GetLocalDeletionMarkPath(tenant), false, nil) + } + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + tenantFederationConfig := Config{UserSyncInterval: time.Second, MaxTenant: 0, RegexCacheSize: 2000} + regexResolver, err := NewRegexResolver(usersScannerConfig, tenantFederationConfig, reg, bucketClientFactory, log.NewNopLogger()) + require.NoError(b, err) + err = services.StartAndAwaitRunning(context.Background(), regexResolver) + require.NoError(b, err) + + // Wait for initial sync + test.Poll(b, time.Second*10, true, func() any { + return testutil.ToFloat64(regexResolver.discoveredUsers) == float64(numUsers) + }) + + b.Run("ColdCache with regex tenant", func(b *testing.B) { + for b.Loop() { + // Cache clean + regexResolver.Lock() + regexResolver.matchedCache.Purge() + regexResolver.matchedCacheSize.Set(0) + 
regexResolver.Unlock() + + ctx := user.InjectOrgID(context.Background(), "user-.+") + _, _ = regexResolver.TenantIDs(ctx) + } + }) + + b.Run("WarmCache with regex tenant", func(b *testing.B) { + ctx := user.InjectOrgID(context.Background(), "user-.+") + _, _ = regexResolver.TenantIDs(ctx) // To warm up cache + + b.ResetTimer() + for b.Loop() { + _, _ = regexResolver.TenantIDs(ctx) + } + }) + + b.Run("ColdCache with resolved tenant", func(b *testing.B) { + for b.Loop() { + // Cache clean + regexResolver.Lock() + regexResolver.matchedCache.Purge() + regexResolver.matchedCacheSize.Set(0) + regexResolver.Unlock() + + ctx := user.InjectOrgID(context.Background(), "user-1") + _, _ = regexResolver.TenantIDs(ctx) + } + }) + + b.Run("WarmCache with resolved tenant", func(b *testing.B) { + ctx := user.InjectOrgID(context.Background(), "user-1") + _, _ = regexResolver.TenantIDs(ctx) // To warm up cache + + b.ResetTimer() + for b.Loop() { + _, _ = regexResolver.TenantIDs(ctx) + } + }) +} diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 6aca128ab5..38fe0d60f9 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -16,6 +16,8 @@ type Config struct { RegexMatcherEnabled bool `yaml:"regex_matcher_enabled"` // UserSyncInterval How frequently to scan users, scanned users are used to calculate matched tenantIDs if the regex matcher is enabled. UserSyncInterval time.Duration `yaml:"user_sync_interval"` + // RegexCacheSize Maximum number of regex match results cached locally. + RegexCacheSize int `yaml:"regex_cache_size"` // AllowPartialData If true, enables returning partial results. AllowPartialData bool `yaml:"allow_partial_data"` } @@ -26,5 +28,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 
0 means no limit.") f.BoolVar(&cfg.RegexMatcherEnabled, "tenant-federation.regex-matcher-enabled", false, "[Experimental] If enabled, the `X-Scope-OrgID` header value can accept a regex and the matched tenantIDs are automatically involved. The regex matching rule follows the Prometheus, see the detail: https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions. The user discovery is based on scanning block storage, so new users can get queries after uploading a block (generally 2h).") f.DurationVar(&cfg.UserSyncInterval, "tenant-federation.user-sync-interval", time.Minute*5, "[Experimental] If the regex matcher is enabled, it specifies how frequently to scan users. The scanned users are used to calculate matched tenantIDs. The scanning strategy depends on the `-blocks-storage.users-scanner.strategy`.") + f.IntVar(&cfg.RegexCacheSize, "tenant-federation.regex-cache-size", defaultRegexCacheSize, "[Experimental] Cache size of regex match results used by regex resolver. Each cache entry includes both regex patterns (e.g. `user-.+`) and resolved tenant IDs (e.g. `user-1`). Set to 0 or less to disable caching.") f.BoolVar(&cfg.AllowPartialData, "tenant-federation.allow-partial-data", false, "[Experimental] If enabled, query errors from individual tenants are treated as warnings, allowing partial results to be returned.") } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 37e5afe09e..20cf970c35 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -9111,6 +9111,12 @@ "type": "number", "x-cli-flag": "tenant-federation.max-tenant" }, + "regex_cache_size": { + "default": 1000, + "description": "[Experimental] Cache size of regex match results used by regex resolver. Each cache entry includes both regex patterns (e.g. `user-.+`) and resolved tenant IDs (e.g. `user-1`). 
Set to 0 or less to disable caching.", + "type": "number", + "x-cli-flag": "tenant-federation.regex-cache-size" + }, "regex_matcher_enabled": { "default": false, "description": "[Experimental] If enabled, the `X-Scope-OrgID` header value can accept a regex and the matched tenantIDs are automatically involved. The regex matching rule follows the Prometheus, see the detail: https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions. The user discovery is based on scanning block storage, so new users can get queries after uploading a block (generally 2h).",