Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
25 changes: 25 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
Expand All @@ -33,6 +34,7 @@ type queryFrontendTestConfig struct {
querySchedulerEnabled bool
queryStatsEnabled bool
remoteReadEnabled bool
testSubQueryStepSize bool
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
}

Expand Down Expand Up @@ -209,6 +211,19 @@ func TestQueryFrontendRemoteRead(t *testing.T) {
})
}

func TestQueryFrontendSubQueryStepSize(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testSubQueryStepSize: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))
return cortexConfigFile, flags
},
})
}

func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
const numUsers = 10
const numQueriesPerUser = 10
Expand Down Expand Up @@ -334,6 +349,12 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0)
}

// No need to repeat the test on subquery step size.
if userID == 0 && cfg.testSubQueryStepSize {
resp, _, _ := c.QueryRaw(`up[30d:1m]`, now)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

// In this test we do ensure that the /series start/end time is ignored and Cortex
// always returns series in ingesters memory. No need to repeat it for each user.
if userID == 0 {
Expand Down Expand Up @@ -386,6 +407,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
extra++
}

if cfg.testSubQueryStepSize {
extra++
}

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))

// The number of received request is greater than the query requests because include
Expand Down
11 changes: 10 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
// to optimize Prometheus query requests.
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
util_log.Logger,
Expand All @@ -458,6 +464,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheus.DefaultRegisterer,
t.TombstonesLoader,
queryAnalyzer,
prometheusCodec,
shardedPrometheusCodec,
)
if err != nil {
return nil, err
Expand All @@ -473,10 +481,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.QueryRange.ForwardHeaders,
queryRangeMiddlewares,
instantQueryMiddlewares,
queryrange.PrometheusCodec,
prometheusCodec,
instantquery.InstantQueryCodec,
t.Overrides,
queryAnalyzer,
defaultSubQueryInterval,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
10 changes: 7 additions & 3 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/status"

promqlparser "github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
Expand Down Expand Up @@ -109,7 +108,8 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request {

type instantQueryCodec struct {
tripperware.Codec
now func() time.Time
now func() time.Time
noStepSubQueryInterval time.Duration
}

func newInstantQueryCodec() instantQueryCodec {
Expand Down Expand Up @@ -139,6 +139,10 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
4 changes: 1 addition & 3 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
func TestRequest(t *testing.T) {
t.Parallel()
now := time.Now()
codec := instantQueryCodec{now: func() time.Time {
return now
}}
codec := InstantQueryCodec

for _, tc := range []struct {
url string
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package instantquery

import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.ShardedPrometheusCodec)
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute))
}
20 changes: 14 additions & 6 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ var (
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec tripperware.Codec = &prometheusCodec{sharded: false}
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
ShardedPrometheusCodec tripperware.Codec = &prometheusCodec{sharded: true}

// Name of the cache control header.
cacheControlHeader = "Cache-Control"
)

type prometheusCodec struct {
sharded bool

noStepSubQueryInterval time.Duration
}

func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive
return &prometheusCodec{
sharded: sharded,
noStepSubQueryInterval: noStepSubQueryInterval,
}
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
Expand Down Expand Up @@ -166,7 +170,7 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
return &response, nil
}

func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
var result PrometheusRequest
var err error
result.Start, err = util.ParseTime(r.FormValue("start"))
Expand Down Expand Up @@ -199,6 +203,10 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward
}

result.Query = r.FormValue("query")
if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil {
return nil, err
}

result.Stats = r.FormValue("stats")
result.Path = r.URL.Path

Expand Down
8 changes: 5 additions & 3 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func Middlewares(
registerer prometheus.Registerer,
cacheGenNumberLoader CacheGenNumberLoader,
queryAnalyzer querysharding.Analyzer,
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
Expand All @@ -88,7 +90,7 @@ func Middlewares(
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, PrometheusCodec, registerer))
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
}

var c cache.Cache
Expand All @@ -99,7 +101,7 @@ func Middlewares(
}
return false
}
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, PrometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
if err != nil {
return nil, nil, err
}
Expand All @@ -111,7 +113,7 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
}

queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer))
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))

return queryRangeMiddleware, c, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +19,11 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var (
PrometheusCodec = NewPrometheusCodec(false, time.Minute)
ShardedPrometheusCodec = NewPrometheusCodec(false, time.Minute)
)

func TestRoundTrip(t *testing.T) {
t.Parallel()
s := httptest.NewServer(
Expand Down Expand Up @@ -53,6 +59,8 @@ func TestRoundTrip(t *testing.T) {
nil,
nil,
qa,
PrometheusCodec,
ShardedPrometheusCodec,
)
require.NoError(t, err)

Expand All @@ -65,6 +73,7 @@ func TestRoundTrip(t *testing.T) {
nil,
nil,
qa,
time.Minute,
)

for i, tc := range []struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
io "io"
"io"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -61,6 +61,10 @@ func TestRequest(t *testing.T) {
url: "api/v1/query_range?start=0&end=11001&step=1",
expectedErr: errStepTooSmall,
},
{
url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10",
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000),
},
} {
tc := tc
t.Run(tc.url, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/step_align.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// StepAlignMiddleware aligns the start and end of request to the step to
// improved the cacheability of the query results.
// improve the cacheability of the query results.
var StepAlignMiddleware = tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
return stepAlign{
next: next,
Expand Down
10 changes: 8 additions & 2 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewQueryTripperware(
instantQueryCodec Codec,
limits Limits,
queryAnalyzer querysharding.Analyzer,
defaultSubQueryInterval time.Duration,
) Tripperware {
// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -144,13 +145,18 @@ func NewQueryTripperware(
if isQueryRange {
return queryrange.RoundTrip(r)
} else if isQuery {
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")
// Check subquery step size.
if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil {
return nil, err
}

// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
if numShards <= 1 {
return next.RoundTrip(r)
}
// If the given query is not shardable, use downstream roundtripper.
query := r.FormValue("query")
analysis, err := queryAnalyzer.Analyze(query)
if err != nil || !analysis.IsShardable() {
return next.RoundTrip(r)
Expand Down
Loading