Skip to content

Commit dce0c72

Browse files
owen-d and tomwilkie authored
Do not split requests around ingester boundaries (#2766)
* only shard requests which don't touch ingesters
  Signed-off-by: Owen Diehl <[email protected]>
* update changelog
  Signed-off-by: Owen Diehl <[email protected]>
Co-authored-by: Tom Wilkie <[email protected]>
1 parent 77db938 commit dce0c72

File tree

3 files changed

+74
-100
lines changed

3 files changed

+74
-100
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
2525
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
2626
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
27+
* [BUGFIX] Query Frontend: Do not re-split sharded requests around ingester boundaries. #2766
2728

2829
## 1.2.0 / 2020-06-xx
2930
(in progress of release: current release candidate is https://github.com/cortexproject/cortex/releases/tag/v1.2.0-rc.0)

pkg/querier/queryrange/querysharding.go

Lines changed: 4 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -252,73 +252,10 @@ type shardSplitter struct {
252252

253253
func (splitter *shardSplitter) Do(ctx context.Context, r Request) (Response, error) {
254254
cutoff := splitter.now().Add(-splitter.MinShardingLookback)
255-
sharded, nonsharded := partitionRequest(r, cutoff)
256255

257-
return splitter.parallel(ctx, sharded, nonsharded)
258-
259-
}
260-
261-
func (splitter *shardSplitter) parallel(ctx context.Context, sharded, nonsharded Request) (Response, error) {
262-
if sharded == nil {
263-
return splitter.next.Do(ctx, nonsharded)
256+
// Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried).
257+
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) {
258+
return splitter.next.Do(ctx, r)
264259
}
265-
266-
if nonsharded == nil {
267-
return splitter.shardingware.Do(ctx, sharded)
268-
}
269-
270-
nonshardCh := make(chan Response, 1)
271-
shardCh := make(chan Response, 1)
272-
errCh := make(chan error, 2)
273-
274-
go func() {
275-
res, err := splitter.next.Do(ctx, nonsharded)
276-
if err != nil {
277-
errCh <- err
278-
return
279-
}
280-
nonshardCh <- res
281-
282-
}()
283-
284-
go func() {
285-
res, err := splitter.shardingware.Do(ctx, sharded)
286-
if err != nil {
287-
errCh <- err
288-
return
289-
}
290-
shardCh <- res
291-
}()
292-
293-
resps := make([]Response, 0, 2)
294-
for i := 0; i < 2; i++ {
295-
select {
296-
case r := <-nonshardCh:
297-
resps = append(resps, r)
298-
case r := <-shardCh:
299-
resps = append(resps, r)
300-
case err := <-errCh:
301-
return nil, err
302-
case <-ctx.Done():
303-
return nil, ctx.Err()
304-
}
305-
306-
}
307-
308-
return splitter.codec.MergeResponse(resps...)
309-
}
310-
311-
// partitionQuery splits a request into potentially multiple requests, one including the request's time range
312-
// [0,t). The other will include [t,inf)
313-
func partitionRequest(r Request, t time.Time) (before Request, after Request) {
314-
boundary := util.TimeToMillis(t)
315-
if r.GetStart() >= boundary {
316-
return nil, r
317-
}
318-
319-
if r.GetEnd() < boundary {
320-
return r, nil
321-
}
322-
323-
return r.WithStartEnd(r.GetStart(), boundary), r.WithStartEnd(boundary, r.GetEnd())
260+
return splitter.shardingware.Do(ctx, r)
324261
}

pkg/querier/queryrange/querysharding_test.go

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -425,45 +425,81 @@ func TestQueryshardingCorrectness(t *testing.T) {
425425

426426
func TestShardSplitting(t *testing.T) {
427427

428-
req := &PrometheusRequest{
429-
Path: "/query_range",
430-
Start: util.TimeToMillis(start),
431-
End: util.TimeToMillis(end),
432-
Step: int64(step) / int64(time.Second),
433-
Query: "sum(rate(bar1[1m]))",
434-
}
435-
436-
shardingware := NewQueryShardMiddleware(
437-
log.NewNopLogger(),
438-
engine,
439-
// ensure that all requests are shard compatbile
440-
ShardingConfigs{
441-
chunk.PeriodConfig{
442-
Schema: "v10",
443-
RowShards: uint32(2),
444-
},
428+
for _, tc := range []struct {
429+
desc string
430+
lookback time.Duration
431+
shouldShard bool
432+
}{
433+
{
434+
desc: "older than lookback",
435+
lookback: -1, // a negative lookback will ensure the entire query doesn't cross the sharding boundary & can safely be sharded.
436+
shouldShard: true,
445437
},
446-
PrometheusCodec,
447-
end.Sub(start)/2, // shard 1/2 of the req
448-
nil,
449-
nil,
450-
)
438+
{
439+
desc: "overlaps lookback",
440+
lookback: end.Sub(start) / 2, // intersect the request causing it to avoid sharding
441+
shouldShard: false,
442+
},
443+
{
444+
desc: "newer than lookback",
445+
lookback: end.Sub(start) + 1,
446+
shouldShard: false,
447+
},
448+
} {
449+
t.Run(tc.desc, func(t *testing.T) {
450+
req := &PrometheusRequest{
451+
Path: "/query_range",
452+
Start: util.TimeToMillis(start),
453+
End: util.TimeToMillis(end),
454+
Step: int64(step) / int64(time.Second),
455+
Query: "sum(rate(bar1[1m]))",
456+
}
451457

452-
downstream := &downstreamHandler{
453-
engine: engine,
454-
queryable: shardAwareQueryable,
455-
}
458+
shardingware := NewQueryShardMiddleware(
459+
log.NewNopLogger(),
460+
engine,
461+
// ensure that all requests are shard compatbile
462+
ShardingConfigs{
463+
chunk.PeriodConfig{
464+
Schema: "v10",
465+
RowShards: uint32(2),
466+
},
467+
},
468+
PrometheusCodec,
469+
tc.lookback,
470+
nil,
471+
nil,
472+
)
456473

457-
handler := shardingware.Wrap(downstream).(*shardSplitter)
458-
handler.now = func() time.Time { return end } // make the split cut the request in half (don't use time.Now)
474+
downstream := &downstreamHandler{
475+
engine: engine,
476+
queryable: shardAwareQueryable,
477+
}
459478

460-
resp, err := handler.Do(context.Background(), req)
461-
require.Nil(t, err)
479+
handler := shardingware.Wrap(downstream).(*shardSplitter)
480+
handler.now = func() time.Time { return end } // make the split cut the request in half (don't use time.Now)
462481

463-
unaltered, err := downstream.Do(context.Background(), req)
464-
require.Nil(t, err)
482+
var didShard bool
483+
484+
old := handler.shardingware
485+
handler.shardingware = HandlerFunc(func(ctx context.Context, req Request) (Response, error) {
486+
didShard = true
487+
return old.Do(ctx, req)
488+
})
489+
490+
resp, err := handler.Do(context.Background(), req)
491+
require.Nil(t, err)
492+
493+
require.Equal(t, tc.shouldShard, didShard)
494+
495+
unaltered, err := downstream.Do(context.Background(), req)
496+
require.Nil(t, err)
497+
498+
approximatelyEquals(t, unaltered.(*PrometheusResponse), resp.(*PrometheusResponse))
499+
500+
})
501+
}
465502

466-
approximatelyEquals(t, unaltered.(*PrometheusResponse), resp.(*PrometheusResponse))
467503
}
468504

469505
func BenchmarkQuerySharding(b *testing.B) {

0 commit comments

Comments (0)