Skip to content

Commit 621ca87

Browse files
committed
Assign priority before splitting the query
Signed-off-by: Justin Jung <[email protected]>
1 parent 78816d9 commit 621ca87

File tree

24 files changed

+533
-481
lines changed

24 files changed

+533
-481
lines changed

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5039,8 +5039,8 @@ otel:
50395039
# Priority level. Must be a unique value.
50405040
[priority: <int> | default = 0]
50415041
5042-
# Number of reserved queriers to handle priorities higher or equal to this value
5043-
# only. Value between 0 and 1 will be used as a percentage.
5042+
# Number of reserved queriers to handle priorities higher or equal to the
5043+
# priority level. Value between 0 and 1 will be used as a percentage.
50445044
[reserved_queriers: <float> | default = 0]
50455045
50465046
# List of query attributes to assign the priority.
@@ -5050,14 +5050,14 @@ otel:
50505050
### `QueryAttribute`
50515051

50525052
```yaml
5053-
# Query string regex.
5054-
[regex: <string> | default = ".*"]
5053+
# Query string regex. If set to empty string, it will not match anything.
5054+
[regex: <string> | default = ""]
50555055
5056-
# Query start time.
5057-
[start_time: <duration> | default = 0s]
5056+
# Query start time. If set to 0, the start time won't be checked.
5057+
[start_time: <int> | default = 0]
50585058
5059-
# Query end time.
5060-
[end_time: <duration> | default = 0s]
5059+
# Query end time. If set to 0, the end time won't be checked.
5060+
[end_time: <int> | default = 0]
50615061
```
50625062

50635063
### `DisabledRuleGroup`

pkg/frontend/transport/handler.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,12 +200,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
200200
r.Body = io.NopCloser(&buf)
201201
}
202202

203+
r.Header.Get("test")
203204
startTime := time.Now()
204-
// get config
205-
// assign priority
206-
// embed it to the http request, header?
207-
// extract Decode to here, to make sure all requests pass here
208-
// log the priority as well
209205
resp, err := f.roundTripper.RoundTrip(r)
210206
queryResponseTime := time.Since(startTime)
211207

@@ -348,6 +344,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
348344
if ua := r.Header.Get("User-Agent"); len(ua) > 0 {
349345
logMessage = append(logMessage, "user_agent", ua)
350346
}
347+
if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 {
348+
logMessage = append(logMessage, "priority", queryPriority)
349+
}
351350

352351
if error != nil {
353352
s, ok := status.FromError(error)

pkg/frontend/transport/roundtripper.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@ import (
55
"context"
66
"io"
77
"net/http"
8-
"net/url"
9-
"strings"
10-
"time"
118

129
"github.com/weaveworks/common/httpgrpc"
1310
"github.com/weaveworks/common/httpgrpc/server"
1411
)
1512

1613
// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
1714
type GrpcRoundTripper interface {
18-
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest, url.Values, time.Time) (*httpgrpc.HTTPResponse, error)
15+
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
1916
}
2017

2118
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
@@ -42,20 +39,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
4239
return nil, err
4340
}
4441

45-
var (
46-
resp *httpgrpc.HTTPResponse
47-
reqValues url.Values
48-
ts time.Time
49-
)
50-
51-
if strings.HasSuffix(r.URL.Path, "/query") || strings.HasSuffix(r.URL.Path, "/query_range") {
52-
if err = r.ParseForm(); err == nil {
53-
reqValues = r.Form
54-
ts = time.Now()
55-
}
56-
}
57-
58-
resp, err = a.roundTripper.RoundTripGRPC(r.Context(), req, reqValues, ts)
42+
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
5943
if err != nil {
6044
return nil, err
6145
}

pkg/frontend/v1/frontend.go

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"net/http"
8-
"net/url"
8+
"strconv"
99
"time"
1010

1111
"github.com/go-kit/log"
@@ -23,7 +23,6 @@ import (
2323
"github.com/cortexproject/cortex/pkg/tenant"
2424
"github.com/cortexproject/cortex/pkg/util"
2525
"github.com/cortexproject/cortex/pkg/util/httpgrpcutil"
26-
util_query "github.com/cortexproject/cortex/pkg/util/query"
2726
"github.com/cortexproject/cortex/pkg/util/services"
2827
"github.com/cortexproject/cortex/pkg/util/validation"
2928
)
@@ -98,11 +97,15 @@ type request struct {
9897
request *httpgrpc.HTTPRequest
9998
err chan error
10099
response chan *httpgrpc.HTTPResponse
101-
priority int64
102100
}
103101

104102
func (r request) Priority() int64 {
105-
return r.priority
103+
priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64)
104+
if err != nil {
105+
return 0
106+
}
107+
108+
return priority
106109
}
107110

108111
// New creates a new frontend. Frontend implements service, and must be started and stopped.
@@ -181,7 +184,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
181184
}
182185

183186
// RoundTripGRPC round trips a proto (instead of a HTTP request).
184-
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
187+
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
185188
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
186189
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
187190
if tracer != nil && span != nil {
@@ -192,12 +195,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
192195
}
193196
}
194197

195-
tenantIDs, err := tenant.TenantIDs(ctx)
196-
if err != nil {
197-
return nil, err
198-
}
199-
userID := tenant.JoinTenantIDs(tenantIDs)
200-
201198
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
202199
request := request{
203200
request: req,
@@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
210207
response: make(chan *httpgrpc.HTTPResponse, 1),
211208
}
212209

213-
if reqParams != nil {
214-
queryPriority := f.limits.QueryPriority(userID)
215-
216-
if queryPriority.Enabled {
217-
request.priority = util_query.GetPriority(reqParams, ts, queryPriority)
218-
}
219-
}
220-
221210
if err := f.queueRequest(ctx, &request); err != nil {
222211
return nil, err
223212
}

pkg/frontend/v2/frontend.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"math/rand"
88
"net/http"
9-
"net/url"
109
"sync"
1110
"time"
1211

@@ -28,7 +27,6 @@ import (
2827
"github.com/cortexproject/cortex/pkg/util/grpcclient"
2928
"github.com/cortexproject/cortex/pkg/util/httpgrpcutil"
3029
util_log "github.com/cortexproject/cortex/pkg/util/log"
31-
util_query "github.com/cortexproject/cortex/pkg/util/query"
3230
"github.com/cortexproject/cortex/pkg/util/services"
3331
)
3432

@@ -89,7 +87,6 @@ type frontendRequest struct {
8987
request *httpgrpc.HTTPRequest
9088
userID string
9189
statsEnabled bool
92-
priority int64
9390

9491
cancel context.CancelFunc
9592

@@ -170,7 +167,7 @@ func (f *Frontend) stopping(_ error) error {
170167
}
171168

172169
// RoundTripGRPC round trips a proto (instead of a HTTP request).
173-
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
170+
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
174171
if s := f.State(); s != services.Running {
175172
return nil, fmt.Errorf("frontend not running: %v", s)
176173
}
@@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
210207
retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
211208
}
212209

213-
if reqParams != nil {
214-
queryPriority := f.limits.QueryPriority(userID)
215-
216-
if queryPriority.Enabled {
217-
freq.priority = util_query.GetPriority(reqParams, ts, queryPriority)
218-
}
219-
}
220-
221210
f.requests.put(freq)
222211
defer f.requests.delete(freq.queryID)
223212

pkg/frontend/v2/frontend_scheduler_worker.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
263263
HttpRequest: req.request,
264264
FrontendAddress: w.frontendAddr,
265265
StatsEnabled: req.statsEnabled,
266-
Priority: req.priority,
267266
})
268267

269268
if err != nil {

pkg/frontend/v2/frontend_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package v2
33
import (
44
"context"
55
"net"
6-
"net/url"
76
"strconv"
87
"strings"
98
"sync"
@@ -112,7 +111,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
112111
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
113112
}, 0)
114113

115-
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
114+
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
116115
require.NoError(t, err)
117116
require.Equal(t, int32(200), resp.Code)
118117
require.Equal(t, []byte(body), resp.Body)
@@ -142,7 +141,7 @@ func TestFrontendRetryRequest(t *testing.T) {
142141
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
143142
}, 3)
144143

145-
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
144+
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
146145
require.NoError(t, err)
147146
require.Equal(t, int32(200), res.Code)
148147
}
@@ -169,7 +168,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
169168
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
170169
}, 0)
171170

172-
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
171+
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
173172
require.NoError(t, err)
174173
}
175174

@@ -178,7 +177,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
178177
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
179178
}, 0)
180179

181-
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
180+
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{})
182181
require.Error(t, err)
183182
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
184183
}
@@ -189,7 +188,7 @@ func TestFrontendCancellation(t *testing.T) {
189188
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
190189
defer cancel()
191190

192-
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
191+
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
193192
require.EqualError(t, err, context.DeadlineExceeded.Error())
194193
require.Nil(t, resp)
195194

@@ -238,7 +237,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
238237
}()
239238

240239
// send request
241-
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
240+
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
242241
require.EqualError(t, err, context.Canceled.Error())
243242
require.Nil(t, resp)
244243

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"github.com/cortexproject/cortex/pkg/util"
78
"io"
89
"net/http"
910
"net/url"
1011
"sort"
11-
"strconv"
1212
"strings"
1313
"time"
1414

1515
jsoniter "github.com/json-iterator/go"
1616
"github.com/opentracing/opentracing-go"
1717
otlog "github.com/opentracing/opentracing-go/log"
18-
"github.com/pkg/errors"
1918
"github.com/prometheus/common/model"
2019
"github.com/prometheus/prometheus/model/labels"
2120
"github.com/prometheus/prometheus/model/timestamp"
@@ -26,7 +25,6 @@ import (
2625
"github.com/cortexproject/cortex/pkg/cortexpb"
2726
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2827
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
29-
"github.com/cortexproject/cortex/pkg/util"
3028
"github.com/cortexproject/cortex/pkg/util/spanlogger"
3129
)
3230

@@ -132,7 +130,7 @@ func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string {
132130
func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
133131
result := PrometheusRequest{Headers: map[string][]string{}}
134132
var err error
135-
result.Time, err = parseTimeParam(r, "time", c.now().Unix())
133+
result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix())
136134
if err != nil {
137135
return nil, decorateWithParamName(err, "time")
138136
}
@@ -630,15 +628,3 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
630628
return s.Result.GetRawBytes(), nil
631629
}
632630
}
633-
634-
func parseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) {
635-
val := r.FormValue(paramName)
636-
if val == "" {
637-
val = strconv.FormatInt(defaultValue, 10)
638-
}
639-
result, err := util.ParseTime(val)
640-
if err != nil {
641-
return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
642-
}
643-
return result, nil
644-
}

pkg/querier/tripperware/limits.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package tripperware
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"github.com/cortexproject/cortex/pkg/util/validation"
7+
)
48

59
// Limits allows us to specify per-tenant runtime limits on the behavior of
610
// the query handling code.
@@ -21,4 +25,7 @@ type Limits interface {
2125

2226
// QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user.
2327
QueryVerticalShardSize(userID string) int
28+
29+
// QueryPriority returns the query priority config for the tenant, including different priorities and their attributes.
30+
QueryPriority(userID string) validation.QueryPriority
2431
}

0 commit comments

Comments
 (0)