Skip to content

Commit 29ac099

Browse files
committed
Implement result cache for tenant query federation
Signed-off-by: Christian Simon <[email protected]>
1 parent 4dbf635 commit 29ac099

File tree

9 files changed

+154
-28
lines changed

9 files changed

+154
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [ENHANCEMENT] Alertmanager: Add support for Azure blob storage. #3634
2525
* [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613
2626
* [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627
27+
* [ENHANCEMENT] Querier: Implement result caching for tenant query federation. #3640
2728
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
2829
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603
2930
* [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630

docs/guides/limitations.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,3 @@ The Cortex chunks storage doesn't support queries without a metric name, like `c
4343
## Query series and labels
4444

4545
When running queries to the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints, query's time range is ignored and the data is always fetched from ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.
46-
47-
## Tenant federation
48-
49-
When tenant federation is enabled on a Cortex cluster, result caching is disabled for queries spanning more than a single tenant. Result caching is planned to be implemented before general availability of this feature.

pkg/api/middlewares.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.T
2020
return
2121
}
2222

23-
// len(tenantIDs) will always be > 0, as it otherwise errors
24-
// TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers
25-
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0])
23+
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs)
2624

2725
w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber)
2826
next.ServeHTTP(w, r)

pkg/chunk/composite_store.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type StoreLimits interface {
1919
}
2020

2121
type CacheGenNumLoader interface {
22-
GetStoreCacheGenNumber(userID string) string
22+
GetStoreCacheGenNumber(tenantIDs []string) string
2323
}
2424

2525
// Store for chunks.
@@ -217,7 +217,7 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro
217217
return nil
218218
}
219219

220-
ctx = c.injectCacheGen(ctx, userID)
220+
ctx = c.injectCacheGen(ctx, []string{userID})
221221

222222
// first, find the schema with the highest start _before or at_ from
223223
i := sort.Search(len(c.stores), func(i int) bool {
@@ -262,10 +262,10 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro
262262
return nil
263263
}
264264

265-
func (c compositeStore) injectCacheGen(ctx context.Context, userID string) context.Context {
265+
func (c compositeStore) injectCacheGen(ctx context.Context, tenantIDs []string) context.Context {
266266
if c.cacheGenNumLoader == nil {
267267
return ctx
268268
}
269269

270-
return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(userID))
270+
return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(tenantIDs))
271271
}

pkg/chunk/purger/tombstones.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package purger
33
import (
44
"context"
55
"sort"
6+
"strconv"
67
"sync"
78
"time"
89

@@ -246,14 +247,64 @@ func (tl *TombstonesLoader) loadPendingTombstones(userID string) error {
246247
}
247248

248249
// GetStoreCacheGenNumber returns store cache gen number for a user
249-
func (tl *TombstonesLoader) GetStoreCacheGenNumber(userID string) string {
250-
return tl.getCacheGenNumbers(userID).store
251-
250+
func (tl *TombstonesLoader) GetStoreCacheGenNumber(tenantIDs []string) string {
251+
return tl.getCacheGenNumbersPerTenants(tenantIDs).store
252252
}
253253

254254
// GetResultsCacheGenNumber returns results cache gen number for a user
255-
func (tl *TombstonesLoader) GetResultsCacheGenNumber(userID string) string {
256-
return tl.getCacheGenNumbers(userID).results
255+
func (tl *TombstonesLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
256+
return tl.getCacheGenNumbersPerTenants(tenantIDs).results
257+
}
258+
259+
func (tl *TombstonesLoader) getCacheGenNumbersPerTenants(tenantIDs []string) *cacheGenNumbers {
260+
var result cacheGenNumbers
261+
262+
if len(tenantIDs) == 0 {
263+
return &result
264+
}
265+
266+
// keep the maximum value that's currently in result
267+
var maxResults, maxStore int
268+
269+
for pos, tenantID := range tenantIDs {
270+
numbers := tl.getCacheGenNumbers(tenantID)
271+
272+
// handle first tenant in the list
273+
if pos == 0 {
274+
// short cut if there is only one tenant
275+
if len(tenantIDs) == 1 {
276+
return numbers
277+
}
278+
279+
// set first tenant string whatever happens next
280+
result.results = numbers.results
281+
result.store = numbers.store
282+
}
283+
284+
// set results number string if it's higher than the ones before
285+
if numbers.results != "" {
286+
results, err := strconv.Atoi(numbers.results)
287+
if err != nil {
288+
level.Error(util.Logger).Log("msg", "error parsing resultsCacheGenNumber", "tenant", tenantID, "err", err)
289+
} else if maxResults < results {
290+
maxResults = results
291+
result.results = numbers.results
292+
}
293+
}
294+
295+
// set store number string if it's higher than the ones before
296+
if numbers.store != "" {
297+
store, err := strconv.Atoi(numbers.store)
298+
if err != nil {
299+
level.Error(util.Logger).Log("msg", "error parsing storeCacheGenNumber", "tenant", tenantID, "err", err)
300+
} else if maxStore < store {
301+
maxStore = store
302+
result.store = numbers.store
303+
}
304+
}
305+
}
306+
307+
return &result
257308
}
258309

259310
func (tl *TombstonesLoader) getCacheGenNumbers(userID string) *cacheGenNumbers {

pkg/chunk/purger/tombstones_test.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/prometheus/common/model"
1010
"github.com/prometheus/prometheus/promql/parser"
11+
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
1213
)
1314

@@ -133,6 +134,70 @@ func TestTombstonesLoader(t *testing.T) {
133134
}
134135
}
135136

137+
func TestTombstonesLoader_GetCacheGenNumber(t *testing.T) {
138+
s := &store{
139+
numbers: map[string]*cacheGenNumbers{
140+
"tenant-a": {
141+
results: "1000",
142+
store: "2050",
143+
},
144+
"tenant-b": {
145+
results: "1050",
146+
store: "2000",
147+
},
148+
"tenant-c": {
149+
results: "",
150+
store: "",
151+
},
152+
"tenant-d": {
153+
results: "results-c",
154+
store: "store-c",
155+
},
156+
},
157+
}
158+
tombstonesLoader := NewTombstonesLoader(s, nil)
159+
160+
for _, tc := range []struct {
161+
name string
162+
expectedResultsCacheGenNumber string
163+
expectedStoreCacheGenNumber string
164+
tenantIDs []string
165+
}{
166+
{
167+
name: "single tenant with numeric values",
168+
tenantIDs: []string{"tenant-a"},
169+
expectedResultsCacheGenNumber: "1000",
170+
expectedStoreCacheGenNumber: "2050",
171+
},
172+
{
173+
name: "single tenant with non-numeric values",
174+
tenantIDs: []string{"tenant-d"},
175+
expectedResultsCacheGenNumber: "results-c",
176+
expectedStoreCacheGenNumber: "store-c",
177+
},
178+
{
179+
name: "multiple tenants with numeric values",
180+
tenantIDs: []string{"tenant-a", "tenant-b"},
181+
expectedResultsCacheGenNumber: "1050",
182+
expectedStoreCacheGenNumber: "2050",
183+
},
184+
{
185+
name: "multiple tenants with numeric and non-numeric values",
186+
tenantIDs: []string{"tenant-d", "tenant-c", "tenant-b", "tenant-a"},
187+
expectedResultsCacheGenNumber: "1050",
188+
expectedStoreCacheGenNumber: "2050",
189+
},
190+
{
191+
name: "no tenants", // not really an expected call, edge case check to avoid any panics
192+
},
193+
} {
194+
t.Run(tc.name, func(t *testing.T) {
195+
assert.Equal(t, tc.expectedResultsCacheGenNumber, tombstonesLoader.GetResultsCacheGenNumber(tc.tenantIDs))
196+
assert.Equal(t, tc.expectedStoreCacheGenNumber, tombstonesLoader.GetStoreCacheGenNumber(tc.tenantIDs))
197+
})
198+
}
199+
}
200+
136201
func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) {
137202
s := &store{}
138203
tombstonesLoader := NewTombstonesLoader(s, nil)
@@ -146,10 +211,17 @@ func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) {
146211
}
147212

148213
type store struct {
149-
err error
214+
numbers map[string]*cacheGenNumbers
215+
err error
150216
}
151217

152218
func (f *store) getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error) {
219+
if f.numbers != nil {
220+
number, ok := f.numbers[user]
221+
if ok {
222+
return number, nil
223+
}
224+
}
153225
return &cacheGenNumbers{}, f.err
154226
}
155227

pkg/querier/queryrange/results_cache.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexproject/cortex/pkg/tenant"
2626
"github.com/cortexproject/cortex/pkg/util/flagext"
2727
"github.com/cortexproject/cortex/pkg/util/spanlogger"
28+
"github.com/cortexproject/cortex/pkg/util/validation"
2829
)
2930

3031
var (
@@ -36,7 +37,7 @@ var (
3637
)
3738

3839
type CacheGenNumberLoader interface {
39-
GetResultsCacheGenNumber(userID string) string
40+
GetResultsCacheGenNumber(tenantIDs []string) string
4041
}
4142

4243
// ResultsCacheConfig is the config for the results cache.
@@ -184,27 +185,21 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
184185
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
185186
}
186187

187-
// do not cache multi tenant queries
188-
if len(tenantIDs) != 1 {
189-
return s.next.Do(ctx, r)
190-
}
191-
userID := tenantIDs[0]
192-
193188
if s.shouldCache != nil && !s.shouldCache(r) {
194189
return s.next.Do(ctx, r)
195190
}
196191

197192
if s.cacheGenNumberLoader != nil {
198-
ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(userID))
193+
ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs))
199194
}
200195

201196
var (
202-
key = s.splitter.GenerateCacheKey(userID, r)
197+
key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r)
203198
extents []Extent
204199
response Response
205200
)
206201

207-
maxCacheFreshness := s.limits.MaxCacheFreshness(userID)
202+
maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, s.limits.MaxCacheFreshness)
208203
maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
209204
if r.GetStart() > maxCacheTime {
210205
return s.next.Do(ctx, r)

pkg/querier/queryrange/results_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,6 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader {
641641
return mockCacheGenNumberLoader{}
642642
}
643643

644-
func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(userID string) string {
644+
func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
645645
return ""
646646
}

pkg/util/validation/limits.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,3 +474,16 @@ func SmallestPositiveNonZeroDurationPerTenant(tenantIDs []string, f func(string)
474474
}
475475
return *result
476476
}
477+
478+
// MaxDurationPerTenant is returning the maximum duration per tenant. Without
479+
// tenants given it will return a time.Duration(0).
480+
func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time.Duration {
481+
result := time.Duration(0)
482+
for _, tenantID := range tenantIDs {
483+
v := f(tenantID)
484+
if v > result {
485+
result = v
486+
}
487+
}
488+
return result
489+
}

0 commit comments

Comments
 (0)