Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,10 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
// to optimize Prometheus query requests.
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval)
prometheusCodec := queryrange.NewPrometheusCodec(false)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand Down Expand Up @@ -486,7 +485,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
instantquery.InstantQueryCodec,
t.Overrides,
queryAnalyzer,
defaultSubQueryInterval,
t.Cfg.Querier.DefaultEvaluationInterval,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
7 changes: 1 addition & 6 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request {

type instantQueryCodec struct {
tripperware.Codec
now func() time.Time
noStepSubQueryInterval time.Duration
now func() time.Time
}

func newInstantQueryCodec() instantQueryCodec {
Expand Down Expand Up @@ -139,10 +138,6 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
3 changes: 1 addition & 2 deletions pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package instantquery

import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute))
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true))
}
13 changes: 2 additions & 11 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,10 @@ var (

type prometheusCodec struct {
sharded bool

noStepSubQueryInterval time.Duration
}

func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive
return &prometheusCodec{
sharded: sharded,
noStepSubQueryInterval: noStepSubQueryInterval,
}
func NewPrometheusCodec(sharded bool) *prometheusCodec { //nolint:revive
return &prometheusCodec{sharded: sharded}
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
Expand Down Expand Up @@ -203,10 +198,6 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

var (
PrometheusCodec = NewPrometheusCodec(false, time.Minute)
ShardedPrometheusCodec = NewPrometheusCodec(false, time.Minute)
PrometheusCodec = NewPrometheusCodec(false)
ShardedPrometheusCodec = NewPrometheusCodec(false)
)

func TestRoundTrip(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ func TestRequest(t *testing.T) {
url: "api/v1/query_range?start=0&end=11001&step=1",
expectedErr: errStepTooSmall,
},
{
url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10",
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000),
},
} {
tc := tc
t.Run(tc.url, func(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,19 @@ func NewQueryTripperware(
activeUsers.UpdateUserTimestamp(userStr, time.Now())
queriesPerTenant.WithLabelValues(op, userStr).Inc()

if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
if isQuery || isQueryRange {
query := r.FormValue("query")
// Check subquery step size.
if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil {
return nil, err
}
}

if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")

// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
Expand Down