Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [FEATURE] Experimental: Added support for `/api/v1/metadata` Prometheus-based endpoint. #2549
* [FEATURE] Add ability to limit concurrent queries to Cassandra with `-cassandra.query-concurrency` flag. #2562
* [FEATURE] TLS config options added for GRPC clients in Querier (Query-frontend client & Ingester client), Ruler, Store Gateway, as well as HTTP client in Config store client. #2502
* [FEATURE] Add ability to configure per-tenant MaxCacheFreshness in the query-frontend using `frontend.per-user-max-cache-freshness`. #2609
* [ENHANCEMENT] `query-tee` supports `/metadata`, `/alerts`, and `/rules` #2600
* [ENHANCEMENT] Ruler: Automatically remove unhealthy rulers from the ring. #2587
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via existing `cortex_discarded_samples_total` metric. #2370
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2366,6 +2366,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]

# Most recent allowed cacheable result per-tenant, to prevent caching very
# recent results that might still be in flux.
# CLI flag: -frontend.per-user-max-cache-freshness
[max_cache_freshness: <duration> | default = 0s]

# File name of per-user overrides. [deprecated, use -runtime-config.file
# instead]
# CLI flag: -limits.per-user-override-config
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// Limits is the per-tenant limits consulted by the query-frontend
// query-range middlewares. Implementations resolve each limit for the
// given user (tenant) ID.
type Limits interface {
	// MaxQueryLength returns the max allowed length (in time) of a query
	// for the tenant.
	MaxQueryLength(string) time.Duration
	// MaxQueryParallelism returns the max number of sub-queries the
	// frontend may schedule in parallel for the tenant.
	MaxQueryParallelism(string) int
	// MaxCacheFreshness returns the most recent period whose results must
	// not be cached for the tenant, to avoid caching results still in flux.
	MaxCacheFreshness(string) time.Duration
}

type limits struct {
Expand Down
13 changes: 9 additions & 4 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
response Response
)

maxCacheTime := int64(model.Now().Add(-s.cfg.MaxCacheFreshness))
// check if per-tenant cache freshness value is provided
maxCacheFreshness := s.limits.MaxCacheFreshness(userID)
if maxCacheFreshness == time.Duration(0) {
maxCacheFreshness = s.cfg.MaxCacheFreshness
}
maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
if r.GetStart() > maxCacheTime {
return s.next.Do(ctx, r)
}
Expand All @@ -184,7 +189,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}

if err == nil && len(extents) > 0 {
extents, err := s.filterRecentExtents(r, extents)
extents, err := s.filterRecentExtents(r, maxCacheFreshness, extents)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -417,8 +422,8 @@ func partition(req Request, extents []Extent, extractor Extractor) ([]Request, [
return requests, cachedResponses, nil
}

func (s resultsCache) filterRecentExtents(req Request, extents []Extent) ([]Extent, error) {
maxCacheTime := (int64(model.Now().Add(-s.cfg.MaxCacheFreshness)) / req.GetStep()) * req.GetStep()
func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Duration, extents []Extent) ([]Extent, error) {
maxCacheTime := (int64(model.Now().Add(-maxCacheFreshness)) / req.GetStep()) * req.GetStep()
for i := range extents {
// Never cache data for the latest freshness period.
if extents[i].End > maxCacheTime {
Expand Down
72 changes: 72 additions & 0 deletions pkg/querier/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ func (fakeLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

// MaxCacheFreshness returns a zero duration, i.e. no per-tenant freshness
// limit is configured for any tenant.
func (fakeLimits) MaxCacheFreshness(string) time.Duration {
	return 0
}

// fakeLimitsWithMaxCacheFreshness behaves like fakeLimits but reports a
// non-zero per-tenant MaxCacheFreshness, overriding the embedded default.
type fakeLimitsWithMaxCacheFreshness struct {
	fakeLimits
}

// MaxCacheFreshness returns a fixed ten-minute freshness period for every
// tenant.
func (fakeLimitsWithMaxCacheFreshness) MaxCacheFreshness(string) time.Duration {
	return time.Minute * 10
}

func TestResultsCache(t *testing.T) {
calls := 0
cfg := ResultsCacheConfig{
Expand Down Expand Up @@ -423,6 +435,66 @@ func TestResultsCacheRecent(t *testing.T) {
require.Equal(t, parsedResponse, resp)
}

// TestResultsCacheMaxFreshness verifies that a non-zero per-tenant
// MaxCacheFreshness limit takes precedence over the global config value:
// when the per-tenant limit covers the request's time range the cache is
// bypassed and the downstream handler serves the request; otherwise the
// pre-populated cache entry is returned.
func TestResultsCacheMaxFreshness(t *testing.T) {
	modelNow := model.Now()
	for i, tc := range []struct {
		limits           Limits
		handler          HandlerFunc
		expectedResponse *PrometheusResponse
	}{
		{
			// Zero per-tenant limit with a small (5s) global one: the request,
			// which ends 10s in the past, is cacheable, so the response must
			// come from the cache and the (nil) downstream handler is never hit.
			limits:           fakeLimits{},
			handler:          nil,
			expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10),
		},
		{
			// 10m per-tenant limit: the request is too recent to be served from
			// the cache, so the downstream handler must serve it.
			limits: fakeLimitsWithMaxCacheFreshness{},
			handler: HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
				return parsedResponse, nil
			}),
			expectedResponse: parsedResponse,
		},
	} {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			var cfg ResultsCacheConfig
			flagext.DefaultValues(&cfg)
			cfg.CacheConfig.Cache = cache.NewMockCache()

			// Set a small "global" MaxCacheFreshness value.
			cfg.MaxCacheFreshness = 5 * time.Second

			rcm, _, err := NewResultsCacheMiddleware(
				log.NewNopLogger(),
				cfg,
				constSplitter(day),
				tc.limits,
				PrometheusCodec,
				PrometheusResponseExtractor{},
				nil,
			)
			require.NoError(t, err)

			// Wrap the per-case downstream handler with the cache middleware.
			rc := rcm.Wrap(tc.handler)
			ctx := user.InjectOrgID(context.Background(), "1")

			// Create a request whose start/end fall within the cached extent.
			req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3))

			// Pre-populate the cache with an extent covering the request.
			key := constSplitter(day).GenerateCacheKey("1", req)
			rc.(*resultsCache).put(ctx, key, []Extent{mkExtent(int64(modelNow)-(60*1e3), int64(modelNow))})

			resp, err := rc.Do(ctx, req)
			require.NoError(t, err)
			require.Equal(t, tc.expectedResponse, resp)
		})
	}
}

func Test_resultsCache_MissingData(t *testing.T) {
cfg := ResultsCacheConfig{
CacheConfig: cache.Config{
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Limits struct {
MaxQueryLength time.Duration `yaml:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
MaxCacheFreshness time.Duration `yaml:"max_cache_freshness"`

// Config for overrides, convenient if it goes here. [Deprecated in favor of RuntimeConfig flag in cortex.Config]
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
Expand Down Expand Up @@ -103,6 +104,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.DurationVar(&l.MaxCacheFreshness, "frontend.per-user-max-cache-freshness", 0, "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]")
Expand Down Expand Up @@ -282,6 +284,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.getOverridesForUser(userID).MaxQueryLength
}

// MaxCacheFreshness returns the period of most recent results that must not
// be cached for the given user, to prevent caching very recent results that
// might still be in flux.
func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
	return o.getOverridesForUser(userID).MaxCacheFreshness
}

// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {
Expand Down