Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/spf13/afero v1.11.0
github.com/stretchr/testify v1.10.0
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1687,8 +1687,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c h1:STCm5S4Aht3hOR0WQ0B3daZv21GQC13uPYIfkcN762U=
github.com/thanos-io/promql-engine v0.0.0-20250220213456-fab1185f8c6c/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16 h1:ezd8hNCWiGQr4kdfCHFa0VCSi+LAO/28Mna264nDs2c=
github.com/thanos-io/promql-engine v0.0.0-20250302135832-accbf0891a16/go.mod h1:aHSV5hL94fNb7PklN9L0V10j+/RGIlzqbw7OLdNgZFs=
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80 h1:mOCRYn9SLBWJCXAdP+qDfgZDc0eqDxDc2HZGKTZ5vzk=
github.com/thanos-io/thanos v0.37.3-0.20250212101700-346d18bb0f80/go.mod h1:Y7D8la8B5rpzRVKq2HCR4hbYZ4LGroSPqIJjtizgQg8=
github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I=
Expand Down
88 changes: 88 additions & 0 deletions pkg/querier/engine_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package querier

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
)

type EngineFactory struct {
prometheusEngine *promql.Engine
thanosEngine *engine.Engine

fallbackQueriesTotal prometheus.Counter
}

func NewEngineFactory(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *EngineFactory {
prometheusEngine := promql.NewEngine(opts)

var thanosEngine *engine.Engine
if enableThanosEngine {
thanosEngine = engine.New(engine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableAnalysis: true,
})
}

return &EngineFactory{
prometheusEngine: prometheusEngine,
thanosEngine: thanosEngine,
fallbackQueriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_thanos_engine_fallback_queries_total",
Help: "Total number of fallback queries due to not implementation in thanos engine",
}),
}
}

func (qf *EngineFactory) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
if qf.thanosEngine != nil {
res, err := qf.thanosEngine.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
if err != nil {
if engine.IsUnimplemented(err) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a counter metric for number of fallbacks because new engine doesn't support it?
Similar to
https://github.com/thanos-io/promql-engine/pull/518/files#diff-2e6c4934f63ff9b712c2c346b33036af4724adf70b0801fff9b74f71b37fcd89L180

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a metric and update the pr.

// fallback to use prometheus engine
qf.fallbackQueriesTotal.Inc()
goto fallback
}
return nil, err
}
return res, nil
}

fallback:
return qf.prometheusEngine.NewInstantQuery(ctx, q, opts, qs, ts)
}

func (qf *EngineFactory) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
if qf.thanosEngine != nil {
res, err := qf.thanosEngine.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
if err != nil {
if engine.IsUnimplemented(err) {
// fallback to use prometheus engine
qf.fallbackQueriesTotal.Inc()
goto fallback
}
return nil, err
}
return res, nil
}

fallback:
return qf.prometheusEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
}

func fromPromQLOpts(opts promql.QueryOpts) *engine.QueryOpts {
if opts == nil {
return &engine.QueryOpts{}
}
return &engine.QueryOpts{
LookbackDeltaParam: opts.LookbackDelta(),
EnablePerStepStatsParam: opts.EnablePerStepStats(),
}
}
59 changes: 59 additions & 0 deletions pkg/querier/engine_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package querier

import (
"bytes"
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func TestEngineFactory_Fallback(t *testing.T) {
// add unimplemented function
parser.Functions["unimplemented"] = &parser.Function{
Name: "unimplemented",
ArgTypes: []parser.ValueType{parser.ValueTypeVector},
ReturnType: parser.ValueTypeVector,
}

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.ThanosEngine = true
ctx := context.Background()
reg := prometheus.NewRegistry()

chunkStore := &emptyChunkStore{}
distributor := &errDistributor{}

overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

now := time.Now()
start := time.Now().Add(-time.Minute * 5)
step := time.Minute
queryable, _, queryEngine := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, reg, log.NewNopLogger(), nil)

// instant query, should go to fallback
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
# TYPE cortex_thanos_engine_fallback_queries_total counter
cortex_thanos_engine_fallback_queries_total 1
`), "cortex_thanos_engine_fallback_queries_total"))

// range query, should go to fallback
_, _ = queryEngine.NewRangeQuery(ctx, queryable, nil, "unimplemented(foo)", start, now, step)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
# TYPE cortex_thanos_engine_fallback_queries_total counter
cortex_thanos_engine_fallback_queries_total 2
`), "cortex_thanos_engine_fallback_queries_total"))
}
13 changes: 1 addition & 12 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -208,7 +206,6 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
// The cortex supports holt_winters for users using this function.
EnableExperimentalPromQLFunctions(cfg.EnablePromQLExperimentalFunctions, true)

var queryEngine promql.QueryEngine
opts := promql.EngineOpts{
Logger: util_log.GoKitLogToSlog(logger),
Reg: reg,
Expand All @@ -223,15 +220,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
return cfg.DefaultEvaluationInterval.Milliseconds()
},
}
if cfg.ThanosEngine {
queryEngine = engine.New(engine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableAnalysis: true,
})
} else {
queryEngine = promql.NewEngine(opts)
}
queryEngine := NewEngineFactory(opts, cfg.ThanosEngine, reg)
return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading