Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.
- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
34 changes: 19 additions & 15 deletions pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,38 @@ import (

// thanosCacheKeyGenerator is a utility for using split interval when determining cache keys.
type thanosCacheKeyGenerator struct {
interval queryrange.IntervalFn
resolutions []int64
}

func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKeyGenerator {
func newThanosCacheKeyGenerator() thanosCacheKeyGenerator {
return thanosCacheKeyGenerator{
interval: intervalFn,
resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0},
}
}

// GenerateCacheKey generates a cache key based on the Request and interval.
// TODO(yeya24): Add other request params as request key.
func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string {
currentInterval := r.GetStart() / t.interval(r).Milliseconds()
switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
if sr, ok := r.(SplitRequest); ok {
splitInterval := sr.GetSplitInterval().Milliseconds()
currentInterval := r.GetStart() / splitInterval

switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, splitInterval, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d:%d", userID, tr.Label, tr.Matchers, splitInterval, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, tr.Matchers, splitInterval, currentInterval)
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval)
}
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)

// all possible request types are already covered
panic("request type not supported")
}

func generateShardInfoKey(r *ThanosQueryRangeRequest) string {
Expand Down
96 changes: 59 additions & 37 deletions pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,32 @@ import (
)

func TestGenerateCacheKey(t *testing.T) {
intervalFn := func(r queryrange.Request) time.Duration { return hour }
splitter := newThanosCacheKeyGenerator(intervalFn)
splitter := newThanosCacheKeyGenerator()

for _, tc := range []struct {
name string
req queryrange.Request
expected string
}{
{
name: "non thanos req",
req: &queryrange.PrometheusRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0",
},
{
name: "non downsampling resolution specified",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
Query: "up",
Start: 0,
Step: 60 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:60000:0:2:-:0:",
expected: "fe::up:60000:3600000:0:2:-:0:",
},
{
name: "10s step",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
Query: "up",
Start: 0,
Step: 10 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:2:-:0:",
expected: "fe::up:10000:3600000:0:2:-:0:",
},
{
name: "1m downsampling resolution",
Expand All @@ -57,8 +49,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:2:-:0:",
expected: "fe::up:10000:3600000:0:2:-:0:",
},
{
name: "5m downsampling resolution, different cache key",
Expand All @@ -67,8 +60,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:1:-:0:",
expected: "fe::up:10000:3600000:0:1:-:0:",
},
{
name: "1h downsampling resolution, different cache key",
Expand All @@ -77,8 +71,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: hour,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:0:-:0:",
expected: "fe::up:10000:3600000:0:0:-:0:",
},
{
name: "1h downsampling resolution with lookback delta",
Expand All @@ -88,23 +83,26 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
LookbackDelta: 1000,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:0:-:1000:",
expected: "fe::up:10000:3600000:0:0:-:1000:",
},
{
name: "label names, no matcher",
req: &ThanosLabelsRequest{
Start: 0,
Start: 0,
SplitInterval: time.Hour,
},
expected: "fe:::[]:0",
expected: "fe:::[]:3600000:0",
},
{
name: "label names, single matcher",
req: &ThanosLabelsRequest{
Start: 0,
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Start: 0,
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
SplitInterval: time.Hour,
},
expected: `fe:::[[foo="bar"]]:0`,
expected: `fe:::[[foo="bar"]]:3600000:0`,
},
{
name: "label names, multiple matchers",
Expand All @@ -114,25 +112,28 @@ func TestGenerateCacheKey(t *testing.T) {
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
{labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")},
},
SplitInterval: time.Hour,
},
expected: `fe:::[[foo="bar"] [baz="qux"]]:0`,
expected: `fe:::[[foo="bar"] [baz="qux"]]:3600000:0`,
},
{
name: "label values, no matcher",
req: &ThanosLabelsRequest{
Start: 0,
Label: "up",
Start: 0,
Label: "up",
SplitInterval: time.Hour,
},
expected: "fe::up:[]:0",
expected: "fe::up:[]:3600000:0",
},
{
name: "label values, single matcher",
req: &ThanosLabelsRequest{
Start: 0,
Label: "up",
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Start: 0,
Label: "up",
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
SplitInterval: time.Hour,
},
expected: `fe::up:[[foo="bar"]]:0`,
expected: `fe::up:[[foo="bar"]]:3600000:0`,
},
{
name: "label values, multiple matchers",
Expand All @@ -143,8 +144,9 @@ func TestGenerateCacheKey(t *testing.T) {
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
{labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")},
},
SplitInterval: time.Hour,
},
expected: `fe::up:[[foo="bar"] [baz="qux"]]:0`,
expected: `fe::up:[[foo="bar"] [baz="qux"]]:3600000:0`,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -153,3 +155,23 @@ func TestGenerateCacheKey(t *testing.T) {
})
}
}

func TestGenerateCacheKey_UnsupportedRequest(t *testing.T) {
splitter := newThanosCacheKeyGenerator()

req := &queryrange.PrometheusRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
}

defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic")
} else {
testutil.Assert(t, r == "request type not supported", "unexpected panic: %v", r)
}
}()

splitter.GenerateCacheKey("", req)
}
36 changes: 36 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type ShardedRequest interface {
WithShardInfo(info *storepb.ShardInfo) queryrange.Request
}

// SplitRequest interface represents a query request that can be split horizontally.
type SplitRequest interface {
GetSplitInterval() time.Duration
WithSplitInterval(interval time.Duration) queryrange.Request
}

type RequestHeader struct {
Name string
Values []string
Expand Down Expand Up @@ -57,6 +63,7 @@ type ThanosQueryRangeRequest struct {
LookbackDelta int64
Analyze bool
Engine string
SplitInterval time.Duration
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand All @@ -83,6 +90,8 @@ func (r *ThanosQueryRangeRequest) GetCachingOptions() queryrange.CachingOptions

func (r *ThanosQueryRangeRequest) GetStats() string { return r.Stats }

func (r *ThanosQueryRangeRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosQueryRangeRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand Down Expand Up @@ -111,6 +120,13 @@ func (r *ThanosQueryRangeRequest) WithShardInfo(info *storepb.ShardInfo) queryra
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosQueryRangeRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosQueryRangeRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down Expand Up @@ -246,6 +262,7 @@ type ThanosLabelsRequest struct {
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
Stats string
SplitInterval time.Duration
}

// GetStoreMatchers returns store matches.
Expand All @@ -268,6 +285,8 @@ func (r *ThanosLabelsRequest) GetCachingOptions() queryrange.CachingOptions { re

func (r *ThanosLabelsRequest) GetStats() string { return r.Stats }

func (r *ThanosLabelsRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosLabelsRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand All @@ -288,6 +307,13 @@ func (r *ThanosLabelsRequest) WithQuery(_ string) queryrange.Request {
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosLabelsRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosLabelsRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down Expand Up @@ -328,6 +354,7 @@ type ThanosSeriesRequest struct {
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
Stats string
SplitInterval time.Duration
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand All @@ -353,6 +380,8 @@ func (r *ThanosSeriesRequest) GetCachingOptions() queryrange.CachingOptions { re

func (r *ThanosSeriesRequest) GetStats() string { return r.Stats }

func (r *ThanosSeriesRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosSeriesRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand All @@ -373,6 +402,13 @@ func (r *ThanosSeriesRequest) WithQuery(_ string) queryrange.Request {
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosSeriesRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosSeriesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down
5 changes: 2 additions & 3 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func newQueryRangeTripperware(
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(dynamicIntervalFn(config)),
newThanosCacheKeyGenerator(),
limits,
codec,
queryrange.PrometheusResponseExtractor{},
Expand Down Expand Up @@ -299,11 +299,10 @@ func newLabelsTripperware(
}

if config.ResultsCacheConfig != nil {
staticIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval }
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(staticIntervalFn),
newThanosCacheKeyGenerator(),
limits,
codec,
ThanosResponseExtractor{},
Expand Down
Loading
Loading