Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/promql-engine/api"

"github.com/thanos-io/promql-engine/api"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/exemplars"
Expand Down Expand Up @@ -128,10 +129,10 @@ func registerQuery(app *extkingpin.App) {
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()

enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
Default("false").Bool()
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
Default("false").Bool()
queryDeduplicationFunc := cmd.Flag("query.deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
"Possible values are: \"penalty\", \"chain\". If no value is specified, penalty based deduplication algorithm will be used. "+
"When set to chain, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. At least one replica label has to be set via --query.replica-label flag.").
Default(dedup.AlgorithmPenalty).Enum(dedup.AlgorithmPenalty, dedup.AlgorithmChain, dedup.AlgorithmQuorum)

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -338,6 +339,7 @@ func registerQuery(app *extkingpin.App) {
*queryConnMetricLabels,
*queryReplicaLabels,
*queryPartitionLabels,
*queryDeduplicationFunc,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
Expand Down Expand Up @@ -381,8 +383,6 @@ func registerQuery(app *extkingpin.App) {
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*enableDedupMerge,
*enableQuorumChunkDedup,
)
})
}
Expand Down Expand Up @@ -423,6 +423,7 @@ func runQuery(
queryConnMetricLabels []string,
queryReplicaLabels []string,
queryPartitionLabels []string,
queryDeduplicationFunc string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
Expand Down Expand Up @@ -466,8 +467,6 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
enableDedupMerge bool,
enableQuorumChunkDedup bool,
) error {
comp := component.Query
if alertQueryURL == "" {
Expand Down Expand Up @@ -554,7 +553,7 @@ func runQuery(
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
}

// Parse and sanitize the provided replica labels flags.
Expand Down Expand Up @@ -596,7 +595,7 @@ func runQuery(
)
opts := query.Options{
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
EnableDedupMerge: enableDedupMerge,
DeduplicationFunc: queryDeduplicationFunc,
}
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
queryableCreator = query.NewQueryableCreatorWithOptions(
Expand Down
20 changes: 15 additions & 5 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ import (
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000

const (
AlgorithmPenalty = "penalty"
AlgorithmChain = "chain"
AlgorithmQuorum = "quorum"
)

type dedupSeriesSet struct {
set storage.SeriesSet

Expand All @@ -32,7 +38,8 @@ type dedupSeriesSet struct {
peek storage.Series
ok bool

f string
f string
deduplicationFunc string
}

// isCounter deduces whether a counter metric has been passed. There must be
Expand Down Expand Up @@ -109,8 +116,8 @@ func (o *overlapSplitSet) Err() error {

// NewSeriesSet returns seriesSet that deduplicates the same series.
// The series in series set are expected be sorted by all labels.
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
s := &dedupSeriesSet{set: set, f: f}
func NewSeriesSet(set storage.SeriesSet, f string, deduplicationFunc string) storage.SeriesSet {
s := &dedupSeriesSet{set: set, f: f, deduplicationFunc: deduplicationFunc}
s.ok = s.set.Next()
if s.ok {
s.peek = s.set.At()
Expand Down Expand Up @@ -160,9 +167,12 @@ func (s *dedupSeriesSet) At() storage.Series {
// Clients may store the series, so we must make a copy of the slice before advancing.
repl := make([]storage.Series, len(s.replicas))
copy(repl, s.replicas)
if s.f == UseMergedSeries {
if s.deduplicationFunc == AlgorithmChain {
return storage.ChainedSeriesMerge(repl...)
} else if s.deduplicationFunc == AlgorithmQuorum {
// merge all samples which are ingested via receiver, no skips.
return NewMergedSeries(s.lset, repl)
// feed the merged series into dedup series which apply counter adjustment
return NewMergedSeries(s.lset, repl, s.f)
}
return newDedupSeries(s.lset, repl, s.f)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dedup/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestDedupSeriesSet(t *testing.T) {
if tcase.isCounter {
f = "rate"
}
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmPenalty)
var ats []storage.Series
for dedupSet.Next() {
ats = append(ats, dedupSet.At())
Expand Down
37 changes: 23 additions & 14 deletions pkg/dedup/merge_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,37 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

const UseMergedSeries = "use_merged_series"

// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
// when replicas has conflict values at the same timestamp, the first replica will be selected.
type mergedSeries struct {
lset labels.Labels
replicas []storage.Series

isCounter bool
}

func NewMergedSeries(lset labels.Labels, replicas []storage.Series) storage.Series {
func NewMergedSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
return &mergedSeries{
lset: lset,
replicas: replicas,

isCounter: isCounter(f),
}
}

func (m *mergedSeries) Labels() labels.Labels {
return m.lset
}
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
iters := make([]chunkenc.Iterator, 0, len(m.replicas))
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
oks := make([]bool, 0, len(m.replicas))
for _, r := range m.replicas {
it := r.Iterator(nil)
var it adjustableSeriesIterator
if m.isCounter {
it = &counterErrAdjustSeriesIterator{Iterator: r.Iterator(nil)}
} else {
it = &noopAdjustableSeriesIterator{Iterator: r.Iterator(nil)}
}
Comment on lines +40 to +45
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are applying counter adjustments to the raw data samples before deduplication. This is more complicated than applying the adjustments to the single time series after deduplication. The adjustments will intervene with quorum-based deduplication logic. I'm concerned it may introduce other edge cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add another test case such that one raw time series has a large gap with a reset, but the other two have complete data?

replica 0: [[1000, 10], [10000, 8], [11000, 10]]

replica 1: [[1000, 10], [2000, 0], [3000, 1], [4000, 2], [5000, 3], [6000, 4], [7000, 5], [8000, 6], [9000, 7],  [10000, 8], [11000, 10]
replica 2: [[1000, 10], [2000, 0], [3000, 1], [4000, 2], [5000, 3], [6000, 4], [7000, 5], [8000, 6], [9000, 7],  [10000, 8], [11000, 10]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added new test cases, the reason i've done it this way is to reuse counterErrAdjustSeriesIterator which you need to call adjustAtValue() somewhere, passing a merged time series to original newDedupSeries doesn't work

ok := it.Next() != chunkenc.ValNone // iterate to the first value.
iters = append(iters, it)
oks = append(oks, ok)
Expand All @@ -49,26 +56,25 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
}

type quorumValuePicker struct {
currentValue int64
currentValue float64
cnt int
}

func NewQuorumValuePicker(v float64) *quorumValuePicker {
return &quorumValuePicker{
currentValue: int64(v),
currentValue: v,
cnt: 1,
}
}

// Return true if this is the new majority value.
func (q *quorumValuePicker) addValue(v float64) bool {
iv := int64(v)
if q.currentValue == iv {
if q.currentValue == v {
q.cnt++
} else {
q.cnt--
if q.cnt == 0 {
q.currentValue = iv
q.currentValue = v
q.cnt = 1
return true
}
Expand All @@ -77,11 +83,12 @@ func (q *quorumValuePicker) addValue(v float64) bool {
}

type mergedSeriesIterator struct {
iters []chunkenc.Iterator
iters []adjustableSeriesIterator
oks []bool

lastT int64
lastIter chunkenc.Iterator
lastV float64
lastIter adjustableSeriesIterator
}

func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
Expand All @@ -91,8 +98,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
// 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case.
// 2. The iterator runs out of values. m.lastT is the last timestamp in this case.
minT := int64(math.MaxInt64)
var lastIter chunkenc.Iterator
quoramValue := NewQuorumValuePicker(0)
var lastIter adjustableSeriesIterator
quoramValue := NewQuorumValuePicker(0.0)
for i, it := range m.iters {
if !m.oks[i] {
continue
Expand All @@ -117,6 +124,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
if m.lastIter == nil {
return chunkenc.ValNone
}
m.lastIter.adjustAtValue(m.lastV)
_, m.lastV = m.lastIter.At()
m.lastT = minT
return chunkenc.ValFloat
}
Expand Down
100 changes: 98 additions & 2 deletions pkg/dedup/merge_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestIteratorEdgeCases(t *testing.T) {
ms := NewMergedSeries(labels.Labels{}, []storage.Series{})
ms := NewMergedSeries(labels.Labels{}, []storage.Series{}, "")
it := ms.Iterator(nil)
testutil.Ok(t, it.Err())
testutil.Equals(t, int64(math.MinInt64), it.AtT())
Expand Down Expand Up @@ -244,10 +244,106 @@ func TestMergedSeriesIterator(t *testing.T) {
},
},
},
{
// Regression test against https://github.com/thanos-io/thanos/issues/2401.
// Two counter series, when one (initially chosen) series is having hiccup (few dropped samples), while second is live.
// This also happens when 2 replicas scrape in different time (they usually do) and one sees later counter value then the other.
// Now, depending on what replica we look, we can see totally different counter value in total where total means
// after accounting for counter resets. We account for that in downsample.CounterSeriesIterator, mainly because
// we handle downsample Counter Aggregations specially (for detecting resets between chunks).
name: "Regression test against 2401",
isCounter: true,
input: []series{
{
lset: labels.FromStrings("a", "1"),
samples: []sample{
{10000, 8.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.0.
{20000, 9.0}, // Same. CurrValue = 9.0.
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
{50001, 9 + 1.0}, // Next after 20000+1 has a bit higher than timestamp then in second series. Penalty 5000 will be added.
{60000, 9 + 2.0},
{70000, 9 + 3.0},
{80000, 9 + 4.0},
{90000, 9 + 5.0}, // This should be now taken, and we expect 14 to be correct value now.
{100000, 9 + 6.0},
},
}, {
lset: labels.FromStrings("a", "1"),
samples: []sample{
{10001, 8.0}, // Penalty 5000 will be added.
// 20001 was app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator. Penalty 2 * (20000 - 10000) will be added.
// 30001 no sample. Within penalty, ignored.
{45001, 8 + 0.5}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value.
{55001, 8 + 1.5},
{65001, 8 + 2.5},
// {Gap} app reset. No sample, because stale marker but removed by downsample.CounterSeriesIterator.
},
},
},
exp: []series{
{
lset: labels.FromStrings("a", "1"),
samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {t: 50001, f: 10}, {55001, 10}, {65001, 11}, {t: 80000, f: 13}, {90000, 14}, {100000, 15}},
},
},
},
{
// Same thing but not for counter should not adjust anything.
name: "Regression test with no counter adjustment",
isCounter: false,
input: []series{
{
lset: labels.FromStrings("a", "1"),
samples: []sample{
{10000, 8.0}, {20000, 9.0}, {50001, 9 + 1.0}, {60000, 9 + 2.0}, {70000, 9 + 3.0}, {80000, 9 + 4.0}, {90000, 9 + 5.0}, {100000, 9 + 6.0},
},
}, {
lset: labels.FromStrings("a", "1"),
samples: []sample{
{10001, 8.0}, {45001, 8 + 0.5}, {55001, 8 + 1.5}, {65001, 8 + 2.5},
},
},
},
exp: []series{
{
lset: labels.FromStrings("a", "1"),
samples: []sample{{10000, 8}, {20000, 9}, {45001, 8.5}, {t: 50001, f: 10}, {55001, 9.5}, {65001, 10.5}, {t: 80000, f: 13}, {90000, 14}, {100000, 15}},
},
},
},
//{
// name: "Reusable counter with resets and large gaps",
// isCounter: true,
// input: []series{
// {
// lset: labels.FromStrings("a", "1"),
// samples: []sample{
// {10000, 8.0}, {20000, 9.0}, {1050001, 1.0}, {1060001, 5.0}, {2060001, 3.0},
// },
// },
// {
// lset: labels.FromStrings("a", "1"),
// samples: []sample{
// {10000, 8.0}, {20000, 9.0}, {1050001, 1.0}, {1060001, 5.0}, {2060001, 3.0},
// },
// },
// },
// exp: []series{
// {
// lset: labels.FromStrings("a", "1"),
// samples: []sample{{10000, 8.0}, {20000, 9.0}, {1050001, 10.0}, {1060001, 16.0}, {2060001, 19.0}},
// },
// },
//},
} {
t.Run(tcase.name, func(t *testing.T) {
// If it is a counter then pass a function which expects a counter.
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, UseMergedSeries)
// If it is a counter then pass a function which expects a counter.
f := ""
if tcase.isCounter {
f = "rate"
}
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmQuorum)
var ats []storage.Series
for dedupSet.Next() {
ats = append(ats, dedupSet.At())
Expand Down
Loading