Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Changelog

## master / unreleased

* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355

## 1.21.0 in progress
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ tenant_federation:
# CLI flag: -tenant-federation.user-sync-interval
[user_sync_interval: <duration> | default = 5m]

# [Experimental] Cache size of regex match results used by regex resolver.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is experimental it should be mentioned in v1-guarantees

# Each cache entry includes both regex patterns (e.g. `user-.+`) and resolved
# tenant IDs (e.g. `user-1`). Set to 0 or less to disable caching.
# CLI flag: -tenant-federation.regex-cache-size
[regex_cache_size: <int> | default = 1000]

# [Experimental] If enabled, query errors from individual tenants are treated
# as warnings, allowing partial results to be returned.
# CLI flag: -tenant-federation.allow-partial-data
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Currently experimental features are:
- Blocks storage user index
- Querier: tenant federation
- `-tenant-federation.regex-matcher-enabled`
- `-tenant-federation.regex-cache-size`
- `-tenant-federation.user-sync-interval`
- `-tenant-federation.allow-partial-data`
- The thanosconvert tool for converting Thanos block metadata to Cortex
Expand Down
15 changes: 14 additions & 1 deletion integration/querier_tenant_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTe
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
"-tenant-federation.regex-matcher-enabled": "true",
"-tenant-federation.user-sync-interval": "1s",
"-tenant-federation.user-sync-interval": "5s",

// to upload block quickly
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
Expand Down Expand Up @@ -255,6 +255,19 @@ func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTe

assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

// Verify regex resolver cache entries.
querierSum, err := querier.SumMetrics([]string{"cortex_regex_resolver_matched_cache_size"}, e2e.SkipMissingMetrics)
require.NoError(t, err)

totalCacheSize := querierSum[0]
if cfg.shuffleShardingEnabled {
querier2Sum, err := querier2.SumMetrics([]string{"cortex_regex_resolver_matched_cache_size"}, e2e.SkipMissingMetrics)
require.NoError(t, err)
totalCacheSize += querier2Sum[0]
}

require.Equal(t, float64(numUsers+1), totalCacheSize)

// ensure a push to multiple tenants is failing
series, _ := generateSeries("series_1", now)
res, err := c.Push(series)
Expand Down
79 changes: 67 additions & 12 deletions pkg/querier/tenantfederation/regex_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
lru "github.com/hashicorp/golang-lru/v2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lru is the right choice. I feel.

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -27,6 +28,8 @@ var (
errInvalidRegex = errors.New("invalid regex present")

ErrTooManyTenants = "too many tenants, max: %d, actual: %d"

defaultRegexCacheSize = 1000
)

// RegexResolver resolves tenantIDs matched given regex.
Expand All @@ -38,12 +41,20 @@ type RegexResolver struct {
maxTenant int
userScanner users.Scanner
logger log.Logger
sync.Mutex
sync.RWMutex

// matchedCache stores the results of regex matching
matchedCache *lru.Cache[string, []string]

// lastUpdateUserRun stores the timestamps of the latest update user loop run
lastUpdateUserRun prometheus.Gauge
// discoveredUsers stores the number of discovered users
discoveredUsers prometheus.Gauge
// matchedCacheSize stores the size of the matchedCache
matchedCacheSize prometheus.Gauge

matchedCacheHits prometheus.Counter
matchedCacheMisses prometheus.Counter
}

func NewRegexResolver(cfg users.UsersScannerConfig, tenantFederationCfg Config, reg prometheus.Registerer, bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error), logger log.Logger) (*RegexResolver, error) {
Expand All @@ -64,6 +75,27 @@ func NewRegexResolver(cfg users.UsersScannerConfig, tenantFederationCfg Config,
logger: logger,
}

if tenantFederationCfg.RegexCacheSize > 0 {
matchedCache, err := lru.New[string, []string](tenantFederationCfg.RegexCacheSize)
if err != nil {
return nil, errors.Wrap(err, "failed to create regex cache")
}
r.matchedCache = matchedCache

r.matchedCacheSize = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_regex_resolver_matched_cache_size",
Help: "Number of entries stored in the matched cache.",
})
r.matchedCacheHits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_regex_resolver_matched_cache_hits_total",
Help: "Total number of successful cache lookups for the regex matched.",
})
r.matchedCacheMisses = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_regex_resolver_matched_cache_misses_total",
Help: "Total number of cache misses for the regex matched.",
})
}

r.lastUpdateUserRun = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_regex_resolver_last_update_run_timestamp_seconds",
Help: "Unix timestamp of the last successful regex resolver update user run.",
Expand All @@ -88,8 +120,7 @@ func (r *RegexResolver) running(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// active and deleting users are considered
// The store-gateway can query for deleting users.
// Active and deleting users are considered
active, deleting, _, err := r.userScanner.ScanUsers(ctx)
if err != nil {
level.Error(r.logger).Log("msg", "failed to discover users from bucket", "err", err)
Expand All @@ -99,6 +130,12 @@ func (r *RegexResolver) running(ctx context.Context) error {
r.knownUsers = append(active, deleting...)
// We keep the list sorted
sort.Strings(r.knownUsers)

// Reset the cache because the set of available users has changed.
if r.matchedCache != nil {
r.matchedCache.Purge()
r.matchedCacheSize.Set(0)
}
r.Unlock()
r.lastUpdateUserRun.SetToCurrentTime()
r.discoveredUsers.Set(float64(len(active) + len(deleting)))
Expand Down Expand Up @@ -126,31 +163,48 @@ func (r *RegexResolver) TenantIDs(ctx context.Context) ([]string, error) {
return nil, err
}

orgIDs, err := r.getRegexMatchedOrgIds(orgID)
if err != nil {
return nil, err
}

return users.ValidateOrgIDs(orgIDs)
return r.getRegexMatchedOrgIds(orgID)
}

func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) {
var matched []string
if r.matchedCache != nil {
if cachedMatched, ok := r.matchedCache.Get(orgID); ok {
r.matchedCacheHits.Inc()
return r.validateAndReturnMatched(orgID, cachedMatched)
}
r.matchedCacheMisses.Inc()
}

// Use the Prometheus FastRegexMatcher
m, err := labels.NewFastRegexMatcher(orgID)
if err != nil {
return nil, errInvalidRegex
}

r.Lock()
defer r.Unlock()
var matched []string

r.RLock()
for _, id := range r.knownUsers {
if m.MatchString(id) {
matched = append(matched, id)
}
}
r.RUnlock()

validatedMatched, err := users.ValidateOrgIDs(matched)
if err != nil {
return nil, err
}

if r.matchedCache != nil {
r.matchedCache.Add(orgID, validatedMatched)
r.matchedCacheSize.Set(float64(r.matchedCache.Len()))
}

return r.validateAndReturnMatched(orgID, validatedMatched)
}

func (r *RegexResolver) validateAndReturnMatched(orgID string, matched []string) ([]string, error) {
if len(matched) == 0 {
if err := users.ValidTenantID(orgID); err == nil {
// when querying for a newly created orgID, the query may not
Expand All @@ -165,6 +219,7 @@ func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) {
return []string{"fake"}, nil
}

// Enforce the maximum number of tenants allowed in a federated query.
if r.maxTenant > 0 && len(matched) > r.maxTenant {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", fmt.Errorf(ErrTooManyTenants, r.maxTenant, len(matched)).Error())
}
Expand Down
Loading
Loading