diff --git a/go.mod b/go.mod index 54d5e2a588b..b06af603af4 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/spf13/afero v1.11.0 github.com/stretchr/testify v1.10.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 - github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e + github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 diff --git a/go.sum b/go.sum index 650a8bec060..51d7d8bf1ac 100644 --- a/go.sum +++ b/go.sum @@ -1672,8 +1672,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= -github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e h1:jDKxQzp4JIhpbn6NFHXc8TCsRy8GkfHMZ7XNicY1mx8= -github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs= +github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c h1:STCm5S4Aht3hOR0WQ0B3daZv21GQC13uPYIfkcN762U= +github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs= github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80 h1:mOCRYn9SLBWJCXAdP+qDfgZDc0eqDxDc2HZGKTZ5vzk= github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80/go.mod h1:Y7D8la8B5rpzRVKq2HCR4hbYZ4LGroSPqIJjtizgQg8= github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I= diff --git a/vendor/github.com/thanos-io/promql-engine/execution/execution.go b/vendor/github.com/thanos-io/promql-engine/execution/execution.go index e3df3fa2c7d..80df58c6387 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/execution.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/execution.go @@ -213,6 +213,7 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo } var scalarArg model.VectorOperator + var scalarArg2 model.VectorOperator switch e.Func.Name { case "quantile_over_time": // quantile_over_time(scalar, range-vector) @@ -226,9 +227,19 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo if err != nil { return nil, err } + case "double_exponential_smoothing": + // double_exponential_smoothing(range-vector, scalar, scalar) + scalarArg, err = newOperator(ctx, e.Args[1], storage, opts, hints) + if err != nil { + return nil, err + } + scalarArg2, err = newOperator(ctx, e.Args[2], storage, opts, hints) + if err != nil { + return nil, err + } } - return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, &outerOpts, e, t) + return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, scalarArg2, &outerOpts, e, t) } func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) { diff --git a/vendor/github.com/thanos-io/promql-engine/execution/scan/subquery.go b/vendor/github.com/thanos-io/promql-engine/execution/scan/subquery.go index 095c3c51971..2392038110c 100644 --- a/vendor/github.com/thanos-io/promql-engine/execution/scan/subquery.go +++ b/vendor/github.com/thanos-io/promql-engine/execution/scan/subquery.go @@ -24,8 +24,9 @@ import ( type subqueryOperator struct { telemetry.OperatorTelemetry - next model.VectorOperator - paramOp model.VectorOperator + next model.VectorOperator + paramOp model.VectorOperator + paramOp2 model.VectorOperator pool *model.VectorPool call ringbuffer.FunctionCall @@ -47,10 +48,13 @@ type subqueryOperator struct { buffers []*ringbuffer.GenericRingBuffer // params holds the function parameter for each step. - params []float64 + // quantile_over time and predict_linear use one parameter (params) + // double_exponential_smoothing uses two (params, params2) for (sf, tf) + params []float64 + params2 []float64 } -func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) { +func NewSubqueryOperator(pool *model.VectorPool, next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) { call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name) if err != nil { return nil, err @@ -63,6 +67,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera o := &subqueryOperator{ next: next, paramOp: paramOp, + paramOp2: paramOp2, call: call, pool: pool, funcExpr: funcExpr, @@ -75,6 +80,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera stepsBatch: opts.StepsBatch, lastCollected: -1, params: make([]float64, opts.StepsBatch), + params2: make([]float64, opts.StepsBatch), } o.OperatorTelemetry = telemetry.NewSubqueryTelemetry(o, opts) @@ -89,6 +95,8 @@ func (o *subqueryOperator) Explain() (next []model.VectorOperator) { switch o.funcExpr.Func.Name { case "quantile_over_time", "predict_linear": return []model.VectorOperator{o.paramOp, o.next} + case "double_exponential_smoothing": + return []model.VectorOperator{o.paramOp, o.paramOp2, o.next} default: return []model.VectorOperator{o.next} } @@ -127,6 +135,21 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error) o.paramOp.GetPool().PutVectors(args) } + if o.paramOp2 != nil { // double_exponential_smoothing + args, err := o.paramOp2.Next(ctx) + if err != nil { + return nil, err + } + for i := range args { + o.params2[i] = math.NaN() + if len(args[i].Samples) == 1 { + o.params2[i] = args[i].Samples[0] + } + o.paramOp2.GetPool().PutStepVector(args[i]) + } + o.paramOp2.GetPool().PutVectors(args) + } + res := o.pool.GetVectorBatch() for i := 0; o.currentStep <= o.maxt && i < o.stepsBatch; i++ { mint := o.currentStep - o.subQuery.Range.Milliseconds() - o.subQuery.OriginalOffset.Milliseconds() + 1 @@ -171,15 +194,15 @@ func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error) sv := o.pool.GetStepVector(o.currentStep) for sampleId, rangeSamples := range o.buffers { - f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], nil) + f, h, ok, err := rangeSamples.Eval(ctx, o.params[i], o.params2[i], nil) if err != nil { return nil, err } if ok { if h != nil { sv.AppendHistogram(o.pool, uint64(sampleId), h) - } else if f != nil { - sv.AppendSample(o.pool, uint64(sampleId), *f) + } else { + sv.AppendSample(o.pool, uint64(sampleId), f) } } o.IncrementSamplesAtTimestamp(rangeSamples.Len(), sv.T) diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go index 6f07c6b385d..e85a3a9cefb 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/functions.go @@ -8,12 +8,15 @@ import ( "math" "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/promql-engine/execution/warnings" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/thanos-io/promql-engine/execution/aggregate" "github.com/thanos-io/promql-engine/execution/parse" - "github.com/thanos-io/promql-engine/execution/warnings" ) type SamplesBuffer GenericRingBuffer @@ -26,12 +29,12 @@ type FunctionArgs struct { Offset int64 MetricAppearedTs *int64 - // Only holt-winters uses two arguments, we fall back for that. // quantile_over_time and predict_linear use one, so we only use one here. - ScalarPoint float64 + ScalarPoint float64 + ScalarPoint2 float64 // only for double_exponential_smoothing (trend factor) } -type FunctionCall func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) +type FunctionCall func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) func instantValue(samples []Sample, isRate bool) (float64, bool) { lastSample := samples[len(samples)-1] @@ -60,11 +63,10 @@ func instantValue(samples []Sample, isRate bool) (float64, bool) { } var rangeVectorFuncs = map[string]FunctionCall{ - "sum_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "sum_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // histogram sum := f.Samples[0].V.H.Copy() @@ -72,35 +74,30 @@ var rangeVectorFuncs = map[string]FunctionCall{ h := sample.V.H _, err := sum.Add(h) if err != nil { - return nil, sum, true, nil + return 0, sum, true, err } } - return nil, sum, true, nil + return 0, sum, true, nil } - - v := sumOverTime(f.Samples) - return &v, nil, true, nil + return sumOverTime(f.Samples), nil, true, nil }, - "max_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "max_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := maxOverTime(f.Samples) - return &v, nil, true, nil + return maxOverTime(f.Samples), nil, true, nil }, - "min_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "min_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := minOverTime(f.Samples) - return &v, nil, true, nil + return minOverTime(f.Samples), nil, true, nil }, - "avg_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "avg_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // histogram count := 1 @@ -111,193 +108,191 @@ var rangeVectorFuncs = map[string]FunctionCall{ right := mean.Copy().Div(float64(count)) toAdd, err := left.Sub(right) if err != nil { - return nil, mean, true, nil + return 0, mean, true, err } _, err = mean.Add(toAdd) if err != nil { - return nil, mean, true, nil + return 0, mean, true, err } } - return nil, mean, true, nil + return 0, mean, true, nil } - v := avgOverTime(f.Samples) - return &v, nil, true, nil + return avgOverTime(f.Samples), nil, true, nil }, - "stddev_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "stddev_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := stddevOverTime(f.Samples) - return &v, nil, true, nil + return stddevOverTime(f.Samples), nil, true, nil }, - "stdvar_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "stdvar_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := stdvarOverTime(f.Samples) - return &v, nil, true, nil + return stdvarOverTime(f.Samples), nil, true, nil }, - "count_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "count_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := countOverTime(f.Samples) - return &v, nil, true, nil + return countOverTime(f.Samples), nil, true, nil }, - "last_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "last_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.Samples[0].V.H != nil { - return nil, f.Samples[len(f.Samples)-1].V.H.Copy(), true, nil + return 0, f.Samples[len(f.Samples)-1].V.H.Copy(), true, nil } - v := f.Samples[len(f.Samples)-1].V.F - return &v, nil, true, nil + return f.Samples[len(f.Samples)-1].V.F, nil, true, nil }, - "present_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "present_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := 1. - return &v, nil, true, nil + return 1., nil, true, nil }, - "quantile_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "quantile_over_time": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } floats := make([]float64, len(f.Samples)) for i, sample := range f.Samples { floats[i] = sample.V.F } - v := aggregate.Quantile(f.ScalarPoint, floats) - return &v, nil, true, nil + return aggregate.Quantile(f.ScalarPoint, floats), nil, true, nil }, - "changes": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "changes": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := changes(f.Samples) - return &v, nil, true, nil + return changes(f.Samples), nil, true, nil }, - "resets": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "resets": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } - v := resets(f.Samples) - return &v, nil, true, nil + return resets(f.Samples), nil, true, nil }, - "deriv": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "deriv": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } - if f.Samples[0].V.H != nil { // deriv should ignore histograms. - return nil, nil, false, nil + return 0, nil, false, nil } - - v := deriv(f.Samples) - return &v, nil, true, nil + return deriv(f.Samples), nil, true, nil }, - "irate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "irate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { f.Samples = filterFloatOnlySamples(f.Samples) if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } val, ok := instantValue(f.Samples, true) if !ok { - return nil, nil, false, nil + return 0., nil, false, nil } - return &val, nil, true, nil + return val, nil, true, nil }, - "idelta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "idelta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { f.Samples = filterFloatOnlySamples(f.Samples) if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } val, ok := instantValue(f.Samples, false) if !ok { - return nil, nil, false, nil + return 0., nil, false, nil } - return &val, nil, true, nil + return val, nil, true, nil }, - "rate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "rate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil - } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err + return 0., nil, false, nil } - - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) }, - "delta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "delta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err - } - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) }, - "increase": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "increase": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } - v, h, err := extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset) - if err != nil { - return nil, nil, false, err - } - return v, h, true, nil + return extrapolatedRate(f.ctx, f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset) }, - "xrate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xrate": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "xdelta": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xdelta": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "xincrease": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "xincrease": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return nil, nil, false, nil + return 0., nil, false, nil } if f.MetricAppearedTs == nil { panic("BUG: we got some Samples but metric still hasn't appeared") } v, h, err := extendedRate(f.ctx, f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset, *f.MetricAppearedTs) if err != nil { - return nil, nil, false, err + return 0, nil, false, err } - return &v, h, true, nil + return v, h, true, nil }, - "predict_linear": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { + "predict_linear": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) < 2 { - return nil, nil, false, nil + return 0., nil, false, nil } v := predictLinear(f.Samples, f.ScalarPoint, f.StepTime) - return &v, nil, true, nil + return v, nil, true, nil + }, + "double_exponential_smoothing": func(f FunctionArgs) (float64, *histogram.FloatHistogram, bool, error) { + if len(f.Samples) < 2 { + if len(f.Samples) == 1 && f.Samples[0].V.H != nil { + warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx) + return 0, nil, false, nil + } + return 0, nil, false, nil + } + + // Annotate mix of float and histogram. + for _, s := range f.Samples { + if s.V.H != nil { + warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, f.ctx) + return 0, nil, false, nil + } + } + + sf := f.ScalarPoint // smoothing factor or alpha + tf := f.ScalarPoint2 // trend factor argument or beta + + v, ok := doubleExponentialSmoothing(f.Samples, sf, tf) + return v, nil, ok, nil }, } @@ -313,7 +308,7 @@ func NewRangeVectorFunc(name string) (FunctionCall, error) { // It calculates the rate (allowing for counter resets if isCounter is true), // extrapolates if the first/last sample is close to the boundary, and returns // the result as either per-second (if isRate is true) or overall. -func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (*float64, *histogram.FloatHistogram, error) { +func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (f float64, h *histogram.FloatHistogram, ok bool, err error) { var ( rangeStart = stepTime - (selectRange + offset) rangeEnd = stepTime - offset @@ -321,11 +316,10 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC resultHistogram *histogram.FloatHistogram ) - var err error if samples[0].V.H != nil { resultHistogram, err = histogramRate(ctx, samples, isCounter) if err != nil { - return nil, nil, err + return 0, nil, false, err } } else { resultValue = samples[len(samples)-1].V.F - samples[0].V.F @@ -398,10 +392,10 @@ func extrapolatedRate(ctx context.Context, samples []Sample, numSamples int, isC if samples[0].V.H != nil && resultHistogram == nil { // to prevent appending sample with 0 - return nil, nil, nil + return 0, nil, false, nil } - return &resultValue, resultHistogram, nil + return resultValue, resultHistogram, true, nil } // extendedRate is a utility function for xrate/xincrease/xdelta. @@ -735,6 +729,65 @@ func predictLinear(points []Sample, duration float64, stepTime int64) float64 { return slope*duration + intercept } +// Based on https://github.com/prometheus/prometheus/blob/8baad1a73e471bd3cf3175a1608199e27484f179/promql/functions.go#L438 +// doubleExponentialSmoothing calculates the smoothed out value for the given series. +// It is similar to a weighted moving average, where historical data has exponentially less influence on the current data. +// It also accounts for trends in data. The smoothing factor (0 < sf < 1), aka "alpha", affects how historical data will affect the current data. +// A lower smoothing factor increases the influence of historical data. +// The trend factor (0 < tf < 1), aka "beta", affects how trends in historical data will affect the current data. +// A higher trend factor increases the influence of trends. +// Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing +func doubleExponentialSmoothing(points []Sample, sf, tf float64) (float64, bool) { + // Check that the input parameters are valid + if sf <= 0 || sf >= 1 || tf <= 0 || tf >= 1 { + return 0, false + } + + // Can't do the smoothing operation with less than two points + if len(points) < 2 { + return 0, false + } + + // Check for histograms in the samples + for _, s := range points { + if s.V.H != nil { + return 0, false + } + } + + var s0, s1, b float64 + // Set initial values + s1 = points[0].V.F + b = points[1].V.F - points[0].V.F + + // Run the smoothing operation + for i := 1; i < len(points); i++ { + // Scale the raw value against the smoothing factor + x := sf * points[i].V.F + // Scale the last smoothed value with the trend at this point + b = calcTrendValue(i-1, tf, s0, s1, b) + y := (1 - sf) * (s1 + b) + s0, s1 = s1, x+y + } + + return s1, true +} + +// calcTrendValue calculates the trend value at the given index i. +// This is somewhat analogous to the slope of the trend at the given index. +// The argument "tf" is the trend factor. +// The argument "s0" is the previous smoothed value. +// The argument "s1" is the current smoothed value. +// The argument "b" is the previous trend value. +func calcTrendValue(i int, tf, s0, s1, b float64) float64 { + if i == 0 { + return b + } + x := tf * (s1 - s0) + y := (1 - tf) * b + return x + y +} + func resets(points []Sample) float64 { var histogramPoints []Sample var floatPoints []Sample diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/generic.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/generic.go index 007e8d1146c..804a34d5004 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/generic.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/generic.go @@ -106,7 +106,7 @@ func (r *GenericRingBuffer) Reset(mint int64, evalt int64) { r.items = r.items[:keep] } -func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, metricAppearedTs *int64) (*float64, *histogram.FloatHistogram, bool, error) { +func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, scalarArg2 float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) { return r.call(FunctionArgs{ ctx: ctx, Samples: r.items, @@ -114,6 +114,7 @@ func (r *GenericRingBuffer) Eval(ctx context.Context, scalarArg float64, metricA SelectRange: r.selectRange, Offset: r.offset, ScalarPoint: scalarArg, + ScalarPoint2: scalarArg2, // only for double_exponential_smoothing MetricAppearedTs: metricAppearedTs, }) } diff --git a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go index ce600094002..ce30f97770d 100644 --- a/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go +++ b/vendor/github.com/thanos-io/promql-engine/ringbuffer/rate.go @@ -18,7 +18,7 @@ type Buffer interface { MaxT() int64 Push(t int64, v Value) Reset(mint int64, evalt int64) - Eval(ctx context.Context, _ float64, _ *int64) (*float64, *histogram.FloatHistogram, bool, error) + Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) ReadIntoLast(f func(*Sample)) } @@ -171,9 +171,9 @@ func (r *RateBuffer) Reset(mint int64, evalt int64) { r.firstSamples[last].T = math.MaxInt64 } -func (r *RateBuffer) Eval(ctx context.Context, _ float64, _ *int64) (*float64, *histogram.FloatHistogram, bool, error) { +func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) { if r.firstSamples[0].T == math.MaxInt64 || r.firstSamples[0].T == r.last.T { - return nil, nil, false, nil + return 0, nil, false, nil } r.rateBuffer = append(append( @@ -183,8 +183,7 @@ func (r *RateBuffer) Eval(ctx context.Context, _ float64, _ *int64) (*float64, * ) r.rateBuffer = slices.CompactFunc(r.rateBuffer, func(s1 Sample, s2 Sample) bool { return s1.T == s2.T }) numSamples := r.stepRanges[0].numSamples - f, h, err := extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) - return f, h, true, err + return extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) } func (r *RateBuffer) ReadIntoLast(func(*Sample)) {} diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go index caf893de691..85bb46f051d 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/matrix_selector.go @@ -11,19 +11,21 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/thanos-io/promql-engine/execution/warnings" + "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" - "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/ringbuffer" @@ -45,6 +47,7 @@ type matrixSelector struct { vectorPool *model.VectorPool storage SeriesSelector scalarArg float64 + scalarArg2 float64 scanners []matrixScanner series []labels.Labels once sync.Once @@ -71,7 +74,9 @@ type matrixSelector struct { // Lookback delta for extended range functions. extLookbackDelta int64 - inputSeries []SignedSeries + + nonCounterMetric string + hasFloats bool } var ErrNativeHistogramsNotSupported = errors.New("native histograms are not supported in extended range functions") @@ -82,6 +87,7 @@ func NewMatrixSelector( selector SeriesSelector, functionName string, arg float64, + arg2 float64, opts *query.Options, selectRange, offset time.Duration, batchSize int64, @@ -97,6 +103,7 @@ func NewMatrixSelector( functionName: functionName, vectorPool: pool, scalarArg: arg, + scalarArg2: arg2, fhReader: &histogram.FloatHistogram{}, opts: opts, @@ -156,6 +163,10 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { } if o.currentStep > o.maxt { + if o.nonCounterMetric != "" && o.hasFloats { + warnings.AddToContext(annotations.NewPossibleNonCounterInfo(o.nonCounterMetric, posrange.PositionRange{}), ctx) + } + return nil, nil } if err := o.loadSeries(ctx); err != nil { @@ -189,7 +200,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { // Also, allow operator to exist independently without being nested // under parser.Call by implementing new data model. // https://github.com/thanos-io/promql-engine/issues/39 - f, h, ok, err := scanner.buffer.Eval(ctx, o.scalarArg, scanner.metricAppearedTs) + f, h, ok, err := scanner.buffer.Eval(ctx, o.scalarArg, o.scalarArg2, scanner.metricAppearedTs) if err != nil { return nil, err } @@ -197,21 +208,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { vectors[currStep].T = seriesTs if h != nil { vectors[currStep].AppendHistogram(o.vectorPool, scanner.signature, h) - } else if f != nil { - if o.functionName == "rate" || o.functionName == "increase" { - if len(o.inputSeries) > 0 { - metricName := o.inputSeries[0].Labels().Get(labels.MetricName) - if metricName != "" && - !strings.HasSuffix(metricName, "_total") && - !strings.HasSuffix(metricName, "_sum") && - !strings.HasSuffix(metricName, "_count") && - !strings.HasSuffix(metricName, "_bucket") { - warnings.AddToContext(annotations.NewPossibleNonCounterInfo(metricName, posrange.PositionRange{}), ctx) - } - } - } - - vectors[currStep].AppendSample(o.vectorPool, scanner.signature, *f) + } else { + vectors[currStep].AppendSample(o.vectorPool, scanner.signature, f) + o.hasFloats = true } } o.IncrementSamplesAtTimestamp(scanner.buffer.Len(), seriesTs) @@ -234,8 +233,6 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { return } - o.inputSeries = series - o.scanners = make([]matrixScanner, len(series)) o.series = make([]labels.Labels, len(series)) b := labels.ScratchBuilder{} @@ -264,6 +261,20 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { o.seriesBatchSize = numSeries } o.vectorPool.SetStepSize(int(o.seriesBatchSize)) + + // Add a warning if rate or increase is applied on metrics which are not named like counters. + if o.functionName == "rate" || o.functionName == "increase" { + if len(series) > 0 { + metricName := series[0].Labels().Get(labels.MetricName) + if metricName != "" && + !strings.HasSuffix(metricName, "_total") && + !strings.HasSuffix(metricName, "_sum") && + !strings.HasSuffix(metricName, "_count") && + !strings.HasSuffix(metricName, "_bucket") { + o.nonCounterMetric = metricName + } + } + } }) return err } diff --git a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go index c8883498386..272ee174c48 100644 --- a/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go +++ b/vendor/github.com/thanos-io/promql-engine/storage/prometheus/scanners.go @@ -85,6 +85,7 @@ func (p Scanners) NewMatrixSelector( call logicalplan.FunctionCall, ) (model.VectorOperator, error) { arg := 0.0 + arg2 := 0.0 switch call.Func.Name { case "quantile_over_time": unwrap, err := logicalplan.UnwrapFloat(call.Args[0]) @@ -101,6 +102,22 @@ func (p Scanners) NewMatrixSelector( return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "predict_linear with expression as second argument is not supported") } arg = unwrap + case "double_exponential_smoothing": + sf, err := logicalplan.UnwrapFloat(call.Args[1]) + if err != nil { + return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "double_exponential_smoothing with expression as second argument is not supported") + } + + tf, err := logicalplan.UnwrapFloat(call.Args[2]) + if err != nil { + return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "double_exponential_smoothing with expression as third argument is not supported") + } + + if sf <= 0 || sf >= 1 || tf <= 0 || tf >= 1 { + return nil, nil + } + arg = sf + arg2 = tf } vs := logicalNode.VectorSelector @@ -116,6 +133,7 @@ func (p Scanners) NewMatrixSelector( selector, call.Func.Name, arg, + arg2, opts, logicalNode.Range, vs.Offset, diff --git a/vendor/modules.txt b/vendor/modules.txt index 6faee6e9648..0edeea73918 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -981,7 +981,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing -# github.com/thanos-io/promql-engine v0.0.0-20250211181629-815830ca3e2e +# github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c ## explicit; go 1.22.7 github.com/thanos-io/promql-engine/api github.com/thanos-io/promql-engine/engine