From 479ebed9fadf559c6f86e53d4bcf87aea7d88096 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Wed, 19 Feb 2025 15:49:46 +0000 Subject: [PATCH] engine: remove implicit fallback Signed-off-by: Michael Hoffmann --- README.md | 2 +- engine/bench_test.go | 1 - engine/distributed_test.go | 12 +- engine/engine.go | 64 ++-------- engine/engine_test.go | 224 +++++++++++++--------------------- engine/enginefuzz_test.go | 232 +----------------------------------- engine/user_defined_test.go | 3 +- 7 files changed, 108 insertions(+), 430 deletions(-) diff --git a/README.md b/README.md index 1ab77667e..6b24ab7ca 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The project is currently under active development. ## Roadmap -The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions while falling back to the original engine for operations that are not yet supported. This will allow us to have smaller and faster releases, and gather feedback on a regular basis. Instructions on using the engine will be added after we have enough confidence in its correctness. +The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions. Instructions on using the engine will be added after we have enough confidence in its correctness. If the engine encounters an expression it does not support it will return an error that can be tested with `engine.IsUnimplemented(err)`, the calling code is expected to handle this fallback. The following table shows operations which are currently supported by the engine diff --git a/engine/bench_test.go b/engine/bench_test.go index 9e076bc5f..118e97a3a 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -102,7 +102,6 @@ func BenchmarkSingleQuery(b *testing.B) { query := "sum(rate(http_requests_total[2m]))" opts := engine.Opts{ EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}, - DisableFallback: true, SelectorBatchSize: 256, } b.ReportAllocs() diff --git a/engine/distributed_test.go b/engine/distributed_test.go index 2e2747758..403f01fc4 100644 --- a/engine/distributed_test.go +++ b/engine/distributed_test.go @@ -193,10 +193,9 @@ func TestDistributedAggregations(t *testing.T) { } queries := []struct { - name string - query string - rangeStart time.Time - expectFallback bool + name string + query string + rangeStart time.Time }{ {name: "binop with selector and constant series", query: `bar or on () vector(0)`}, {name: "binop with aggregation and constant series", query: `sum(bar) or on () vector(0)`}, @@ -226,7 +225,7 @@ func TestDistributedAggregations(t *testing.T) { {name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`}, {name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`}, {name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`}, - {name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true}, + {name: "unsupported aggregation", query: `count_values("pod", bar)`}, {name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`}, {name: "absent_over_time for existing metric", query: `absent_over_time(bar{pod="nginx-1"}[2m])`}, {name: "absent for non-existing metric", query: `absent(foo)`}, @@ -249,7 +248,7 @@ func TestDistributedAggregations(t *testing.T) { {name: "query with @start() absolute timestamp", query: `sum(bar @ start())`}, {name: "query with @end() timestamp", query: `sum(bar @ end())`}, {name: "query with numeric timestamp", query: `sum(bar @ 140.000)`}, - {name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true}, + {name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`}, {name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`}, } @@ -306,7 +305,6 @@ func TestDistributedAggregations(t *testing.T) { for _, queryOpts := range allQueryOpts { ctx := context.Background() distOpts := localOpts - distOpts.DisableFallback = !query.expectFallback for _, instantTS := range instantTSs { t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) { distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines)) diff --git a/engine/engine.go b/engine/engine.go index f74ff74f3..422c659ad 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -11,8 +11,6 @@ import ( "sort" "time" - "github.com/thanos-io/promql-engine/execution/telemetry" - "github.com/efficientgo/core/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -27,6 +25,7 @@ import ( "github.com/thanos-io/promql-engine/execution/function" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/execution/telemetry" "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" @@ -39,7 +38,7 @@ type QueryType int type engineMetrics struct { currentQueries prometheus.Gauge - queries *prometheus.CounterVec + totalQueries prometheus.Counter } const ( @@ -50,16 +49,16 @@ const ( stepsBatch = 10 ) +func IsUnimplemented(err error) bool { + return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) +} + type Opts struct { promql.EngineOpts // LogicalOptimizers are optimizers that are run if the value is not nil. If it is nil then the default optimizers are run. Default optimizer list is available in the logicalplan package. LogicalOptimizers []logicalplan.Optimizer - // DisableFallback enables mode where engine returns error if some expression of feature is not yet implemented - // in the new engine, instead of falling back to prometheus engine. - DisableFallback bool - // ExtLookbackDelta specifies what time range to use to determine valid previous sample for extended range functions. // Defaults to 1 hour if not specified. ExtLookbackDelta time.Duration @@ -71,9 +70,6 @@ type Opts struct { // This will default to false. EnableXFunctions bool - // FallbackEngine - Engine promql.QueryEngine - // EnableAnalysis enables query analysis. EnableAnalysis bool @@ -177,23 +173,16 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { Help: "The current number of queries being executed or waiting.", }, ), - queries: promauto.With(opts.Reg).NewCounterVec( + totalQueries: promauto.With(opts.Reg).NewCounter( prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queries_total", Help: "Number of PromQL queries.", - }, []string{"fallback"}, + }, ), } - var engine promql.QueryEngine - if opts.Engine == nil { - engine = promql.NewEngine(opts.EngineOpts) - } else { - engine = opts.Engine - } - decodingConcurrency := opts.DecodingConcurrency if opts.DecodingConcurrency < 1 { decodingConcurrency = runtime.GOMAXPROCS(0) / 2 @@ -208,13 +197,11 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { } return &Engine{ - prom: engine, functions: functions, scanners: scanners, activeQueryTracker: queryTracker, disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, - disableFallback: opts.DisableFallback, logger: opts.Logger, lookbackDelta: opts.LookbackDelta, @@ -240,13 +227,11 @@ var ( ) type Engine struct { - prom promql.QueryEngine functions map[string]*parser.Function scanners engstorage.Scanners activeQueryTracker promql.QueryTracker disableDuplicateLabelChecks bool - disableFallback bool logger *slog.Logger lookbackDelta time.Duration @@ -290,14 +275,10 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts) - if e.triggerFallback(err) { - e.metrics.queries.WithLabelValues("true").Inc() - return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) - } - e.metrics.queries.WithLabelValues("false").Inc() if err != nil { return nil, err } + e.metrics.totalQueries.Inc() return &compatibilityQuery{ Query: &Query{exec: exec, opts: opts}, engine: e, @@ -338,14 +319,10 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab } exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) - if e.triggerFallback(err) { - e.metrics.queries.WithLabelValues("true").Inc() - return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts) - } - e.metrics.queries.WithLabelValues("false").Inc() if err != nil { return nil, err } + e.metrics.totalQueries.Inc() return &compatibilityQuery{ Query: &Query{exec: exec, opts: opts}, @@ -396,14 +373,10 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts * } exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) - if e.triggerFallback(err) { - e.metrics.queries.WithLabelValues("true").Inc() - return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) - } - e.metrics.queries.WithLabelValues("false").Inc() if err != nil { return nil, err } + e.metrics.totalQueries.Inc() return &compatibilityQuery{ Query: &Query{exec: exec, opts: opts}, @@ -442,14 +415,11 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) - if e.triggerFallback(err) { - e.metrics.queries.WithLabelValues("true").Inc() - return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step) - } - e.metrics.queries.WithLabelValues("false").Inc() if err != nil { return nil, err } + e.metrics.totalQueries.Inc() + return &compatibilityQuery{ Query: &Query{exec: exec, opts: opts}, engine: e, @@ -516,14 +486,6 @@ func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Optio return e.scanners, nil } -func (e *Engine) triggerFallback(err error) bool { - if e.disableFallback { - return false - } - - return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) -} - type Query struct { exec model.VectorOperator opts promql.QueryOpts diff --git a/engine/engine_test.go b/engine/engine_test.go index e056592f1..2a03a84c7 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -89,8 +89,14 @@ func TestPromqlAcceptance(t *testing.T) { }}) st := &skipTest{ - skipTests: []string{"testdata/name_label_dropping.test"}, // skip name_label_dropping test temporary TODO(sungjin1212): change to test whole cases - TBRun: t, + skipTests: []string{ + "testdata/name_label_dropping.test", // feature unsupported + "testdata/limit.test", // limitk, limit_ratio + "testdata/native_histograms.test", // histogram_stddev, histogram_stdvar + "testdata/functions.test", // mad_over_time, predict_linear + "testdata/histograms.test", // histogram_stddev, histogram_stdvar + }, // TODO(sungjin1212): change to test whole cases + TBRun: t, } promqltest.RunBuiltinTests(st, engine) @@ -181,7 +187,6 @@ func TestQuerierClosedAfterQueryClosed(t *testing.T) { optimizers := logicalplan.AllOptimizers newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: optimizers, // Set to 1 to make sure batching is tested. SelectorBatchSize: 1, @@ -2092,34 +2097,29 @@ avg by (storage_info) ( } for _, disableOptimizers := range disableOptimizerOpts { t.Run(fmt.Sprintf("disableOptimizers=%v", disableOptimizers), func(t *testing.T) { - for _, disableFallback := range []bool{false, true} { - t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) { - optimizers := logicalplan.AllOptimizers - if disableOptimizers { - optimizers = logicalplan.NoOptimizers - } - newEngine := engine.New(engine.Opts{ - EngineOpts: opts, - DisableFallback: disableFallback, - LogicalOptimizers: optimizers, - // Set to 1 to make sure batching is tested. - SelectorBatchSize: 1, - }) - ctx := context.Background() - q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step) - testutil.Ok(t, err) - defer q1.Close() - newResult := q1.Exec(ctx) - - oldEngine := promql.NewEngine(opts) - q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step) - testutil.Ok(t, err) - defer q2.Close() - oldResult := q2.Exec(ctx) - - testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) - }) + optimizers := logicalplan.AllOptimizers + if disableOptimizers { + optimizers = logicalplan.NoOptimizers } + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: optimizers, + // Set to 1 to make sure batching is tested. + SelectorBatchSize: 1, + }) + ctx := context.Background() + q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step) + testutil.Ok(t, err) + defer q1.Close() + newResult := q1.Exec(ctx) + + oldEngine := promql.NewEngine(opts) + q2, err := oldEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step) + testutil.Ok(t, err) + defer q2.Close() + oldResult := q2.Exec(ctx) + + testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) }) } }) @@ -2382,7 +2382,6 @@ func TestDisabledXFunction(t *testing.T) { newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: optimizers, }) _, err := newEngine.NewInstantQuery(context.Background(), storage, nil, tc.query, queryTime) @@ -2416,7 +2415,6 @@ func TestXFunctionsWithNativeHistograms(t *testing.T) { ctx := context.Background() newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: optimizers, EnableXFunctions: true, }) @@ -2671,7 +2669,6 @@ func TestXFunctions(t *testing.T) { ctx := context.Background() newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: optimizers, EnableXFunctions: true, }) @@ -2986,7 +2983,6 @@ func TestRateVsXRate(t *testing.T) { newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: optimizers, EnableXFunctions: true, }) @@ -4199,39 +4195,34 @@ min without () ( LookbackDelta: lookbackDelta, } - for _, disableFallback := range []bool{false, true} { - t.Run(fmt.Sprintf("disableFallback=%v", disableFallback), func(t *testing.T) { - var queryTime time.Time = defaultQueryTime - if tc.queryTime != (time.Time{}) { - queryTime = tc.queryTime - } - - optimizers := logicalplan.AllOptimizers - if disableOptimizers { - optimizers = logicalplan.NoOptimizers - } - newEngine := engine.New(engine.Opts{ - EngineOpts: opts, - DisableFallback: disableFallback, - LogicalOptimizers: optimizers, - }) - - ctx := context.Background() - q1, err := newEngine.NewInstantQuery(ctx, testStorage, nil, tc.query, queryTime) - testutil.Ok(t, err) - defer q1.Close() - - newResult := q1.Exec(ctx) - - oldEngine := promql.NewEngine(opts) - q2, err := oldEngine.NewInstantQuery(ctx, testStorage, nil, tc.query, queryTime) - testutil.Ok(t, err) - defer q2.Close() - - oldResult := q2.Exec(ctx) - testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) - }) + var queryTime time.Time = defaultQueryTime + if tc.queryTime != (time.Time{}) { + queryTime = tc.queryTime + } + + optimizers := logicalplan.AllOptimizers + if disableOptimizers { + optimizers = logicalplan.NoOptimizers } + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + LogicalOptimizers: optimizers, + }) + + ctx := context.Background() + q1, err := newEngine.NewInstantQuery(ctx, testStorage, nil, tc.query, queryTime) + testutil.Ok(t, err) + defer q1.Close() + + newResult := q1.Exec(ctx) + + oldEngine := promql.NewEngine(opts) + q2, err := oldEngine.NewInstantQuery(ctx, testStorage, nil, tc.query, queryTime) + testutil.Ok(t, err) + defer q2.Close() + + oldResult := q2.Exec(ctx) + testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) } }) } @@ -4349,7 +4340,7 @@ func TestQueryTimeout(t *testing.T) { storage := promqltest.LoadedStorage(t, load) defer storage.Close() - newEngine := engine.New(engine.Opts{DisableFallback: true, EngineOpts: opts}) + newEngine := engine.New(engine.Opts{EngineOpts: opts}) q, err := newEngine.NewInstantQuery(context.Background(), storage, nil, query, end) testutil.Ok(t, err) @@ -4401,39 +4392,39 @@ func TestSelectHintsSetCorrectly(t *testing.T) { {Start: -4000 + 1, End: 1000}, }, }, { - query: `foo[2m]`, start: 200000, + query: `rate(foo[2m])`, start: 200000, expected: []*storage.SelectHints{ - {Start: 80000 + 1, End: 200000, Range: 120000}, + {Start: 80000 + 1, End: 200000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m] @ 180.000`, start: 200000, + query: `rate(foo[2m] @ 180.000)`, start: 200000, expected: []*storage.SelectHints{ - {Start: 60000 + 1, End: 180000, Range: 120000}, + {Start: 60000 + 1, End: 180000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m] @ 300.000`, start: 200000, + query: `rate(foo[2m] @ 300.000)`, start: 200000, expected: []*storage.SelectHints{ - {Start: 180000 + 1, End: 300000, Range: 120000}, + {Start: 180000 + 1, End: 300000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m] @ 60.000`, start: 200000, + query: `rate(foo[2m] @ 60.000)`, start: 200000, expected: []*storage.SelectHints{ - {Start: -60000 + 1, End: 60000, Range: 120000}, + {Start: -60000 + 1, End: 60000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m] offset 2m`, start: 300000, + query: `rate(foo[2m] offset 2m)`, start: 300000, expected: []*storage.SelectHints{ - {Start: 60000 + 1, End: 180000, Range: 120000}, + {Start: 60000 + 1, End: 180000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m] @ 200.000 offset 2m`, start: 300000, + query: `rate(foo[2m] @ 200.000 offset 2m)`, start: 300000, expected: []*storage.SelectHints{ - {Start: -40000 + 1, End: 80000, Range: 120000}, + {Start: -40000 + 1, End: 80000, Range: 120000, Func: "rate"}, }, }, { - query: `foo[2m:1s]`, start: 300000, + query: `rate(foo[2m:1s])`, start: 300000, expected: []*storage.SelectHints{ - {Start: 175000 + 1, End: 300000, Step: 1000}, + {Start: 175000 + 1, End: 300000, Step: 1000, Func: "rate"}, }, }, { query: `count_over_time(foo[2m:1s])`, start: 300000, @@ -4599,12 +4590,12 @@ func TestSelectHintsSetCorrectly(t *testing.T) { {Start: 5000 + 1, End: 10000, Func: "max", By: true, Grouping: []string{"dim2"}}, }, }, { - query: `(max by (dim1) (foo))[5s:1s]`, start: 10000, + query: `max_over_time((max by (dim1) (foo))[5s:1s])`, start: 10000, expected: []*storage.SelectHints{ {Start: 0 + 1, End: 10000, Func: "max", By: true, Grouping: []string{"dim1"}, Step: 1000}, }, }, { - query: "(sum(http_requests{group=~\"p.*\"})+max(http_requests{group=~\"c.*\"}))[20s:5s]", start: 120000, + query: "max_over_time((sum(http_requests{group=~\"p.*\"})+max(http_requests{group=~\"c.*\"}))[20s:5s])", start: 120000, expected: []*storage.SelectHints{ {Start: 95000 + 1, End: 120000, Func: "sum", By: true, Step: 5000}, {Start: 95000 + 1, End: 120000, Func: "max", By: true, Step: 5000}, @@ -4645,12 +4636,18 @@ func TestSelectHintsSetCorrectly(t *testing.T) { {Start: 655000 + 1, End: 780000, Step: 1000, Func: "rate"}, }, }, { // Hints are based on the inner most subquery timestamp. - query: `sum_over_time(sum_over_time(metric{job="1"}[1m40s])[1m40s:25s] @ 50.000)[3s:1s] @ 3000.000`, start: 100000, + query: ` +sum_over_time( +sum_over_time(sum_over_time(metric{job="1"}[1m40s])[1m40s:25s] @ 50.000)[3s:1s] @ 3000.000 +)`, start: 100000, expected: []*storage.SelectHints{ {Start: -150000 + 1, End: 50000, Range: 100000, Func: "sum_over_time", Step: 25000}, }, }, { // Hints are based on the inner most subquery timestamp. - query: `sum_over_time(sum_over_time(metric{job="1"}[1m40s])[1m40s:25s] @ 3000.000)[3s:1s] @ 50.000`, + query: ` +sum_over_time( +sum_over_time(sum_over_time(metric{job="1"}[1m40s])[1m40s:25s] @ 3000.000)[3s:1s] @ 50.000 +)`, expected: []*storage.SelectHints{ {Start: 2800000 + 1, End: 3000000, Range: 100000, Func: "sum_over_time", Step: 25000}, }, @@ -4702,51 +4699,6 @@ func TestSelectHintsSetCorrectly(t *testing.T) { } } -func TestFallback(t *testing.T) { - start := time.Unix(0, 0) - end := time.Unix(120, 0) - step := time.Second * 30 - - cases := []struct { - name string - query string - }{ - { - name: "unsupported function with scalar", - query: `quantile_over_time(scalar(sum(http_requests_total)), http_requests_total[2m])`, - }, - } - - load := `load 30s - http_requests_total{pod="nginx-1"} 1+1x1 - http_requests_total{pod="nginx-2"} 1+2x40` - - storage := promqltest.LoadedStorage(t, load) - defer storage.Close() - - for _, tcase := range cases { - t.Run(tcase.name, func(t *testing.T) { - for _, disableFallback := range []bool{true, false} { - t.Run(fmt.Sprintf("disableFallback=%t", disableFallback), func(t *testing.T) { - opts := promql.EngineOpts{ - Timeout: 2 * time.Second, - MaxSamples: math.MaxInt64, - } - newEngine := engine.New(engine.Opts{DisableFallback: disableFallback, EngineOpts: opts}) - q1, err := newEngine.NewRangeQuery(context.Background(), storage, nil, tcase.query, start, end, step) - if disableFallback { - testutil.NotOk(t, err) - } else { - testutil.Ok(t, err) - newResult := q1.Exec(context.Background()) - testutil.Ok(t, newResult.Err) - } - }) - } - }) - } -} - func TestQueryStats(t *testing.T) { cases := []struct { name string @@ -4919,7 +4871,7 @@ func TestQueryStats(t *testing.T) { ctx := context.Background() oldEngine := promql.NewEngine(opts) - newEngine := engine.New(engine.Opts{DisableFallback: true, EnableAnalysis: true, EngineOpts: opts}) + newEngine := engine.New(engine.Opts{EnableAnalysis: true, EngineOpts: opts}) // Instant query oldQ, err := oldEngine.NewInstantQuery(ctx, storage, qOpts, tc.query, tc.end) @@ -5196,9 +5148,7 @@ func TestEngineRecoversFromPanic(t *testing.T) { }, } t.Run("instant", func(t *testing.T) { - newEngine := engine.New(engine.Opts{ - DisableFallback: true, - }) + newEngine := engine.New(engine.Opts{}) ctx := context.Background() q, err := newEngine.NewInstantQuery(ctx, querier, nil, "somequery", time.Time{}) testutil.Ok(t, err) @@ -5208,9 +5158,7 @@ func TestEngineRecoversFromPanic(t *testing.T) { }) t.Run("range", func(t *testing.T) { - newEngine := engine.New(engine.Opts{ - DisableFallback: true, - }) + newEngine := engine.New(engine.Opts{}) ctx := context.Background() q, err := newEngine.NewRangeQuery(ctx, querier, nil, "somequery", time.Time{}, time.Time{}, 42) testutil.Ok(t, err) @@ -5464,7 +5412,6 @@ func testNativeHistograms(t *testing.T, cases []histogramTestCase, opts promql.E promEngine := promql.NewEngine(opts) thanosEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: logicalplan.AllOptimizers, }) @@ -5620,7 +5567,6 @@ func TestMixedNativeHistogramTypes(t *testing.T) { engine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: logicalplan.AllOptimizers, }) diff --git a/engine/enginefuzz_test.go b/engine/enginefuzz_test.go index c18df6c20..a08e9b517 100644 --- a/engine/enginefuzz_test.go +++ b/engine/enginefuzz_test.go @@ -23,11 +23,8 @@ import ( "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" - "github.com/prometheus/prometheus/util/teststorage" - "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/logicalplan" ) @@ -106,7 +103,7 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { } ps := promqlsmith.New(rnd, seriesSet, psOpts...) - newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true, EnableAnalysis: true}) + newEngine := engine.New(engine.Opts{EngineOpts: opts, EnableAnalysis: true}) oldEngine := promql.NewEngine(opts) var ( @@ -122,7 +119,7 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) { query = expr.Pretty(0) q1, err = newEngine.NewRangeQuery(context.Background(), storage, qOpts, query, start, end, interval) - if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) { + if engine.IsUnimplemented(err) || errors.As(err, &parser.ParseErrors{}) { continue } else { break @@ -187,7 +184,6 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) { queryTime := time.Unix(int64(ts), 0) newEngine := engine.New(engine.Opts{ EngineOpts: opts, - DisableFallback: true, LogicalOptimizers: logicalplan.AllOptimizers, EnableAnalysis: true, }) @@ -220,7 +216,7 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) { } query = expr.Pretty(0) q1, err = newEngine.NewInstantQuery(context.Background(), storage, qOpts, query, queryTime) - if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) { + if engine.IsUnimplemented(err) || errors.As(err, &parser.ParseErrors{}) { continue } else { break @@ -255,228 +251,6 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) { }) } -func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) { - f.Skip("Skip from CI to repair later") - - f.Add(int64(0), uint32(0), uint32(120), uint32(30), 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 30) - - f.Fuzz(func(t *testing.T, seed int64, startTS, endTS, intervalSeconds uint32, initialVal1, initialVal2, initialVal3, initialVal4, inc1, inc2 float64, stepRange int) { - if math.IsNaN(initialVal1) || math.IsNaN(initialVal2) || math.IsNaN(inc1) || math.IsNaN(inc2) { - return - } - if math.IsInf(initialVal1, 0) || math.IsInf(initialVal2, 0) || math.IsInf(inc1, 0) || math.IsInf(inc2, 0) { - return - } - if inc1 < 0 || inc2 < 0 || stepRange <= 0 || intervalSeconds <= 0 || endTS < startTS { - return - } - load := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal1, inc1, initialVal2, inc2) - load2 := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal3, inc1, initialVal4, inc2) - - opts := promql.EngineOpts{ - Timeout: 1 * time.Hour, - MaxSamples: 1e10, - EnableNegativeOffset: true, - EnableAtModifier: true, - } - engineOpts := engine.Opts{ - EngineOpts: opts, - DisableFallback: true, - LogicalOptimizers: logicalplan.AllOptimizers, - } - - queryables := []*teststorage.TestStorage{} - storage1 := promqltest.LoadedStorage(t, load) - defer storage1.Close() - queryables = append(queryables, storage1) - - storage2 := promqltest.LoadedStorage(t, load2) - defer storage2.Close() - queryables = append(queryables, storage2) - - start := time.Unix(int64(startTS), 0) - end := time.Unix(int64(endTS), 0) - interval := time.Duration(intervalSeconds) * time.Second - - partitionLabels := [][]labels.Labels{ - {labels.FromStrings("zone", "west-1")}, - {labels.FromStrings("zone", "west-2")}, - } - remoteEngines := make([]api.RemoteEngine, 0, 2) - for i := 0; i < 2; i++ { - e := engine.NewRemoteEngine( - engineOpts, - queryables[i], - queryables[i].DB.Head().MinTime(), - queryables[i].DB.Head().MaxTime(), - partitionLabels[i], - ) - remoteEngines = append(remoteEngines, e) - } - distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines)) - oldEngine := promql.NewEngine(opts) - - mergeStore := storage.NewFanout(nil, storage1, storage2) - seriesSet, err := getSeries(context.Background(), mergeStore) - require.NoError(t, err) - rnd := rand.New(rand.NewSource(seed)) - psOpts := []promqlsmith.Option{ - promqlsmith.WithEnableOffset(true), - promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithAtModifierMaxTimestamp(180 * 1000), - promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.GROUP, parser.COUNT, parser.BOTTOMK, parser.TOPK}), - } - ps := promqlsmith.New(rnd, seriesSet, psOpts...) - - var ( - q1 promql.Query - query string - ) - cases := make([]*testCase, testRuns) - ctx := context.Background() - for i := 0; i < testRuns; i++ { - // Since we disabled fallback, keep trying until we find a query - // that can be natively execute by the engine. - for { - expr := ps.WalkRangeQuery() - query = expr.Pretty(0) - q1, err = distEngine.NewRangeQuery(ctx, mergeStore, nil, query, start, end, interval) - if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) { - continue - } else { - break - } - } - - testutil.Ok(t, err) - newResult := q1.Exec(ctx) - - q2, err := oldEngine.NewRangeQuery(ctx, mergeStore, nil, query, start, end, interval) - testutil.Ok(t, err) - - oldResult := q2.Exec(ctx) - - cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - loads: []string{load, load2}, - } - } - validateTestCases(t, cases) - }) -} - -func FuzzDistributedEnginePromQLSmithInstantQuery(f *testing.F) { - f.Skip("Skip from CI to repair later") - - f.Add(int64(0), uint32(0), 1.0, 1.0, 1.0, 1.0, 1.0, 2.0) - - f.Fuzz(func(t *testing.T, seed int64, ts uint32, initialVal1, initialVal2, initialVal3, initialVal4, inc1, inc2 float64) { - if inc1 < 0 || inc2 < 0 { - return - } - load := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal1, inc1, initialVal2, inc2) - load2 := fmt.Sprintf(`load 30s - http_requests_total{pod="nginx-1", route="/"} %.2f+%.2fx4 - http_requests_total{pod="nginx-2", route="/"} %2.f+%.2fx4`, initialVal3, inc1, initialVal4, inc2) - - opts := promql.EngineOpts{ - Timeout: 1 * time.Hour, - MaxSamples: 1e10, - EnableNegativeOffset: true, - EnableAtModifier: true, - } - engineOpts := engine.Opts{EngineOpts: opts, DisableFallback: true} - - queryables := []*teststorage.TestStorage{} - storage1 := promqltest.LoadedStorage(t, load) - defer storage1.Close() - queryables = append(queryables, storage1) - - storage2 := promqltest.LoadedStorage(t, load2) - defer storage2.Close() - queryables = append(queryables, storage2) - - partitionLabels := [][]labels.Labels{ - {labels.FromStrings("zone", "west-1")}, - {labels.FromStrings("zone", "west-2")}, - } - queryTime := time.Unix(int64(ts), 0) - remoteEngines := make([]api.RemoteEngine, 0, 2) - for i := 0; i < 2; i++ { - e := engine.NewRemoteEngine( - engineOpts, - queryables[i], - queryables[i].DB.Head().MinTime(), - queryables[i].DB.Head().MaxTime(), - partitionLabels[i], - ) - remoteEngines = append(remoteEngines, e) - } - distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines)) - oldEngine := promql.NewEngine(opts) - - mergeStore := storage.NewFanout(nil, storage1, storage2) - seriesSet, err := getSeries(context.Background(), mergeStore) - require.NoError(t, err) - rnd := rand.New(rand.NewSource(seed)) - psOpts := []promqlsmith.Option{ - promqlsmith.WithEnableOffset(true), - promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithAtModifierMaxTimestamp(180 * 1000), - promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.GROUP, parser.COUNT, parser.BOTTOMK, parser.TOPK}), - } - ps := promqlsmith.New(rnd, seriesSet, psOpts...) - ctx := context.Background() - - var ( - q1 promql.Query - query string - ) - cases := make([]*testCase, testRuns) - for i := 0; i < testRuns; i++ { - // Since we disabled fallback, keep trying until we find a query - // that can be natively execute by the engine. - for { - // Matrix value type cannot be supported for now as distributed engine - // will execute remote query as range query. - expr := ps.Walk(parser.ValueTypeVector) - query = expr.Pretty(0) - q1, err = distEngine.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) - if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) { - continue - } else { - break - } - } - - testutil.Ok(t, err) - newResult := q1.Exec(ctx) - - q2, err := oldEngine.NewInstantQuery(ctx, mergeStore, nil, query, queryTime) - testutil.Ok(t, err) - - oldResult := q2.Exec(ctx) - - cases[i] = &testCase{ - query: query, - newRes: newResult, - oldRes: oldResult, - loads: []string{load, load2}, - start: queryTime, - } - } - validateTestCases(t, cases) - }) -} - func getSeries(ctx context.Context, q storage.Queryable) ([]labels.Labels, error) { querier, err := q.Querier(0, time.Now().Unix()) if err != nil { diff --git a/engine/user_defined_test.go b/engine/user_defined_test.go index 858a36e70..acdf67d1d 100644 --- a/engine/user_defined_test.go +++ b/engine/user_defined_test.go @@ -36,8 +36,7 @@ load 30s defer storage.Close() newEngine := engine.New(engine.Opts{ - EngineOpts: opts, - DisableFallback: true, + EngineOpts: opts, LogicalOptimizers: []logicalplan.Optimizer{ &injectVectorSelector{}, },