Skip to content

Commit

Permalink
fix(blooms): Fix partitionSeriesByDay function (#12900)
Browse files Browse the repository at this point in the history
The function `partitionSeriesByDay` could yield empty when chunks were outside of the requested time range.
    
This would result in requests for a time range that does not contain any
chunks for a series to filter. These requests would return errors, because a subsequent `partitionSeriesByDay` for would remove that day resulting in no task.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored May 6, 2024
1 parent 1951322 commit 738c274
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
"series_requested", len(req.Refs),
)

if len(seriesByDay) != 1 {
if len(seriesByDay) > 1 {
stats.Status = labelFailure
return nil, errors.New("request time range must span exactly one day")
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bloomgateway

import (
"sort"
"time"

"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
Expand All @@ -13,13 +12,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func getDayTime(ts model.Time) time.Time {
return ts.Time().UTC().Truncate(Day)
}

func truncateDay(ts model.Time) model.Time {
// model.minimumTick is time.Millisecond
return ts - (ts % model.Time(24*time.Hour/time.Millisecond))
return model.TimeFromUnix(ts.Time().Truncate(Day).Unix())
}

// getFromThrough assumes a list of ShortRefs sorted by From time
Expand Down Expand Up @@ -125,7 +119,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto
})

// All chunks fall outside of the range
if min == len(chunks) || max == 0 {
if min == len(chunks) || max == 0 || min == max {
continue
}

Expand All @@ -135,7 +129,6 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto
if chunks[max-1].Through > maxTs {
maxTs = chunks[max-1].Through
}
// fmt.Println("day", day, "series", series.Fingerprint, "minTs", minTs, "maxTs", maxTs)

res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,
Expand Down
42 changes: 39 additions & 3 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,46 @@ func TestPartitionRequest(t *testing.T) {
exp []seriesWithInterval
}{

"empty": {
"no series": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-24 * time.Hour),
Through: ts,
From: ts.Add(-12 * time.Hour),
Through: ts.Add(12 * time.Hour),
Refs: []*logproto.GroupedChunkRefs{},
},
exp: []seriesWithInterval{},
},

"no chunks for series": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-12 * time.Hour),
Through: ts.Add(12 * time.Hour),
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{},
},
{
Fingerprint: 0x10,
Refs: []*logproto.ShortRef{},
},
},
},
exp: []seriesWithInterval{},
},

"chunks before and after requested day": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-2 * time.Hour),
Through: ts.Add(2 * time.Hour),
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-13 * time.Hour), Through: ts.Add(-12 * time.Hour)},
{From: ts.Add(13 * time.Hour), Through: ts.Add(14 * time.Hour)},
},
},
},
},
exp: []seriesWithInterval{},
},
Expand Down

0 comments on commit 738c274

Please sign in to comment.