From b4a0da782bc726dbf5f458e46fb42101388ec44a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mazur?= Date: Mon, 14 Oct 2024 15:10:50 +0200 Subject: [PATCH 1/3] query-frontend: Fix cache keys for dynamic split intervals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Mazur --- pkg/queryfrontend/cache.go | 20 ++- pkg/queryfrontend/cache_test.go | 69 ++++---- pkg/queryfrontend/request.go | 36 +++++ pkg/queryfrontend/roundtrip.go | 5 +- pkg/queryfrontend/split_by_interval.go | 16 +- pkg/queryfrontend/split_by_interval_test.go | 170 ++++++++++++-------- 6 files changed, 203 insertions(+), 113 deletions(-) diff --git a/pkg/queryfrontend/cache.go b/pkg/queryfrontend/cache.go index 156d26b5c37..8b15cb89881 100644 --- a/pkg/queryfrontend/cache.go +++ b/pkg/queryfrontend/cache.go @@ -12,13 +12,11 @@ import ( // thanosCacheKeyGenerator is a utility for using split interval when determining cache keys. type thanosCacheKeyGenerator struct { - interval queryrange.IntervalFn resolutions []int64 } -func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKeyGenerator { +func newThanosCacheKeyGenerator() thanosCacheKeyGenerator { return thanosCacheKeyGenerator{ - interval: intervalFn, resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0}, } } @@ -26,20 +24,26 @@ func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKey // GenerateCacheKey generates a cache key based on the Request and interval. // TODO(yeya24): Add other request params as request key. func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string { - currentInterval := r.GetStart() / t.interval(r).Milliseconds() switch tr := r.(type) { case *ThanosQueryRangeRequest: i := 0 for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ { } shardInfoKey := generateShardInfoKey(tr) - return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine) + splitInterval := tr.SplitInterval.Milliseconds() + currentInterval := r.GetStart() / splitInterval + return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, splitInterval, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine) case *ThanosLabelsRequest: - return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval) + splitInterval := tr.SplitInterval.Milliseconds() + currentInterval := r.GetStart() / splitInterval + return fmt.Sprintf("fe:%s:%s:%s:%d:%d", userID, tr.Label, tr.Matchers, splitInterval, currentInterval) case *ThanosSeriesRequest: - return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval) + splitInterval := tr.SplitInterval.Milliseconds() + currentInterval := r.GetStart() / splitInterval + return fmt.Sprintf("fe:%s:%s:%d:%d", userID, tr.Matchers, splitInterval, currentInterval) } - return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval) + + return "request_type_not_supported" } func generateShardInfoKey(r *ThanosQueryRangeRequest) string { diff --git a/pkg/queryfrontend/cache_test.go b/pkg/queryfrontend/cache_test.go index 9cd14ff2b81..a257cdb730d 100644 --- a/pkg/queryfrontend/cache_test.go +++ b/pkg/queryfrontend/cache_test.go @@ -15,8 +15,7 @@ import ( ) func TestGenerateCacheKey(t *testing.T) { - intervalFn := func(r queryrange.Request) time.Duration { return hour } - splitter := newThanosCacheKeyGenerator(intervalFn) + splitter := newThanosCacheKeyGenerator() for _, tc := range []struct { name string @@ -30,25 +29,27 @@ func TestGenerateCacheKey(t *testing.T) { Start: 0, Step: 60 * seconds, }, - expected: "fe::up:60000:0", + expected: "request_type_not_supported", }, { name: "non downsampling resolution specified", req: &ThanosQueryRangeRequest{ - Query: "up", - Start: 0, - Step: 60 * seconds, + Query: "up", + Start: 0, + Step: 60 * seconds, + SplitInterval: time.Hour, }, - expected: "fe::up:60000:0:2:-:0:", + expected: "fe::up:60000:3600000:0:2:-:0:", }, { name: "10s step", req: &ThanosQueryRangeRequest{ - Query: "up", - Start: 0, - Step: 10 * seconds, + Query: "up", + Start: 0, + Step: 10 * seconds, + SplitInterval: time.Hour, }, - expected: "fe::up:10000:0:2:-:0:", + expected: "fe::up:10000:3600000:0:2:-:0:", }, { name: "1m downsampling resolution", @@ -57,8 +58,9 @@ func TestGenerateCacheKey(t *testing.T) { Start: 0, Step: 10 * seconds, MaxSourceResolution: 60 * seconds, + SplitInterval: time.Hour, }, - expected: "fe::up:10000:0:2:-:0:", + expected: "fe::up:10000:3600000:0:2:-:0:", }, { name: "5m downsampling resolution, different cache key", @@ -67,8 +69,9 @@ func TestGenerateCacheKey(t *testing.T) { Start: 0, Step: 10 * seconds, MaxSourceResolution: 300 * seconds, + SplitInterval: time.Hour, }, - expected: "fe::up:10000:0:1:-:0:", + expected: "fe::up:10000:3600000:0:1:-:0:", }, { name: "1h downsampling resolution, different cache key", @@ -77,8 +80,9 @@ func TestGenerateCacheKey(t *testing.T) { Start: 0, Step: 10 * seconds, MaxSourceResolution: hour, + SplitInterval: time.Hour, }, - expected: "fe::up:10000:0:0:-:0:", + expected: "fe::up:10000:3600000:0:0:-:0:", }, { name: "1h downsampling resolution with lookback delta", @@ -88,23 +92,26 @@ func TestGenerateCacheKey(t *testing.T) { Step: 10 * seconds, MaxSourceResolution: hour, LookbackDelta: 1000, + SplitInterval: time.Hour, }, - expected: "fe::up:10000:0:0:-:1000:", + expected: "fe::up:10000:3600000:0:0:-:1000:", }, { name: "label names, no matcher", req: &ThanosLabelsRequest{ - Start: 0, + Start: 0, + SplitInterval: time.Hour, }, - expected: "fe:::[]:0", + expected: "fe:::[]:3600000:0", }, { name: "label names, single matcher", req: &ThanosLabelsRequest{ - Start: 0, - Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + Start: 0, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + SplitInterval: time.Hour, }, - expected: `fe:::[[foo="bar"]]:0`, + expected: `fe:::[[foo="bar"]]:3600000:0`, }, { name: "label names, multiple matchers", @@ -114,25 +121,28 @@ func TestGenerateCacheKey(t *testing.T) { {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, {labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")}, }, + SplitInterval: time.Hour, }, - expected: `fe:::[[foo="bar"] [baz="qux"]]:0`, + expected: `fe:::[[foo="bar"] [baz="qux"]]:3600000:0`, }, { name: "label values, no matcher", req: &ThanosLabelsRequest{ - Start: 0, - Label: "up", + Start: 0, + Label: "up", + SplitInterval: time.Hour, }, - expected: "fe::up:[]:0", + expected: "fe::up:[]:3600000:0", }, { name: "label values, single matcher", req: &ThanosLabelsRequest{ - Start: 0, - Label: "up", - Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + Start: 0, + Label: "up", + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + SplitInterval: time.Hour, }, - expected: `fe::up:[[foo="bar"]]:0`, + expected: `fe::up:[[foo="bar"]]:3600000:0`, }, { name: "label values, multiple matchers", @@ -143,8 +153,9 @@ func TestGenerateCacheKey(t *testing.T) { {labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}, {labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")}, }, + SplitInterval: time.Hour, }, - expected: `fe::up:[[foo="bar"] [baz="qux"]]:0`, + expected: `fe::up:[[foo="bar"] [baz="qux"]]:3600000:0`, }, } { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/queryfrontend/request.go b/pkg/queryfrontend/request.go index 4a6fb3f0d87..69c98682d38 100644 --- a/pkg/queryfrontend/request.go +++ b/pkg/queryfrontend/request.go @@ -27,6 +27,12 @@ type ShardedRequest interface { WithShardInfo(info *storepb.ShardInfo) queryrange.Request } +// SplitRequest interface represents a query request that can be split horizontally. +type SplitRequest interface { + GetSplitInterval() time.Duration + WithSplitInterval(interval time.Duration) queryrange.Request +} + type RequestHeader struct { Name string Values []string @@ -57,6 +63,7 @@ type ThanosQueryRangeRequest struct { LookbackDelta int64 Analyze bool Engine string + SplitInterval time.Duration } // IsDedupEnabled returns true if deduplication is enabled. @@ -83,6 +90,8 @@ func (r *ThanosQueryRangeRequest) GetCachingOptions() queryrange.CachingOptions func (r *ThanosQueryRangeRequest) GetStats() string { return r.Stats } +func (r *ThanosQueryRangeRequest) GetSplitInterval() time.Duration { return r.SplitInterval } + func (r *ThanosQueryRangeRequest) WithStats(stats string) queryrange.Request { q := *r q.Stats = stats @@ -111,6 +120,13 @@ func (r *ThanosQueryRangeRequest) WithShardInfo(info *storepb.ShardInfo) queryra return &q } +// WithSplitInterval clones the current request with a different split interval. +func (r *ThanosQueryRangeRequest) WithSplitInterval(interval time.Duration) queryrange.Request { + q := *r + q.SplitInterval = interval + return &q +} + // LogToSpan writes information about this request to an OpenTracing span. func (r *ThanosQueryRangeRequest) LogToSpan(sp opentracing.Span) { fields := []otlog.Field{ @@ -246,6 +262,7 @@ type ThanosLabelsRequest struct { CachingOptions queryrange.CachingOptions Headers []*RequestHeader Stats string + SplitInterval time.Duration } // GetStoreMatchers returns store matches. @@ -268,6 +285,8 @@ func (r *ThanosLabelsRequest) GetCachingOptions() queryrange.CachingOptions { re func (r *ThanosLabelsRequest) GetStats() string { return r.Stats } +func (r *ThanosLabelsRequest) GetSplitInterval() time.Duration { return r.SplitInterval } + func (r *ThanosLabelsRequest) WithStats(stats string) queryrange.Request { q := *r q.Stats = stats @@ -288,6 +307,13 @@ func (r *ThanosLabelsRequest) WithQuery(_ string) queryrange.Request { return &q } +// WithSplitInterval clones the current request with a different split interval. +func (r *ThanosLabelsRequest) WithSplitInterval(interval time.Duration) queryrange.Request { + q := *r + q.SplitInterval = interval + return &q +} + // LogToSpan writes information about this request to an OpenTracing span. func (r *ThanosLabelsRequest) LogToSpan(sp opentracing.Span) { fields := []otlog.Field{ @@ -328,6 +354,7 @@ type ThanosSeriesRequest struct { CachingOptions queryrange.CachingOptions Headers []*RequestHeader Stats string + SplitInterval time.Duration } // IsDedupEnabled returns true if deduplication is enabled. @@ -353,6 +380,8 @@ func (r *ThanosSeriesRequest) GetCachingOptions() queryrange.CachingOptions { re func (r *ThanosSeriesRequest) GetStats() string { return r.Stats } +func (r *ThanosSeriesRequest) GetSplitInterval() time.Duration { return r.SplitInterval } + func (r *ThanosSeriesRequest) WithStats(stats string) queryrange.Request { q := *r q.Stats = stats @@ -373,6 +402,13 @@ func (r *ThanosSeriesRequest) WithQuery(_ string) queryrange.Request { return &q } +// WithSplitInterval clones the current request with a different split interval. +func (r *ThanosSeriesRequest) WithSplitInterval(interval time.Duration) queryrange.Request { + q := *r + q.SplitInterval = interval + return &q +} + // LogToSpan writes information about this request to an OpenTracing span. func (r *ThanosSeriesRequest) LogToSpan(sp opentracing.Span) { fields := []otlog.Field{ diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index f6c04403079..095799e68d9 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -216,7 +216,7 @@ func newQueryRangeTripperware( queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware( logger, *config.ResultsCacheConfig, - newThanosCacheKeyGenerator(dynamicIntervalFn(config)), + newThanosCacheKeyGenerator(), limits, codec, queryrange.PrometheusResponseExtractor{}, @@ -299,11 +299,10 @@ func newLabelsTripperware( } if config.ResultsCacheConfig != nil { - staticIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval } queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware( logger, *config.ResultsCacheConfig, - newThanosCacheKeyGenerator(staticIntervalFn), + newThanosCacheKeyGenerator(), limits, codec, ThanosResponseExtractor{}, diff --git a/pkg/queryfrontend/split_by_interval.go b/pkg/queryfrontend/split_by_interval.go index 9d79512303a..68d6127f56f 100644 --- a/pkg/queryfrontend/split_by_interval.go +++ b/pkg/queryfrontend/split_by_interval.go @@ -8,10 +8,12 @@ package queryfrontend import ( "context" + "net/http" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" "github.com/thanos-io/thanos/internal/cortex/querier/queryrange" ) @@ -71,7 +73,9 @@ func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryran func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Request, error) { var reqs []queryrange.Request - if _, ok := r.(*ThanosQueryRangeRequest); ok { + + switch tr := r.(type) { + case *ThanosQueryRangeRequest: // Replace @ modifier function to their respective constant values in the query. // This way subqueries will be evaluated at the same time as the parent query. query, err := queryrange.EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd()) @@ -79,7 +83,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Requ return nil, err } if start := r.GetStart(); start == r.GetEnd() { - reqs = append(reqs, r.WithStartEnd(start, start)) + reqs = append(reqs, tr.WithSplitInterval(interval).WithStartEnd(start, start)) } else { for ; start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() { end := nextIntervalBoundary(start, r.GetStep(), interval) @@ -87,10 +91,10 @@ func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Requ end = r.GetEnd() } - reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end)) + reqs = append(reqs, tr.WithSplitInterval(interval).WithQuery(query).WithStartEnd(start, end)) } } - } else { + case SplitRequest: dur := int64(interval / time.Millisecond) for start := r.GetStart(); start < r.GetEnd(); start = start + dur { end := start + dur @@ -98,8 +102,10 @@ func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Requ end = r.GetEnd() } - reqs = append(reqs, r.WithStartEnd(start, end)) + reqs = append(reqs, tr.WithSplitInterval(interval).WithStartEnd(start, end)) } + default: + return nil, httpgrpc.Errorf(http.StatusBadRequest, `{"status": "error", "error": "request type %T not supported"}`, r) } return reqs, nil diff --git a/pkg/queryfrontend/split_by_interval_test.go b/pkg/queryfrontend/split_by_interval_test.go index 560fd51496f..11469f94067 100644 --- a/pkg/queryfrontend/split_by_interval_test.go +++ b/pkg/queryfrontend/split_by_interval_test.go @@ -30,10 +30,11 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: 60 * 60 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 0, + End: 60 * 60 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, }, interval: day, @@ -47,10 +48,11 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 60 * 60 * seconds, - End: 60 * 60 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 60 * 60 * seconds, + End: 60 * 60 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, }, interval: day, @@ -64,10 +66,11 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: 60 * 60 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 0, + End: 60 * 60 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, }, interval: 3 * time.Hour, @@ -81,10 +84,11 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: 24 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 0, + End: 24 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, }, interval: day, @@ -98,10 +102,11 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: 3 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 0, + End: 3 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, }, interval: 3 * time.Hour, @@ -115,16 +120,18 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: (24 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo @ 0.000", + Start: 0, + End: (24 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo @ 0.000", + SplitInterval: day, }, &ThanosQueryRangeRequest{ - Start: 24 * 3600 * seconds, - End: 2 * 24 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo @ 0.000", + Start: 24 * 3600 * seconds, + End: 2 * 24 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo @ 0.000", + SplitInterval: day, }, }, interval: day, @@ -138,16 +145,18 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: (24 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo @ 172800.000", + Start: 0, + End: (24 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo @ 172800.000", + SplitInterval: day, }, &ThanosQueryRangeRequest{ - Start: 24 * 3600 * seconds, - End: 2 * 24 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo @ 172800.000", + Start: 24 * 3600 * seconds, + End: 2 * 24 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo @ 172800.000", + SplitInterval: day, }, }, interval: day, @@ -161,16 +170,18 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 0, - End: (3 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo", + Start: 0, + End: (3 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, &ThanosQueryRangeRequest{ - Start: 3 * 3600 * seconds, - End: 2 * 3 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 3 * 3600 * seconds, + End: 2 * 3 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, }, interval: 3 * time.Hour, @@ -184,22 +195,25 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 3 * 3600 * seconds, - End: (24 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo", + Start: 3 * 3600 * seconds, + End: (24 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, &ThanosQueryRangeRequest{ - Start: 24 * 3600 * seconds, - End: (2 * 24 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo", + Start: 24 * 3600 * seconds, + End: (2 * 24 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, &ThanosQueryRangeRequest{ - Start: 2 * 24 * 3600 * seconds, - End: 3 * 24 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 2 * 24 * 3600 * seconds, + End: 3 * 24 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: day, }, }, interval: day, @@ -213,22 +227,25 @@ func TestSplitQuery(t *testing.T) { }, expected: []queryrange.Request{ &ThanosQueryRangeRequest{ - Start: 2 * 3600 * seconds, - End: (3 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo", + Start: 2 * 3600 * seconds, + End: (3 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, &ThanosQueryRangeRequest{ - Start: 3 * 3600 * seconds, - End: (2 * 3 * 3600 * seconds) - (15 * seconds), - Step: 15 * seconds, - Query: "foo", + Start: 3 * 3600 * seconds, + End: (2 * 3 * 3600 * seconds) - (15 * seconds), + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, &ThanosQueryRangeRequest{ - Start: 2 * 3 * 3600 * seconds, - End: 3 * 3 * 3600 * seconds, - Step: 15 * seconds, - Query: "foo", + Start: 2 * 3 * 3600 * seconds, + End: 3 * 3 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + SplitInterval: 3 * time.Hour, }, }, interval: 3 * time.Hour, @@ -258,3 +275,20 @@ func TestSplitQuery_PromQLErrorReturnsJson(t *testing.T) { require.True(t, json.Valid(resp.Body), "error message is not valid JSON: %s", resp.Body) } + +func TestSplitQuery_PrometheusRequest(t *testing.T) { + input := &queryrange.PrometheusRequest{ + Start: 2 * 3600 * seconds, + End: 3 * 3 * 3600 * seconds, + Step: 15 * seconds, + Query: "foo", + } + queries, err := splitQuery(input, 1*time.Hour) + require.Error(t, err) + require.Nil(t, queries) + + resp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "could not assemble httpgrpc.HTTPResponse, is not status.Status") + + require.True(t, json.Valid(resp.Body), "error message is not valid JSON: %s", resp.Body) +} From 8f9d0d4c1ab49272a01e4cac290013d34ea16ec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mazur?= Date: Tue, 15 Oct 2024 12:01:43 +0200 Subject: [PATCH 2/3] query-frontend: Fix dynamic split interval e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Mazur --- CHANGELOG.md | 1 + test/e2e/query_frontend_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index afd59dc4406..ab2963fcf5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796. - [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints. - [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging. +- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals. ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 772241d34b5..f829d15474d 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -764,7 +764,7 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { // make sure that we don't break cortex cache code. testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(0), "cortex_cache_hits_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_added_new_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_added_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_misses_total")) From d4725c3b70e98cb2218b8582414b1d44b633247a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mazur?= Date: Wed, 16 Oct 2024 14:54:07 +0200 Subject: [PATCH 3/3] query-frontend: Refactor cache key generation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Mazur --- pkg/queryfrontend/cache.go | 34 ++++++++++++++++----------------- pkg/queryfrontend/cache_test.go | 29 +++++++++++++++++++--------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pkg/queryfrontend/cache.go b/pkg/queryfrontend/cache.go index 8b15cb89881..2629bc4e5fc 100644 --- a/pkg/queryfrontend/cache.go +++ b/pkg/queryfrontend/cache.go @@ -24,26 +24,26 @@ func newThanosCacheKeyGenerator() thanosCacheKeyGenerator { // GenerateCacheKey generates a cache key based on the Request and interval. // TODO(yeya24): Add other request params as request key. func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string { - switch tr := r.(type) { - case *ThanosQueryRangeRequest: - i := 0 - for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ { - } - shardInfoKey := generateShardInfoKey(tr) - splitInterval := tr.SplitInterval.Milliseconds() - currentInterval := r.GetStart() / splitInterval - return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, splitInterval, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine) - case *ThanosLabelsRequest: - splitInterval := tr.SplitInterval.Milliseconds() - currentInterval := r.GetStart() / splitInterval - return fmt.Sprintf("fe:%s:%s:%s:%d:%d", userID, tr.Label, tr.Matchers, splitInterval, currentInterval) - case *ThanosSeriesRequest: - splitInterval := tr.SplitInterval.Milliseconds() + if sr, ok := r.(SplitRequest); ok { + splitInterval := sr.GetSplitInterval().Milliseconds() currentInterval := r.GetStart() / splitInterval - return fmt.Sprintf("fe:%s:%s:%d:%d", userID, tr.Matchers, splitInterval, currentInterval) + + switch tr := r.(type) { + case *ThanosQueryRangeRequest: + i := 0 + for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ { + } + shardInfoKey := generateShardInfoKey(tr) + return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, splitInterval, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine) + case *ThanosLabelsRequest: + return fmt.Sprintf("fe:%s:%s:%s:%d:%d", userID, tr.Label, tr.Matchers, splitInterval, currentInterval) + case *ThanosSeriesRequest: + return fmt.Sprintf("fe:%s:%s:%d:%d", userID, tr.Matchers, splitInterval, currentInterval) + } } - return "request_type_not_supported" + // all possible request types are already covered + panic("request type not supported") } func generateShardInfoKey(r *ThanosQueryRangeRequest) string { diff --git a/pkg/queryfrontend/cache_test.go b/pkg/queryfrontend/cache_test.go index a257cdb730d..32485c0d083 100644 --- a/pkg/queryfrontend/cache_test.go +++ b/pkg/queryfrontend/cache_test.go @@ -22,15 +22,6 @@ func TestGenerateCacheKey(t *testing.T) { req queryrange.Request expected string }{ - { - name: "non thanos req", - req: &queryrange.PrometheusRequest{ - Query: "up", - Start: 0, - Step: 60 * seconds, - }, - expected: "request_type_not_supported", - }, { name: "non downsampling resolution specified", req: &ThanosQueryRangeRequest{ @@ -164,3 +155,23 @@ func TestGenerateCacheKey(t *testing.T) { }) } } + +func TestGenerateCacheKey_UnsupportedRequest(t *testing.T) { + splitter := newThanosCacheKeyGenerator() + + req := &queryrange.PrometheusRequest{ + Query: "up", + Start: 0, + Step: 60 * seconds, + } + + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic") + } else { + testutil.Assert(t, r == "request type not supported", "unexpected panic: %v", r) + } + }() + + splitter.GenerateCacheKey("", req) +}