Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,21 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec,
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Frontend.DistributedExecEnabled,
)
if err != nil {
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
instantQueryMiddlewares, err := instantquery.Middlewares(
util_log.Logger,
t.Overrides,
instantQueryCodec,
queryAnalyzer,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Frontend.DistributedExecEnabled)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type CombinedFrontendConfig struct {
FrontendV1 v1.Config `yaml:",inline"`
FrontendV2 v2.Config `yaml:",inline"`

DownstreamURL string `yaml:"downstream_url"`
DownstreamURL string `yaml:"downstream_url"`
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
}

func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -29,6 +30,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
cfg.FrontendV2.RegisterFlags(f)

f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
}

// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
Expand Down
93 changes: 93 additions & 0 deletions pkg/querier/tripperware/distributed_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package tripperware

import (
"context"
"net/http"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"github.com/weaveworks/common/httpgrpc"
)

const (
// stepBatch is the value passed as query.Options.StepsBatch when a logical
// plan is built by newLogicalPlan.
stepBatch = 10
)

// DistributedQueryMiddleware returns a Middleware that wraps the next Handler
// in a distributedQueryMiddleware. The wrapped handler builds a logical plan
// for each request, using defaultEvaluationInterval and lookbackDelta as
// planning options, before delegating to next.
func DistributedQueryMiddleware(defaultEvaluationInterval time.Duration, lookbackDelta time.Duration) Middleware {
	wrap := func(next Handler) Handler {
		mw := distributedQueryMiddleware{
			defaultEvaluationInterval: defaultEvaluationInterval,
			lookbackDelta:             lookbackDelta,
			next:                      next,
		}
		return mw
	}
	return MiddlewareFunc(wrap)
}

// getStartAndEnd returns the time range to plan for. A zero step collapses the
// range to the single instant start; otherwise start and end are returned
// unchanged.
func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
	rangeEnd := end
	if step == 0 {
		rangeEnd = start
	}
	return start, rangeEnd
}

// distributedQueryMiddleware is the Handler produced by
// DistributedQueryMiddleware. Its Do method attaches a logical plan to the
// request before passing it on.
type distributedQueryMiddleware struct {
// next is the handler invoked once the logical plan has been attached.
next Handler
// defaultEvaluationInterval is returned by the NoStepSubqueryIntervalFn
// planning option.
defaultEvaluationInterval time.Duration
// lookbackDelta is passed through as the LookbackDelta planning option.
lookbackDelta time.Duration
}

// newLogicalPlan parses qs as PromQL (with the standard parser.Functions set),
// builds a logical plan from the resulting AST and optimizes it with
// logicalplan.DefaultOptimizers.
//
// When step is zero the plan is built for the single instant start (see
// getStartAndEnd). A parse failure is returned unchanged with a nil plan.
func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
	// Reject unparsable queries before assembling any planning options.
	ast, parseErr := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
	if parseErr != nil {
		return nil, parseErr
	}

	planStart, planEnd := getStartAndEnd(start, end, step)

	// Subqueries without an explicit step always use the configured default
	// evaluation interval, regardless of the duration they are asked about.
	subqueryInterval := func(time.Duration) time.Duration {
		return d.defaultEvaluationInterval
	}

	queryOpts := &query.Options{
		Start:                    planStart,
		End:                      planEnd,
		Step:                     step,
		StepsBatch:               stepBatch,
		NoStepSubqueryIntervalFn: subqueryInterval,
		// Hardcoded value for execution-time-params that will be re-populated again in the querier stage
		LookbackDelta:      d.lookbackDelta,
		EnablePerStepStats: false,
	}

	plan := logicalplan.NewFromAST(ast, queryOpts, logicalplan.PlanOptions{
		DisableDuplicateLabelCheck: false,
	})

	// The second return value of Optimize is intentionally discarded.
	// NOTE(review): presumably these are query annotations/warnings rather
	// than an error — confirm against the promql-engine API.
	optimized, _ := plan.Optimize(logicalplan.DefaultOptimizers)

	return &optimized, nil
}

// Do builds a logical plan for the request's query, stores it on the request
// and forwards the request to the next handler.
//
// r must be a *PrometheusRequest; any other request type is rejected with an
// HTTP 400 httpgrpc error. The request's Start, End and Step are interpreted
// as milliseconds. An error from plan construction (for example a query that
// does not parse) is returned as-is and the next handler is not called.
func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
	promReq, ok := r.(*PrometheusRequest)
	if !ok {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
	}

	// time.UnixMilli avoids scaling the millisecond timestamp up to
	// nanoseconds in a single int64, which would overflow for very large
	// timestamps.
	startTime := time.UnixMilli(promReq.Start)
	endTime := time.UnixMilli(promReq.End)
	step := time.Duration(promReq.Step) * time.Millisecond

	plan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
	if err != nil {
		return nil, err
	}

	// The plan is attached to the request itself so that handlers further
	// down the chain can read it from promReq.LogicalPlan.
	promReq.LogicalPlan = *plan

	return d.next.Do(ctx, r)
}
146 changes: 146 additions & 0 deletions pkg/querier/tripperware/distributed_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package tripperware

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestLogicalPlanGeneration runs the distributed query middleware over a set
// of instant and range requests and checks that every request ends up with a
// populated logical plan.
func TestLogicalPlanGeneration(t *testing.T) {
	const (
		instantType = "instant"
		rangeType   = "range"
	)

	type planCase struct {
		name      string
		queryType string // "instant" or "range"
		input     *PrometheusRequest
		err       error
	}

	// instantCase builds an instant request: start equals end and step is left at zero.
	instantCase := func(name, promQL string) planCase {
		return planCase{
			name:      name,
			queryType: instantType,
			input: &PrometheusRequest{
				Start: 100000,
				End:   100000,
				Query: promQL,
			},
		}
	}

	// rangeCase builds a range request from 100000 to 200000 with the given step.
	rangeCase := func(name string, step int64, promQL string) planCase {
		return planCase{
			name:      name,
			queryType: rangeType,
			input: &PrometheusRequest{
				Start: 100000,
				End:   200000,
				Step:  step,
				Query: promQL,
			},
		}
	}

	testCases := []planCase{
		// instant query test cases
		instantCase("instant - rate vector selector", "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])"),
		instantCase("instant - memory usage expression", "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))"),
		instantCase("instant - scalar only query", "42"),
		instantCase("instant - vector arithmetic", "node_load1 / ignoring(cpu) node_cpu_seconds_total"),
		instantCase("instant - avg_over_time with nested rate", "avg_over_time(rate(http_requests_total[5m])[30m:5m])"),

		// query range test cases
		rangeCase("range - rate vector over time", 15000, "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])"),
		rangeCase("range - memory usage ratio", 30000, "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))"),
		rangeCase("range - avg_over_time function", 60000, "avg_over_time(http_requests_total[5m])"),
		rangeCase("range - vector arithmetic with range", 10000, "rate(node_network_receive_bytes_total[1m]) / rate(node_network_transmit_bytes_total[1m])"),
		rangeCase("range - simple scalar operation", 15000, "2 + 2"),
	}

	for i := range testCases {
		// Take a per-iteration copy so the parallel sub-test does not observe a later case.
		tc := testCases[i]
		subtestName := strconv.Itoa(i) + "_" + tc.name

		t.Run(subtestName, func(t *testing.T) {
			t.Parallel()

			// The terminal handler does nothing; only the middleware's effect on the request matters.
			noop := HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
				return nil, nil
			})
			handler := DistributedQueryMiddleware(time.Minute, 5*time.Minute).Wrap(noop)

			// additional validation on the test cases based on query type
			if tc.queryType != rangeType {
				require.Equal(t, tc.input.Start, tc.input.End, "instant query should have equal start and end times")
				require.Zero(t, tc.input.Step, "instant query should have zero step")
			} else {
				require.NotZero(t, tc.input.Step, "range query should have non-zero step")
				require.NotEqual(t, tc.input.Start, tc.input.End, "range query should have different start and end times")
			}

			// test: execute middleware to populate the logical plan
			_, doErr := handler.Do(context.Background(), tc.input)
			require.NoError(t, doErr)
			require.NotEmpty(t, tc.input.LogicalPlan, "logical plan should be populated")
		})
	}
}
26 changes: 24 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/spanlogger"

"github.com/thanos-io/promql-engine/logicalplan"
)

var (
Expand Down Expand Up @@ -141,6 +143,19 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
return &resp, nil
}

// getSerializedBody returns the marshalled root of the request's logical plan.
// When the request carries no logical plan it returns nil bytes and no error;
// a marshalling failure is returned with nil bytes.
func (c instantQueryCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) {
	// Nothing to serialize: callers get an empty body.
	if promReq.LogicalPlan == nil {
		return nil, nil
	}

	serialized, marshalErr := logicalplan.Marshal(promReq.LogicalPlan.Root())
	if marshalErr != nil {
		return nil, marshalErr
	}
	return serialized, nil
}

func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) {
promReq, ok := r.(*tripperware.PrometheusRequest)
if !ok {
Expand Down Expand Up @@ -168,17 +183,24 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
}
}

h.Add("Content-Type", "application/json")

isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
if !isSourceRuler {
// When the source is the Ruler, skip setting the request headers.
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
}

byteBody, err := c.getSerializedBody(promReq)
if err != nil {
return nil, err
}

req := &http.Request{
Method: "GET",
Method: "POST",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Body: io.NopCloser(bytes.NewReader(byteBody)),
Header: h,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ func Middlewares(
merger tripperware.Merger,
queryAnalyzer querysharding.Analyzer,
lookbackDelta time.Duration,
defaultEvaluationInterval time.Duration,
distributedExecEnabled bool,
) ([]tripperware.Middleware, error) {
m := []tripperware.Middleware{
NewLimitsMiddleware(limits, lookbackDelta),
tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer),
}

if distributedExecEnabled {
m = append(m,
tripperware.DistributedQueryMiddleware(defaultEvaluationInterval, lookbackDelta))
}

return m, nil
}
Loading
Loading