Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [ENHANCEMENT] Alertmanager: Add support for Azure blob storage. #3634
* [ENHANCEMENT] Compactor: tenants marked for deletion will now be fully cleaned up after some delay since deletion of last block. Cleanup includes removal of remaining marker files (including tenant deletion mark file) and files under `debug/metas`. #3613
* [ENHANCEMENT] Compactor: retry compaction of a single tenant on failure instead of re-running compaction for all tenants. #3627
* [ENHANCEMENT] Querier: Implement result caching for tenant query federation. #3640
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603
* [BUGFIX] Ingester: do not close idle TSDBs while blocks shipping is in progress. #3630
Expand Down
4 changes: 0 additions & 4 deletions docs/guides/limitations.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,3 @@ The Cortex chunks storage doesn't support queries without a metric name, like `c
## Query series and labels

When running queries to the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints, query's time range is ignored and the data is always fetched from ingesters. There is experimental support to query the long-term store with the *blocks* storage engine when `-querier.query-store-for-labels-enabled` is set.

## Tenant federation

When tenant federation is enabled on a Cortex cluster, result caching is disabled for queries spanning more than a single tenant. Result caching is planned to be implemented before general availability of this feature.
4 changes: 1 addition & 3 deletions pkg/api/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader *purger.T
return
}

// len(tenantIDs) will always be > 0, as it otherwise errors
// TODO: Handle multiple tenants by creating reproducible aggregation of all individual cacheGenNumbers
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs[0])
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(tenantIDs)

w.Header().Set(queryrange.ResultsCacheGenNumberHeaderName, cacheGenNumber)
next.ServeHTTP(w, r)
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type StoreLimits interface {
}

type CacheGenNumLoader interface {
GetStoreCacheGenNumber(userID string) string
GetStoreCacheGenNumber(tenantIDs []string) string
}

// Store for chunks.
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro
return nil
}

ctx = c.injectCacheGen(ctx, userID)
ctx = c.injectCacheGen(ctx, []string{userID})

// first, find the schema with the highest start _before or at_ from
i := sort.Search(len(c.stores), func(i int) bool {
Expand Down Expand Up @@ -262,10 +262,10 @@ func (c compositeStore) forStores(ctx context.Context, userID string, from, thro
return nil
}

func (c compositeStore) injectCacheGen(ctx context.Context, userID string) context.Context {
func (c compositeStore) injectCacheGen(ctx context.Context, tenantIDs []string) context.Context {
if c.cacheGenNumLoader == nil {
return ctx
}

return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(userID))
return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(tenantIDs))
}
61 changes: 56 additions & 5 deletions pkg/chunk/purger/tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package purger
import (
"context"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -246,14 +247,64 @@ func (tl *TombstonesLoader) loadPendingTombstones(userID string) error {
}

// GetStoreCacheGenNumber returns store cache gen number for a user
func (tl *TombstonesLoader) GetStoreCacheGenNumber(userID string) string {
return tl.getCacheGenNumbers(userID).store

func (tl *TombstonesLoader) GetStoreCacheGenNumber(tenantIDs []string) string {
return tl.getCacheGenNumbersPerTenants(tenantIDs).store
}

// GetResultsCacheGenNumber returns results cache gen number for a user
func (tl *TombstonesLoader) GetResultsCacheGenNumber(userID string) string {
return tl.getCacheGenNumbers(userID).results
func (tl *TombstonesLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
return tl.getCacheGenNumbersPerTenants(tenantIDs).results
}

func (tl *TombstonesLoader) getCacheGenNumbersPerTenants(tenantIDs []string) *cacheGenNumbers {
var result cacheGenNumbers

if len(tenantIDs) == 0 {
return &result
}

// keep the maximum value that's currently in result
var maxResults, maxStore int

for pos, tenantID := range tenantIDs {
numbers := tl.getCacheGenNumbers(tenantID)

// handle first tenant in the list
if pos == 0 {
// short cut if there is only one tenant
if len(tenantIDs) == 1 {
return numbers
}

// set first tenant string whatever happens next
result.results = numbers.results
result.store = numbers.store
}

// set results number string if it's higher than the ones before
if numbers.results != "" {
results, err := strconv.Atoi(numbers.results)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by this... is this value guaranteed to be an incrementing number?

update: comment above says cacheGenNumbers holds store and results cache gen numbers for a user.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the gen numbers are actually timestamps, we simply set them to current timestamp whenever we need to change it.

if err != nil {
level.Error(util.Logger).Log("msg", "error parsing resultsCacheGenNumber", "tenant", tenantID, "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Across the code base, the tenant ID is logged with the field name user instead of tenant. Do you mind fixing this for consistency, please?

Same comment applies below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point see: #3680

} else if maxResults < results {
maxResults = results
result.results = numbers.results
}
}

// set store number string if it's higher than the ones before
if numbers.store != "" {
store, err := strconv.Atoi(numbers.store)
if err != nil {
level.Error(util.Logger).Log("msg", "error parsing storeCacheGenNumber", "tenant", tenantID, "err", err)
} else if maxStore < store {
maxStore = store
result.store = numbers.store
}
}
}

return &result
}

func (tl *TombstonesLoader) getCacheGenNumbers(userID string) *cacheGenNumbers {
Expand Down
74 changes: 73 additions & 1 deletion pkg/chunk/purger/tombstones_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -133,6 +134,70 @@ func TestTombstonesLoader(t *testing.T) {
}
}

func TestTombstonesLoader_GetCacheGenNumber(t *testing.T) {
s := &store{
numbers: map[string]*cacheGenNumbers{
"tenant-a": {
results: "1000",
store: "2050",
},
"tenant-b": {
results: "1050",
store: "2000",
},
"tenant-c": {
results: "",
store: "",
},
"tenant-d": {
results: "results-c",
store: "store-c",
},
},
}
tombstonesLoader := NewTombstonesLoader(s, nil)

for _, tc := range []struct {
name string
expectedResultsCacheGenNumber string
expectedStoreCacheGenNumber string
tenantIDs []string
}{
{
name: "single tenant with numeric values",
tenantIDs: []string{"tenant-a"},
expectedResultsCacheGenNumber: "1000",
expectedStoreCacheGenNumber: "2050",
},
{
name: "single tenant with non-numeric values",
tenantIDs: []string{"tenant-d"},
expectedResultsCacheGenNumber: "results-c",
expectedStoreCacheGenNumber: "store-c",
},
{
name: "multiple tenants with numeric values",
tenantIDs: []string{"tenant-a", "tenant-b"},
expectedResultsCacheGenNumber: "1050",
expectedStoreCacheGenNumber: "2050",
},
{
name: "multiple tenants with numeric and non-numeric values",
tenantIDs: []string{"tenant-d", "tenant-c", "tenant-b", "tenant-a"},
expectedResultsCacheGenNumber: "1050",
expectedStoreCacheGenNumber: "2050",
},
{
name: "no tenants", // not really an expected call, edge case check to avoid any panics
},
} {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedResultsCacheGenNumber, tombstonesLoader.GetResultsCacheGenNumber(tc.tenantIDs))
assert.Equal(t, tc.expectedStoreCacheGenNumber, tombstonesLoader.GetStoreCacheGenNumber(tc.tenantIDs))
})
}
}

func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) {
s := &store{}
tombstonesLoader := NewTombstonesLoader(s, nil)
Expand All @@ -146,10 +211,17 @@ func TestTombstonesReloadDoesntDeadlockOnFailure(t *testing.T) {
}

type store struct {
err error
numbers map[string]*cacheGenNumbers
err error
}

func (f *store) getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error) {
if f.numbers != nil {
number, ok := f.numbers[user]
if ok {
return number, nil
}
}
return &cacheGenNumbers{}, f.err
}

Expand Down
15 changes: 5 additions & 10 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
Expand All @@ -36,7 +37,7 @@ var (
)

type CacheGenNumberLoader interface {
GetResultsCacheGenNumber(userID string) string
GetResultsCacheGenNumber(tenantIDs []string) string
}

// ResultsCacheConfig is the config for the results cache.
Expand Down Expand Up @@ -184,27 +185,21 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

// do not cache multi tenant queries
if len(tenantIDs) != 1 {
return s.next.Do(ctx, r)
}
userID := tenantIDs[0]

if s.shouldCache != nil && !s.shouldCache(r) {
return s.next.Do(ctx, r)
}

if s.cacheGenNumberLoader != nil {
ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(userID))
ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs))
}

var (
key = s.splitter.GenerateCacheKey(userID, r)
key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r)
extents []Extent
response Response
)

maxCacheFreshness := s.limits.MaxCacheFreshness(userID)
maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, s.limits.MaxCacheFreshness)
maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
if r.GetStart() > maxCacheTime {
return s.next.Do(ctx, r)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,6 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader {
return mockCacheGenNumberLoader{}
}

func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(userID string) string {
func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
return ""
}
13 changes: 13 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,16 @@ func SmallestPositiveNonZeroDurationPerTenant(tenantIDs []string, f func(string)
}
return *result
}

// MaxDurationPerTenant is returning the maximum duration per tenant. Without
// tenants given it will return a time.Duration(0).
func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time.Duration {
result := time.Duration(0)
for _, tenantID := range tenantIDs {
v := f(tenantID)
if v > result {
result = v
}
}
return result
}