22 changes: 11 additions & 11 deletions cmd/thanos/query.go
@@ -28,13 +28,14 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/promql-engine/api"

"github.com/thanos-io/promql-engine/api"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/exemplars"
@@ -128,10 +129,11 @@ func registerQuery(app *extkingpin.App) {
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()

enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
Default("false").Bool()
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
Default("false").Bool()
queryDeduplicationFunc := cmd.Flag("query.deduplication.func", "Experimental. Deduplication algorithm for merging overlapping series. "+
"Possible values are: \"penalty\", \"chain\", \"quorum\". If no value is specified, the penalty-based deduplication algorithm is used. "+
"When set to chain, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. At least one replica label has to be set via the --query.replica-label flag. "+
"When set to quorum, the databricks deduplication algorithm is used; it is suitable for metrics ingested via receivers.").
Default(dedup.AlgorithmPenalty).Enum(dedup.AlgorithmPenalty, dedup.AlgorithmChain, dedup.AlgorithmQuorum)

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

@@ -338,6 +340,7 @@ func registerQuery(app *extkingpin.App) {
*queryConnMetricLabels,
*queryReplicaLabels,
*queryPartitionLabels,
*queryDeduplicationFunc,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
@@ -381,8 +384,6 @@ func registerQuery(app *extkingpin.App) {
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*enableDedupMerge,
*enableQuorumChunkDedup,
)
})
}
@@ -423,6 +424,7 @@ func runQuery(
queryConnMetricLabels []string,
queryReplicaLabels []string,
queryPartitionLabels []string,
queryDeduplicationFunc string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
@@ -466,8 +468,6 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
enableDedupMerge bool,
enableQuorumChunkDedup bool,
) error {
comp := component.Query
if alertQueryURL == "" {
@@ -554,7 +554,7 @@ func runQuery(
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
}

// Parse and sanitize the provided replica labels flags.
@@ -596,7 +596,7 @@
)
opts := query.Options{
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
EnableDedupMerge: enableDedupMerge,
DeduplicationFunc: queryDeduplicationFunc,
}
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
queryableCreator = query.NewQueryableCreatorWithOptions(
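The two boolean flags are folded into the single --query.deduplication.func enum above. For illustration only (the replica label name "replica" is an assumption, not part of this change), a querier that previously enabled --query.quorum-chunk-dedup would now be started as:

	thanos query --query.replica-label=replica --query.deduplication.func=quorum

Leaving the flag at its default, penalty, keeps the existing penalty-based deduplication; chain selects the compact-style 1:1 chained merge.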
20 changes: 15 additions & 5 deletions pkg/dedup/iter.go
@@ -23,6 +23,12 @@ import (
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000

const (
AlgorithmPenalty = "penalty"
AlgorithmChain = "chain"
AlgorithmQuorum = "quorum"
)

type dedupSeriesSet struct {
set storage.SeriesSet

@@ -32,7 +38,8 @@ type dedupSeriesSet struct {
peek storage.Series
ok bool

f string
f string
deduplicationFunc string
}

// isCounter deduces whether a counter metric has been passed. There must be
@@ -109,8 +116,8 @@ func (o *overlapSplitSet) Err() error {

// NewSeriesSet returns seriesSet that deduplicates the same series.
// The series in the series set are expected to be sorted by all labels.
func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
s := &dedupSeriesSet{set: set, f: f}
func NewSeriesSet(set storage.SeriesSet, f string, deduplicationFunc string) storage.SeriesSet {
s := &dedupSeriesSet{set: set, f: f, deduplicationFunc: deduplicationFunc}
s.ok = s.set.Next()
if s.ok {
s.peek = s.set.At()
@@ -160,9 +167,12 @@ func (s *dedupSeriesSet) At() storage.Series {
// Clients may store the series, so we must make a copy of the slice before advancing.
repl := make([]storage.Series, len(s.replicas))
copy(repl, s.replicas)
if s.f == UseMergedSeries {
if s.deduplicationFunc == AlgorithmQuorum {
// Merge all samples, which are ingested via receivers; no samples are skipped.
return NewMergedSeries(s.lset, repl)
return NewQuorumSeries(s.lset, repl, s.f)
}
if s.deduplicationFunc == AlgorithmChain {
Author comment: fyi @yuchen-db, I've also added the Prometheus implementation, but it fails a number of unit tests, in case you're wondering.

return seriesWithLabels{Series: storage.ChainedSeriesMerge(repl...), lset: s.lset}
}
return newDedupSeries(s.lset, repl, s.f)
}
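For context, here is a minimal sketch (not from this PR) of how a caller outside the package might select the quorum algorithm through the new three-argument NewSeriesSet; the helper name and the empty counter-function argument are assumptions:

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/thanos-io/thanos/pkg/dedup"
)

// dedupByQuorum wraps an already label-sorted SeriesSet with quorum-based
// deduplication. The empty string means no counter-aware function (such as
// "rate") is in play, so values are not adjusted for counter resets.
func dedupByQuorum(set storage.SeriesSet) storage.SeriesSet {
	return dedup.NewSeriesSet(set, "", dedup.AlgorithmQuorum)
}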
2 changes: 1 addition & 1 deletion pkg/dedup/iter_test.go
@@ -538,7 +538,7 @@ func TestDedupSeriesSet(t *testing.T) {
if tcase.isCounter {
f = "rate"
}
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmPenalty)
var ats []storage.Series
for dedupSet.Next() {
ats = append(ats, dedupSet.At())
68 changes: 39 additions & 29 deletions pkg/dedup/merge_iter.go → pkg/dedup/quorum_iter.go
@@ -12,35 +12,42 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

const UseMergedSeries = "use_merged_series"

// mergedSeries is a storage.Series that implements a simple merge sort algorithm.
// when replicas has conflict values at the same timestamp, the first replica will be selected.
type mergedSeries struct {
// quorumSeries is a storage.Series that implements the quorum algorithm.
// When replicas have conflicting values at the same timestamp, the value held by the majority of replicas is selected.
type quorumSeries struct {
lset labels.Labels
replicas []storage.Series

isCounter bool
}

func NewMergedSeries(lset labels.Labels, replicas []storage.Series) storage.Series {
return &mergedSeries{
func NewQuorumSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series {
return &quorumSeries{
lset: lset,
replicas: replicas,

isCounter: isCounter(f),
}
}

func (m *mergedSeries) Labels() labels.Labels {
func (m *quorumSeries) Labels() labels.Labels {
return m.lset
}
func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
iters := make([]chunkenc.Iterator, 0, len(m.replicas))
func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
iters := make([]adjustableSeriesIterator, 0, len(m.replicas))
oks := make([]bool, 0, len(m.replicas))
for _, r := range m.replicas {
it := r.Iterator(nil)
var it adjustableSeriesIterator
if m.isCounter {
it = &counterErrAdjustSeriesIterator{Iterator: r.Iterator(nil)}
} else {
it = &noopAdjustableSeriesIterator{Iterator: r.Iterator(nil)}
}
ok := it.Next() != chunkenc.ValNone // iterate to the first value.
iters = append(iters, it)
oks = append(oks, ok)
}
return &mergedSeriesIterator{
return &quorumSeriesIterator{
iters: iters,
oks: oks,
lastT: math.MinInt64,
@@ -49,50 +56,50 @@ func (m *mergedSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
}

type quorumValuePicker struct {
currentValue int64
currentValue float64
cnt int
}

func NewQuorumValuePicker(v float64) *quorumValuePicker {
return &quorumValuePicker{
currentValue: int64(v),
currentValue: v,
cnt: 1,
}
}

// Return true if this is the new majority value.
func (q *quorumValuePicker) addValue(v float64) bool {
iv := int64(v)
if q.currentValue == iv {
if q.currentValue == v {
q.cnt++
} else {
q.cnt--
if q.cnt == 0 {
q.currentValue = iv
q.currentValue = v
q.cnt = 1
return true
}
}
return false
}

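// Worked example (illustrative, not part of this change): addValue implements
// a Boyer-Moore-style majority vote over the replica values seen at a single
// timestamp. With replicas reporting 5, 5, 7:
//
//	p := NewQuorumValuePicker(5) // currentValue=5, cnt=1
//	p.addValue(5)                // currentValue=5, cnt=2
//	p.addValue(7)                // currentValue=5, cnt=1 -> 5 remains the pick
//
// With 5, 7, 7 the counter reaches zero on the first 7, so addValue returns
// true and the picker switches to 7, which ends up as the majority value.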
type mergedSeriesIterator struct {
iters []chunkenc.Iterator
type quorumSeriesIterator struct {
iters []adjustableSeriesIterator
oks []bool

lastT int64
lastIter chunkenc.Iterator
lastV float64
lastIter adjustableSeriesIterator
}

func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
func (m *quorumSeriesIterator) Next() chunkenc.ValueType {
// m.lastIter points to the last iterator that has the latest timestamp.
// m.lastT always aligns with m.lastIter unless when m.lastIter is nil.
// m.lastIter is nil only in the following cases:
// 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case.
// 2. The iterator runs out of values. m.lastT is the last timestamp in this case.
minT := int64(math.MaxInt64)
var lastIter chunkenc.Iterator
quoramValue := NewQuorumValuePicker(0)
var lastIter adjustableSeriesIterator
quoramValue := NewQuorumValuePicker(0.0)
for i, it := range m.iters {
if !m.oks[i] {
continue
@@ -101,6 +108,8 @@ func (m *mergedSeriesIterator) Next() chunkenc.ValueType {
m.oks[i] = it.Seek(m.lastT+initialPenalty) != chunkenc.ValNone
// The it.Seek() call above should guarantee that it.AtT() > m.lastT.
if m.oks[i] {
// adjust the current value for counter functions to avoid unexpected resets
it.adjustAtValue(m.lastV)
t, v := it.At()
if t < minT {
minT = t
@@ -117,11 +126,12 @@
if m.lastIter == nil {
return chunkenc.ValNone
}
m.lastV = quoramValue.currentValue
m.lastT = minT
return chunkenc.ValFloat
}

func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
func (m *quorumSeriesIterator) Seek(t int64) chunkenc.ValueType {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for m.lastT < t && m.Next() != chunkenc.ValNone {
}
@@ -132,25 +142,25 @@ func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
return chunkenc.ValFloat
}

func (m *mergedSeriesIterator) At() (t int64, v float64) {
func (m *quorumSeriesIterator) At() (t int64, v float64) {
return m.lastIter.At()
}

func (m *mergedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
func (m *quorumSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
return m.lastIter.AtHistogram(h)
}

func (m *mergedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
func (m *quorumSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return m.lastIter.AtFloatHistogram(fh)
}

func (m *mergedSeriesIterator) AtT() int64 {
func (m *quorumSeriesIterator) AtT() int64 {
return m.lastT
}

// Err All At() funcs should panic if called after Next() or Seek() return ValNone.
// Only Err() should return nil even after Next() or Seek() return ValNone.
func (m *mergedSeriesIterator) Err() error {
func (m *quorumSeriesIterator) Err() error {
if m.lastIter == nil {
return nil
}