From 29ac0993980da3baf308ade8ac98425298db96e0 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 31 Dec 2020 14:02:06 +0000 Subject: [PATCH] Implement result cache for tenant query federation Signed-off-by: Christian Simon --- CHANGELOG.md | 1 + docs/guides/limitations.md | 4 -- pkg/api/middlewares.go | 4 +- pkg/chunk/composite_store.go | 8 +-- pkg/chunk/purger/tombstones.go | 61 ++++++++++++++-- pkg/chunk/purger/tombstones_test.go | 74 +++++++++++++++++++- pkg/querier/queryrange/results_cache.go | 15 ++-- pkg/querier/queryrange/results_cache_test.go | 2 +- pkg/util/validation/limits.go | 13 ++++ 9 files changed, 154 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e72e84619c..250b8eae454 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * [ENHANCEMENT] Alertmanager: Add support for Azure blob storage. #3634 * [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613 * [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627 +* [ENHANCEMENT] Querier: Implement result caching for tenant query federation. #3640 * [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598 * [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603 * [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630 diff --git a/docs/guides/limitations.md b/docs/guides/limitations.md index 7b6a3cf0436..cd2caefabcd 100644 --- a/docs/guides/limitations.md +++ b/docs/guides/limitations.md @@ -43,7 +43,3 @@ The Cortex chunks storage doesn't support queries without a metric name, like `c ## Query series and labels When running queries to the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints, query's time range is ignored and the data is always fetched from ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set. - -## Tenant federation - -When tenant federation is enabled on a Cortex cluster, result caching is disabled for queries spanning more than a single tenant. Result caching is planned to be implemented before general availability of this feature. diff --git a/pkg/api/middlewares.go b/pkg/api/middlewares.go index 3d990edddae..7e0e88e8030 100644 --- a/pkg/api/middlewares.go +++ b/pkg/api/middlewares.go @@ -20,9 +20,7 @@ func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.T return } - // len(tenantIDs) will always be > 0, as it otherwise errors - // TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers - cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0]) + cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs) w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber) next.ServeHTTP(w, r) diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index a3c5a22b20e..d3c79013bbf 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -19,7 +19,7 @@ type StoreLimits interface { } type CacheGenNumLoader interface { - GetStoreCacheGenNumber(userID string) string + GetStoreCacheGenNumber(tenantIDs []string) string } // Store for chunks. @@ -217,7 +217,7 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro return nil } - ctx = c.injectCacheGen(ctx, userID) + ctx = c.injectCacheGen(ctx, []string{userID}) // first, find the schema with the highest start _before or at_ from i := sort.Search(len(c.stores), func(i int) bool { @@ -262,10 +262,10 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro return nil } -func (c compositeStore) injectCacheGen(ctx context.Context, userID string) context.Context { +func (c compositeStore) injectCacheGen(ctx context.Context, tenantIDs []string) context.Context { if c.cacheGenNumLoader == nil { return ctx } - return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(userID)) + return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(tenantIDs)) } diff --git a/pkg/chunk/purger/tombstones.go b/pkg/chunk/purger/tombstones.go index 1f1ad1b5bec..d04788d2f28 100644 --- a/pkg/chunk/purger/tombstones.go +++ b/pkg/chunk/purger/tombstones.go @@ -3,6 +3,7 @@ package purger import ( "context" "sort" + "strconv" "sync" "time" @@ -246,14 +247,64 @@ func (tl *TombstonesLoader) loadPendingTombstones(userID string) error { } // GetStoreCacheGenNumber returns store cache gen number for a user -func (tl *TombstonesLoader) GetStoreCacheGenNumber(userID string) string { - return tl.getCacheGenNumbers(userID).store - +func (tl *TombstonesLoader) GetStoreCacheGenNumber(tenantIDs []string) string { + return tl.getCacheGenNumbersPerTenants(tenantIDs).store } // GetResultsCacheGenNumber returns results cache gen number for a user -func (tl *TombstonesLoader) GetResultsCacheGenNumber(userID string) string { - return tl.getCacheGenNumbers(userID).results +func (tl *TombstonesLoader) GetResultsCacheGenNumber(tenantIDs []string) string { + return tl.getCacheGenNumbersPerTenants(tenantIDs).results +} + +func (tl *TombstonesLoader) getCacheGenNumbersPerTenants(tenantIDs []string) *cacheGenNumbers { + var result cacheGenNumbers + + if len(tenantIDs) == 0 { + return &result + } + + // keep the maximum value that's currently in result + var maxResults, maxStore int + + for pos, tenantID := range tenantIDs { + numbers := tl.getCacheGenNumbers(tenantID) + + // handle first tenant in the list + if pos == 0 { + // short cut if there is only one tenant + if len(tenantIDs) == 1 { + return numbers + } + + // set first tenant string whatever happens next + result.results = numbers.results + result.store = numbers.store + } + + // set results number string if it's higher than the ones before + if numbers.results != "" { + results, err := strconv.Atoi(numbers.results) + if err != nil { + level.Error(util.Logger).Log("msg", "error parsing resultsCacheGenNumber", "tenant", tenantID, "err", err) + } else if maxResults < results { + maxResults = results + result.results = numbers.results + } + } + + // set store number string if it's higher than the ones before + if numbers.store != "" { + store, err := strconv.Atoi(numbers.store) + if err != nil { + level.Error(util.Logger).Log("msg", "error parsing storeCacheGenNumber", "tenant", tenantID, "err", err) + } else if maxStore < store { + maxStore = store + result.store = numbers.store + } + } + } + + return &result } func (tl *TombstonesLoader) getCacheGenNumbers(userID string) *cacheGenNumbers { diff --git a/pkg/chunk/purger/tombstones_test.go b/pkg/chunk/purger/tombstones_test.go index 5f17d75dafd..e04d6ef02fc 100644 --- a/pkg/chunk/purger/tombstones_test.go +++ b/pkg/chunk/purger/tombstones_test.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -133,6 +134,70 @@ func TestTombstonesLoader(t *testing.T) { } } +func TestTombstonesLoader_GetCacheGenNumber(t *testing.T) { + s := &store{ + numbers: map[string]*cacheGenNumbers{ + "tenant-a": { + results: "1000", + store: "2050", + }, + "tenant-b": { + results: "1050", + store: "2000", + }, + "tenant-c": { + results: "", + store: "", + }, + "tenant-d": { + results: "results-c", + store: "store-c", + }, + }, + } + tombstonesLoader := NewTombstonesLoader(s, nil) + + for _, tc := range []struct { + name string + expectedResultsCacheGenNumber string + expectedStoreCacheGenNumber string + tenantIDs []string + }{ + { + name: "single tenant with numeric values", + tenantIDs: []string{"tenant-a"}, + expectedResultsCacheGenNumber: "1000", + expectedStoreCacheGenNumber: "2050", + }, + { + name: "single tenant with non-numeric values", + tenantIDs: []string{"tenant-d"}, + expectedResultsCacheGenNumber: "results-c", + expectedStoreCacheGenNumber: "store-c", + }, + { + name: "multiple tenants with numeric values", + tenantIDs: []string{"tenant-a", "tenant-b"}, + expectedResultsCacheGenNumber: "1050", + expectedStoreCacheGenNumber: "2050", + }, + { + name: "multiple tenants with numeric and non-numeric values", + tenantIDs: []string{"tenant-d", "tenant-c", "tenant-b", "tenant-a"}, + expectedResultsCacheGenNumber: "1050", + expectedStoreCacheGenNumber: "2050", + }, + { + name: "no tenants", // not really an expected call, edge case check to avoid any panics + }, + } { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedResultsCacheGenNumber, tombstonesLoader.GetResultsCacheGenNumber(tc.tenantIDs)) + assert.Equal(t, tc.expectedStoreCacheGenNumber, tombstonesLoader.GetStoreCacheGenNumber(tc.tenantIDs)) + }) + } +} + func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) { s := &store{} tombstonesLoader := NewTombstonesLoader(s, nil) @@ -146,10 +211,17 @@ func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) { } type store struct { - err error + numbers map[string]*cacheGenNumbers + err error } func (f *store) getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error) { + if f.numbers != nil { + number, ok := f.numbers[user] + if ok { + return number, nil + } + } return &cacheGenNumbers{}, f.err } diff --git a/pkg/querier/queryrange/results_cache.go b/pkg/querier/queryrange/results_cache.go index 8b0278b5dc9..5138b87f9cd 100644 --- a/pkg/querier/queryrange/results_cache.go +++ b/pkg/querier/queryrange/results_cache.go @@ -25,6 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/validation" ) var ( @@ -36,7 +37,7 @@ var ( ) type CacheGenNumberLoader interface { - GetResultsCacheGenNumber(userID string) string + GetResultsCacheGenNumber(tenantIDs []string) string } // ResultsCacheConfig is the config for the results cache. @@ -184,27 +185,21 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - // do not cache multi tenant queries - if len(tenantIDs) != 1 { - return s.next.Do(ctx, r) - } - userID := tenantIDs[0] - if s.shouldCache != nil && !s.shouldCache(r) { return s.next.Do(ctx, r) } if s.cacheGenNumberLoader != nil { - ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(userID)) + ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs)) } var ( - key = s.splitter.GenerateCacheKey(userID, r) + key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r) extents []Extent response Response ) - maxCacheFreshness := s.limits.MaxCacheFreshness(userID) + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, s.limits.MaxCacheFreshness) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) if r.GetStart() > maxCacheTime { return s.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/results_cache_test.go b/pkg/querier/queryrange/results_cache_test.go index 1688b96bc24..5ca1c2e1146 100644 --- a/pkg/querier/queryrange/results_cache_test.go +++ b/pkg/querier/queryrange/results_cache_test.go @@ -641,6 +641,6 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader { return mockCacheGenNumberLoader{} } -func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(userID string) string { +func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { return "" } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 4872ef6545c..4451692d007 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -474,3 +474,16 @@ func SmallestPositiveNonZeroDurationPerTenant(tenantIDs []string, f func(string) } return *result } + +// MaxDurationPerTenant is returning the maximum duration per tenant. Without +// tenants given it will return a time.Duration(0). +func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time.Duration { + result := time.Duration(0) + for _, tenantID := range tenantIDs { + v := f(tenantID) + if v > result { + result = v + } + } + return result +}