Skip to content

Commit c75a0ca

Browse files
committed
Pass timestamp as param
Signed-off-by: Justin Jung <[email protected]>
1 parent d90869e commit c75a0ca

File tree

6 files changed

+50
-51
lines changed

6 files changed

+50
-51
lines changed

pkg/frontend/transport/roundtripper.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66
"io"
77
"net/http"
88
"net/url"
9+
"time"
910

1011
"github.com/weaveworks/common/httpgrpc"
1112
"github.com/weaveworks/common/httpgrpc/server"
1213
)
1314

1415
// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
1516
type GrpcRoundTripper interface {
16-
RoundTripGRPC(context.Context, url.Values, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
17+
RoundTripGRPC(context.Context, url.Values, time.Time, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
1718
}
1819

1920
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
@@ -40,7 +41,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
4041
return nil, err
4142
}
4243

43-
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, req)
44+
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, time.Now(), req)
4445
if err != nil {
4546
return nil, err
4647
}

pkg/frontend/v1/frontend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
177177
}
178178

179179
// RoundTripGRPC round trips a proto (instead of a HTTP request).
180-
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
180+
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
181181
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
182182
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
183183
if tracer != nil && span != nil {
@@ -198,7 +198,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
198198
request := request{
199199
request: req,
200200
originalCtx: ctx,
201-
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),
201+
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),
202202

203203
// Buffer of 1 to ensure response can be written by the server side
204204
// of the Process stream, even if this goroutine goes away due to

pkg/frontend/v2/frontend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (f *Frontend) stopping(_ error) error {
171171
}
172172

173173
// RoundTripGRPC round trips a proto (instead of a HTTP request).
174-
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
174+
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
175175
if s := f.State(); s != services.Running {
176176
return nil, fmt.Errorf("frontend not running: %v", s)
177177
}
@@ -200,7 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
200200
request: req,
201201
userID: userID,
202202
statsEnabled: stats.IsEnabled(ctx),
203-
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),
203+
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),
204204

205205
cancel: cancel,
206206

pkg/frontend/v2/frontend_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
113113
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
114114
}, 0)
115115

116-
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
116+
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
117117
require.NoError(t, err)
118118
require.Equal(t, int32(200), resp.Code)
119119
require.Equal(t, []byte(body), resp.Body)
@@ -143,7 +143,7 @@ func TestFrontendRetryRequest(t *testing.T) {
143143
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
144144
}, 3)
145145

146-
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
146+
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
147147
require.NoError(t, err)
148148
require.Equal(t, int32(200), res.Code)
149149
}
@@ -170,7 +170,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
170170
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
171171
}, 0)
172172

173-
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
173+
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
174174
require.NoError(t, err)
175175
}
176176

@@ -179,7 +179,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
179179
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
180180
}, 0)
181181

182-
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, &httpgrpc.HTTPRequest{})
182+
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
183183
require.Error(t, err)
184184
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
185185
}
@@ -190,7 +190,7 @@ func TestFrontendCancellation(t *testing.T) {
190190
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
191191
defer cancel()
192192

193-
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
193+
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
194194
require.EqualError(t, err, context.DeadlineExceeded.Error())
195195
require.Nil(t, resp)
196196

@@ -239,7 +239,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
239239
}()
240240

241241
// send request
242-
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
242+
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
243243
require.EqualError(t, err, context.Canceled.Error())
244244
require.Nil(t, resp)
245245

pkg/util/query/priority.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/cortexproject/cortex/pkg/util/validation"
1010
)
1111

12-
func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.HighPriorityQuery) bool {
12+
func IsHighPriority(requestParams url.Values, timestamp time.Time, highPriorityQueries []validation.HighPriorityQuery) bool {
1313
queryParam := requestParams.Get("query")
1414
timeParam := requestParams.Get("time")
1515
startParam := requestParams.Get("start")
@@ -22,12 +22,11 @@ func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.H
2222
continue
2323
}
2424

25-
now := time.Now()
26-
startTimeThreshold := now.Add(-1 * highPriorityQuery.StartTime).UnixMilli()
27-
endTimeThreshold := now.Add(-1 * highPriorityQuery.EndTime).UnixMilli()
25+
startTimeThreshold := timestamp.Add(-1 * highPriorityQuery.StartTime.Abs()).UnixMilli()
26+
endTimeThreshold := timestamp.Add(-1 * highPriorityQuery.EndTime.Abs()).UnixMilli()
2827

29-
if time, err := strconv.ParseInt(timeParam, 10, 64); err == nil {
30-
if isBetweenThresholds(time, time, startTimeThreshold, endTimeThreshold) {
28+
if instantTime, err := strconv.ParseInt(timeParam, 10, 64); err == nil {
29+
if isBetweenThresholds(instantTime, instantTime, startTimeThreshold, endTimeThreshold) {
3130
return true
3231
}
3332
}

pkg/util/query/priority_test.go

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,6 @@ import (
1111
"github.com/cortexproject/cortex/pkg/util/validation"
1212
)
1313

14-
func Test_IsHighPriority_DefaultValues(t *testing.T) {
15-
now := time.Now()
16-
config := []validation.HighPriorityQuery{
17-
{}, // By default, it should match all queries happened at "now"
18-
}
19-
20-
assert.True(t, IsHighPriority(url.Values{
21-
"query": []string{"count(up)"},
22-
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
23-
}, config))
24-
assert.False(t, IsHighPriority(url.Values{
25-
"query": []string{"count(up)"},
26-
"time": []string{strconv.FormatInt(now.Add(-1*time.Second).UnixMilli(), 10)},
27-
}, config))
28-
}
29-
3014
func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
3115
now := time.Now()
3216
config := []validation.HighPriorityQuery{
@@ -38,11 +22,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
3822
assert.True(t, IsHighPriority(url.Values{
3923
"query": []string{"sum(up)"},
4024
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
41-
}, config))
25+
}, now, config))
4226
assert.False(t, IsHighPriority(url.Values{
4327
"query": []string{"count(up)"},
4428
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
45-
}, config))
29+
}, now, config))
4630

4731
config = []validation.HighPriorityQuery{
4832
{
@@ -53,11 +37,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
5337
assert.True(t, IsHighPriority(url.Values{
5438
"query": []string{"sum(up)"},
5539
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
56-
}, config))
40+
}, now, config))
5741
assert.True(t, IsHighPriority(url.Values{
5842
"query": []string{"count(up)"},
5943
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
60-
}, config))
44+
}, now, config))
6145

6246
config = []validation.HighPriorityQuery{
6347
{
@@ -71,11 +55,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
7155
assert.True(t, IsHighPriority(url.Values{
7256
"query": []string{"sum(up)"},
7357
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
74-
}, config))
58+
}, now, config))
7559
assert.True(t, IsHighPriority(url.Values{
7660
"query": []string{"count(up)"},
7761
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
78-
}, config))
62+
}, now, config))
7963

8064
config = []validation.HighPriorityQuery{
8165
{
@@ -89,11 +73,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
8973
assert.False(t, IsHighPriority(url.Values{
9074
"query": []string{"sum(up)"},
9175
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
92-
}, config))
76+
}, now, config))
9377
assert.False(t, IsHighPriority(url.Values{
9478
"query": []string{"count(up)"},
9579
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
96-
}, config))
80+
}, now, config))
9781

9882
config = []validation.HighPriorityQuery{
9983
{
@@ -104,11 +88,26 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
10488
assert.True(t, IsHighPriority(url.Values{
10589
"query": []string{"sum(up)"},
10690
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
107-
}, config))
91+
}, now, config))
92+
assert.True(t, IsHighPriority(url.Values{
93+
"query": []string{"count(up)"},
94+
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
95+
}, now, config))
96+
97+
config = []validation.HighPriorityQuery{
98+
{
99+
Regex: "",
100+
},
101+
}
102+
103+
assert.True(t, IsHighPriority(url.Values{
104+
"query": []string{"sum(up)"},
105+
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
106+
}, now, config))
108107
assert.True(t, IsHighPriority(url.Values{
109108
"query": []string{"count(up)"},
110109
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
111-
}, config))
110+
}, now, config))
112111
}
113112

114113
func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) {
@@ -123,32 +122,32 @@ func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) {
123122
assert.False(t, IsHighPriority(url.Values{
124123
"query": []string{"sum(up)"},
125124
"time": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)},
126-
}, config))
125+
}, now, config))
127126
assert.True(t, IsHighPriority(url.Values{
128127
"query": []string{"sum(up)"},
129128
"time": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
130-
}, config))
129+
}, now, config))
131130
assert.True(t, IsHighPriority(url.Values{
132131
"query": []string{"sum(up)"},
133132
"time": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
134-
}, config))
133+
}, now, config))
135134
assert.False(t, IsHighPriority(url.Values{
136135
"query": []string{"sum(up)"},
137136
"time": []string{strconv.FormatInt(now.Add(-1*time.Minute).UnixMilli(), 10)},
138-
}, config))
137+
}, now, config))
139138
assert.False(t, IsHighPriority(url.Values{
140139
"query": []string{"sum(up)"},
141140
"start": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)},
142141
"end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
143-
}, config))
142+
}, now, config))
144143
assert.True(t, IsHighPriority(url.Values{
145144
"query": []string{"sum(up)"},
146145
"start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
147146
"end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
148-
}, config))
147+
}, now, config))
149148
assert.False(t, IsHighPriority(url.Values{
150149
"query": []string{"sum(up)"},
151150
"start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
152151
"end": []string{strconv.FormatInt(now.UnixMilli(), 10)},
153-
}, config))
152+
}, now, config))
154153
}

0 commit comments

Comments
 (0)