diff --git a/execution/scan/subquery.go b/execution/scan/subquery.go index 43d041d03..239203811 100644 --- a/execution/scan/subquery.go +++ b/execution/scan/subquery.go @@ -201,8 +201,8 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error) if ok { if h != nil { sv.AppendHistogram(o.pool, uint64(sampleId), h) - } else if f != nil { - sv.AppendSample(o.pool, uint64(sampleId), *f) + } else { + sv.AppendSample(o.pool, uint64(sampleId), f) } } o.IncrementSamplesAtTimestamp(rangeSamples.Len(), sv.T) diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index 12412eb8b..e85a3a9ce 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -8,12 +8,15 @@ import ( "math" "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/promql-engine/execution/warnings" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/thanos-io/promql-engine/execution/aggregate" "github.com/thanos-io/promql-engine/execution/parse" - "github.com/thanos-io/promql-engine/execution/warnings" ) type SamplesBuffer GenericRingBuffer @@ -31,7 +34,7 @@ type FunctionArgs struct { ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor) } -type FunctionCall func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) +type FunctionCall func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) func instantValue(samples []Sample, isRate bool) (float64, bool) { lastSample := samples[len(samples)-1] @@ -60,11 +63,10 @@ func instantValue(samples []Sample, isRate bool) (float64, bool) { } var rangeVectorFuncs = map[string]FunctionCall{ - "sum_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "sum_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // histogram sum := f.Samples[0].V.H.Copy() @@ -72,35 +74,30 @@ var rangeVectorFuncs = map[string]FunctionCall{ h := sample.V.H _, err := sum.Add(h) if err != nil { - return nil, sum, true, nil + return 0, sum, true, err } } - return nil, sum, true, nil + return 0, sum, true, nil } - - v := sumOverTime(f.Samples) - return &v, nil, true, nil + return sumOverTime(f.Samples), nil, true, nil }, - "max_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "max_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := maxOverTime(f.Samples) - return &v, nil, true, nil + return maxOverTime(f.Samples), nil, true, nil }, - "min_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "min_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := minOverTime(f.Samples) - return &v, nil, true, nil + return minOverTime(f.Samples), nil, true, nil }, - "avg_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "avg_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // histogram count := 1 @@ -111,208 +108,183 @@ var rangeVectorFuncs = map[string]FunctionCall{ right := mean.Copy().Div(float64(count)) toAdd, err := left.Sub(right) if err != nil { - return nil, mean, true, nil + return 0, mean, true, err } _, err = mean.Add(toAdd) if err != nil { - return nil, mean, true, nil + return 0, mean, true, err } } - return nil, mean, true, nil + return 0, mean, true, nil } - v := avgOverTime(f.Samples) - return &v, nil, true, nil + return avgOverTime(f.Samples), nil, true, nil }, - "stddev_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "stddev_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := stddevOverTime(f.Samples) - return &v, nil, true, nil + return stddevOverTime(f.Samples), nil, true, nil }, - "stdvar_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "stdvar_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := stdvarOverTime(f.Samples) - return &v, nil, true, nil + return stdvarOverTime(f.Samples), nil, true, nil }, - "count_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "count_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := countOverTime(f.Samples) - return &v, nil, true, nil + return countOverTime(f.Samples), nil, true, nil }, - "last_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "last_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.Samples[0].V.H != nil { - return nil, f.Samples[len(f.Samples)-1].V.H.Copy(), true, nil + return 0, f.Samples[len(f.Samples)-1].V.H.Copy(), true, nil } - v := f.Samples[len(f.Samples)-1].V.F - return &v, nil, true, nil + return f.Samples[len(f.Samples)-1].V.F, nil, true, nil }, - "present_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "present_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := 1. - return &v, nil, true, nil + return 1., nil, true, nil }, - "quantile_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "quantile_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } floats := make([]float64, len(f.Samples)) for i, sample := range f.Samples { floats[i] = sample.V.F } - v := aggregate.Quantile(f.ScalarPoint, floats) - return &v, nil, true, nil + return aggregate.Quantile(f.ScalarPoint, floats), nil, true, nil }, - "changes": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "changes": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := changes(f.Samples) - return &v, nil, true, nil + return changes(f.Samples), nil, true, nil }, - "resets": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "resets": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := resets(f.Samples) - return &v, nil, true, nil + return resets(f.Samples), nil, true, nil }, - "deriv": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "deriv": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // deriv should ignore histograms. - return nil, nil, false, nil + return 0, nil, false, nil } - - v := deriv(f.Samples) - return &v, nil, true, nil + return deriv(f.Samples), nil, true, nil }, - "irate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "irate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { f.Samples = filterFloatOnlySamples(f.Samples) if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } val, ok := instantValue(f.Samples, true) if !ok { - return nil, nil, false, nil + return 0., nil, false, nil } - return &val, nil, true, nil + return val, nil, true, nil }, - "idelta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "idelta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { f.Samples = filterFloatOnlySamples(f.Samples) if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } val, ok := instantValue(f.Samples, false) if !ok { - return nil, nil, false, nil + return 0., nil, false, nil } - return &val, nil, true, nil + return val, nil, true, nil }, - "rate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "rate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil - } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err + return 0., nil, false, nil } - - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) }, - "delta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "delta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil - } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err + return 0., nil, false, nil } - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) }, - "increase": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "increase": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil - } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err + return 0., nil, false, nil } - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset) }, - "xrate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xrate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "xdelta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xdelta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "xincrease": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xincrease": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "predict_linear": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "predict_linear": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } v := predictLinear(f.Samples, f.ScalarPoint, f.StepTime) - return &v, nil, true, nil + return v, nil, true, nil }, - "double_exponential_smoothing": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "double_exponential_smoothing": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { if len(f.Samples) == 1 && f.Samples[0].V.H != nil { warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx) - return nil, nil, false, nil + return 0, nil, false, nil } - return nil, nil, false, nil + return 0, nil, false, nil } // Annotate mix of float and histogram. for _, s := range f.Samples { if s.V.H != nil { warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx) - return nil, nil, false, nil + return 0, nil, false, nil } } @@ -320,11 +292,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ tf := f.ScalarPoint2 // trend factor argument or beta v, ok := doubleExponentialSmoothing(f.Samples, sf, tf) - if !ok { - return nil, nil, false, nil - } - - return &v, nil, true, nil + return v, nil, ok, nil }, } @@ -340,7 +308,7 @@ func NewRangeVectorFunc(name string) (FunctionCall, error) { // It calculates the rate (allowing for counter resets if isCounter is true), // extrapolates if the first/last sample is close to the boundary, and returns // the result as either per-second (if isRate is true) or overall. -func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (*float64, *histogram.FloatHistogram, error) { +func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (f float64, h *histogram.FloatHistogram, ok bool, err error) { var ( rangeStart = stepTime - (selectRange + offset) rangeEnd = stepTime - offset @@ -348,11 +316,10 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC resultHistogram *histogram.FloatHistogram ) - var err error if samples[0].V.H != nil { resultHistogram, err = histogramRate(ctx, samples, isCounter) if err != nil { - return nil, nil, err + return 0, nil, false, err } } else { resultValue = samples[len(samples)-1].V.F - samples[0].V.F @@ -425,10 +392,10 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC if samples[0].V.H != nil && resultHistogram == nil { // to prevent appending sample with 0 - return nil, nil, nil + return 0, nil, false, nil } - return &resultValue, resultHistogram, nil + return resultValue, resultHistogram, true, nil } // extendedRate is a utility function for xrate/xincrease/xdelta. diff --git a/ringbuffer/generic.go b/ringbuffer/generic.go index a2b70a42f..804a34d50 100644 --- a/ringbuffer/generic.go +++ b/ringbuffer/generic.go @@ -106,7 +106,7 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) { r.items = r.items[:keep] } -func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarArg2 float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) { +func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarArg2 float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) { return r.call(FunctionArgs{ ctx: ctx, Samples: r.items, diff --git a/ringbuffer/rate.go b/ringbuffer/rate.go index ce61a4037..ce30f9777 100644 --- a/ringbuffer/rate.go +++ b/ringbuffer/rate.go @@ -18,7 +18,7 @@ type Buffer interface { MaxT() int64 Push(t int64, v Value) Reset(mint int64, evalt int64) - Eval(ctx context.Context, _, _ float64, _ *int64) (*float64, *histogram.FloatHistogram, bool, error) + Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) ReadIntoLast(f func(*Sample)) } @@ -171,9 +171,9 @@ func (r *RateBuffer) Reset(mint int64, evalt int64) { r.firstSamples[last].T = math.MaxInt64 } -func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ *int64) (*float64, *histogram.FloatHistogram, bool, error) { +func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) { if r.firstSamples[0].T == math.MaxInt64 || r.firstSamples[0].T == r.last.T { - return nil, nil, false, nil + return 0, nil, false, nil } r.rateBuffer = append(append( @@ -183,8 +183,7 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ *int64) (*float64 ) r.rateBuffer = slices.CompactFunc(r.rateBuffer, func(s1 Sample, s2 Sample) bool { return s1.T == s2.T }) numSamples := r.stepRanges[0].numSamples - f, h, err := extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) - return f, h, true, err + return extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) } func (r *RateBuffer) ReadIntoLast(func(*Sample)) {} diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index d4a39f8fb..85bb46f05 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -11,19 +11,21 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/thanos-io/promql-engine/execution/warnings" + "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" - "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/ringbuffer" @@ -72,7 +74,9 @@ type matrixSelector struct { // Lookback delta for extended range functions. extLookbackDelta int64 - inputSeries []SignedSeries + + nonCounterMetric string + hasFloats bool } var ErrNativeHistogramsNotSupported = errors.New("native histograms are not supported in extended range functions") @@ -159,6 +163,10 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { } if o.currentStep > o.maxt { + if o.nonCounterMetric != "" && o.hasFloats { + warnings.AddToContext(annotations.NewPossibleNonCounterInfo(o.nonCounterMetric, posrange.PositionRange{}), ctx) + } + return nil, nil } if err := o.loadSeries(ctx); err != nil { @@ -200,21 +208,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { vectors[currStep].T = seriesTs if h != nil { vectors[currStep].AppendHistogram(o.vectorPool, scanner.signature, h) - } else if f != nil { - if o.functionName == "rate" || o.functionName == "increase" { - if len(o.inputSeries) > 0 { - metricName := o.inputSeries[0].Labels().Get(labels.MetricName) - if metricName != "" && - !strings.HasSuffix(metricName, "_total") && - !strings.HasSuffix(metricName, "_sum") && - !strings.HasSuffix(metricName, "_count") && - !strings.HasSuffix(metricName, "_bucket") { - warnings.AddToContext(annotations.NewPossibleNonCounterInfo(metricName, posrange.PositionRange{}), ctx) - } - } - } - - vectors[currStep].AppendSample(o.vectorPool, scanner.signature, *f) + } else { + vectors[currStep].AppendSample(o.vectorPool, scanner.signature, f) + o.hasFloats = true } } o.IncrementSamplesAtTimestamp(scanner.buffer.Len(), seriesTs) @@ -237,8 +233,6 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { return } - o.inputSeries = series - o.scanners = make([]matrixScanner, len(series)) o.series = make([]labels.Labels, len(series)) b := labels.ScratchBuilder{} @@ -267,6 +261,20 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { o.seriesBatchSize = numSeries } o.vectorPool.SetStepSize(int(o.seriesBatchSize)) + + // Add a warning if rate or increase is applied on metrics which are not named like counters. + if o.functionName == "rate" || o.functionName == "increase" { + if len(series) > 0 { + metricName := series[0].Labels().Get(labels.MetricName) + if metricName != "" && + !strings.HasSuffix(metricName, "_total") && + !strings.HasSuffix(metricName, "_sum") && + !strings.HasSuffix(metricName, "_count") && + !strings.HasSuffix(metricName, "_bucket") { + o.nonCounterMetric = metricName + } + } + } }) return err }