Skip to content

Commit b0871cb

Browse files
committed
Add dedicated instant/range query handlers
Signed-off-by: SungJin1212 <[email protected]>
1 parent 7046357 commit b0871cb

File tree

7 files changed

+695
-26
lines changed

7 files changed

+695
-26
lines changed

pkg/api/handlers.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/cortexproject/cortex/pkg/querier"
2929
"github.com/cortexproject/cortex/pkg/querier/codec"
30+
"github.com/cortexproject/cortex/pkg/querier/queryapi"
3031
"github.com/cortexproject/cortex/pkg/querier/stats"
3132
"github.com/cortexproject/cortex/pkg/util"
3233
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -195,10 +196,13 @@ func NewQuerierHandler(
195196
Help: "Current number of inflight requests to the querier.",
196197
}, []string{"method", "route"})
197198

199+
statsRenderer := querier.StatsRenderer
200+
corsOrigin := regexp.MustCompile(".*")
201+
translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable)
198202
api := v1.NewAPI(
199203
engine,
200-
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
201-
nil, // No remote write support.
204+
translateSampleAndChunkQueryable, // Translate errors to errors expected by API.
205+
nil, // No remote write support.
202206
exemplarQueryable,
203207
func(ctx context.Context) v1.ScrapePoolsRetriever { return nil },
204208
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
@@ -214,7 +218,7 @@ func NewQuerierHandler(
214218
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
215219
0, 0, 0, // Remote read samples and concurrency limit.
216220
false,
217-
regexp.MustCompile(".*"),
221+
corsOrigin,
218222
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
219223
&v1.PrometheusVersion{
220224
Version: version.Version,
@@ -229,7 +233,7 @@ func NewQuerierHandler(
229233
// This is used for the stats API which we should not support. Or find other ways to.
230234
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
231235
reg,
232-
querier.StatsRenderer,
236+
statsRenderer,
233237
false,
234238
nil,
235239
false,
@@ -240,11 +244,18 @@ func NewQuerierHandler(
240244
api.ClearCodecs()
241245
cm := codec.NewInstrumentedCodecMetrics(reg)
242246

243-
api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
244-
// Install Protobuf codec to give the option for using either.
245-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
246-
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
247-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
247+
codecs := []v1.Codec{
248+
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
249+
// Protobuf codec to give the option for using either.
250+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
251+
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
252+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
253+
}
254+
255+
// Install codecs
256+
for _, c := range codecs {
257+
api.InstallCodec(c)
258+
}
248259

249260
router := mux.NewRouter()
250261

@@ -269,13 +280,15 @@ func NewQuerierHandler(
269280
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
270281
api.Register(legacyPromRouter)
271282

283+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
284+
272285
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
273286
// https://github.com/prometheus/prometheus/pull/7125/files
274287
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
275288
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
276289
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
277-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
278-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
290+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantHandler))
291+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
279292
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
280293
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
281294
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
@@ -287,8 +300,8 @@ func NewQuerierHandler(
287300
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
288301
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
289302
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
290-
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
291-
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
303+
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantHandler))
304+
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
292305
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
293306
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
294307
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)

pkg/querier/queryapi/query_api.go

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
package queryapi
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/grafana/regexp"
12+
"github.com/munnerz/goautoneg"
13+
"github.com/prometheus/prometheus/promql"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/prometheus/prometheus/util/annotations"
16+
"github.com/prometheus/prometheus/util/httputil"
17+
v1 "github.com/prometheus/prometheus/web/api/v1"
18+
"github.com/weaveworks/common/httpgrpc"
19+
20+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
21+
"github.com/cortexproject/cortex/pkg/util"
22+
"github.com/cortexproject/cortex/pkg/util/api"
23+
)
24+
25+
type QueryAPI struct {
26+
queryable storage.SampleAndChunkQueryable
27+
queryEngine promql.QueryEngine
28+
now func() time.Time
29+
statsRenderer v1.StatsRenderer
30+
logger log.Logger
31+
codecs []v1.Codec
32+
CORSOrigin *regexp.Regexp
33+
}
34+
35+
func NewQueryAPI(
36+
qe promql.QueryEngine,
37+
q storage.SampleAndChunkQueryable,
38+
statsRenderer v1.StatsRenderer,
39+
logger log.Logger,
40+
codecs []v1.Codec,
41+
CORSOrigin *regexp.Regexp,
42+
) *QueryAPI {
43+
return &QueryAPI{
44+
queryEngine: qe,
45+
queryable: q,
46+
statsRenderer: statsRenderer,
47+
logger: logger,
48+
codecs: codecs,
49+
CORSOrigin: CORSOrigin,
50+
now: time.Now,
51+
}
52+
}
53+
54+
func (c *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
55+
start, err := util.ParseTime(r.FormValue("start"))
56+
if err != nil {
57+
return invalidParamError(err, "start")
58+
}
59+
end, err := util.ParseTime(r.FormValue("end"))
60+
if err != nil {
61+
return invalidParamError(err, "end")
62+
}
63+
if end < start {
64+
return invalidParamError(queryrange.ErrEndBeforeStart, "end")
65+
}
66+
67+
step, err := util.ParseDurationMs(r.FormValue("step"))
68+
if err != nil {
69+
return invalidParamError(err, "step")
70+
}
71+
72+
if step <= 0 {
73+
return invalidParamError(queryrange.ErrNegativeStep, "step")
74+
}
75+
76+
// For safety, limit the number of returned points per timeseries.
77+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
78+
if (end-start)/step > 11000 {
79+
return apiFuncResult{nil, &apiError{errorBadData, queryrange.ErrStepTooSmall}, nil, nil}
80+
}
81+
82+
ctx := r.Context()
83+
if to := r.FormValue("timeout"); to != "" {
84+
var cancel context.CancelFunc
85+
timeout, err := util.ParseDurationMs(to)
86+
if err != nil {
87+
return invalidParamError(err, "timeout")
88+
}
89+
90+
ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
91+
defer cancel()
92+
}
93+
94+
opts, err := extractQueryOpts(r)
95+
if err != nil {
96+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
97+
}
98+
qry, err := c.queryEngine.NewRangeQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
99+
if err != nil {
100+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
101+
}
102+
// From now on, we must only return with a finalizer in the result (to
103+
// be called by the caller) or call qry.Close ourselves (which is
104+
// required in the case of a panic).
105+
defer func() {
106+
if result.finalizer == nil {
107+
qry.Close()
108+
}
109+
}()
110+
111+
ctx = httputil.ContextFromRequest(ctx, r)
112+
113+
res := qry.Exec(ctx)
114+
if res.Err != nil {
115+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
116+
}
117+
118+
warnings := res.Warnings
119+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
120+
121+
return apiFuncResult{&v1.QueryData{
122+
ResultType: res.Value.Type(),
123+
Result: res.Value,
124+
Stats: qs,
125+
}, nil, warnings, qry.Close}
126+
}
127+
128+
func (c *QueryAPI) InstantHandler(r *http.Request) (result apiFuncResult) {
129+
ts, err := util.ParseTimeParam(r, "time", c.now().Unix())
130+
if err != nil {
131+
return invalidParamError(err, "time")
132+
}
133+
134+
ctx := r.Context()
135+
if to := r.FormValue("timeout"); to != "" {
136+
var cancel context.CancelFunc
137+
timeout, err := util.ParseDurationMs(to)
138+
if err != nil {
139+
return invalidParamError(err, "timeout")
140+
}
141+
142+
ctx, cancel = context.WithDeadline(ctx, c.now().Add(convertMsToDuration(timeout)))
143+
defer cancel()
144+
}
145+
146+
opts, err := extractQueryOpts(r)
147+
if err != nil {
148+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
149+
}
150+
qry, err := c.queryEngine.NewInstantQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
151+
if err != nil {
152+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
153+
}
154+
155+
// From now on, we must only return with a finalizer in the result (to
156+
// be called by the caller) or call qry.Close ourselves (which is
157+
// required in the case of a panic).
158+
defer func() {
159+
if result.finalizer == nil {
160+
qry.Close()
161+
}
162+
}()
163+
164+
ctx = httputil.ContextFromRequest(ctx, r)
165+
166+
res := qry.Exec(ctx)
167+
if res.Err != nil {
168+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
169+
}
170+
171+
warnings := res.Warnings
172+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
173+
174+
return apiFuncResult{&v1.QueryData{
175+
ResultType: res.Value.Type(),
176+
Result: res.Value,
177+
Stats: qs,
178+
}, nil, warnings, qry.Close}
179+
}
180+
181+
func (c *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
182+
return func(w http.ResponseWriter, r *http.Request) {
183+
httputil.SetCORS(w, c.CORSOrigin, r)
184+
185+
result := f(r)
186+
if result.finalizer != nil {
187+
defer result.finalizer()
188+
}
189+
190+
if result.err != nil {
191+
api.RespondFromGRPCError(c.logger, w, result.err.err)
192+
return
193+
}
194+
195+
if result.data != nil {
196+
c.respond(w, r, result.data, result.warnings, r.FormValue("query"))
197+
return
198+
}
199+
w.WriteHeader(http.StatusNoContent)
200+
}
201+
}
202+
203+
func (c *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
204+
warn, info := warnings.AsStrings(query, 10, 10)
205+
206+
resp := &v1.Response{
207+
Status: statusSuccess,
208+
Data: data,
209+
Warnings: warn,
210+
Infos: info,
211+
}
212+
213+
codec, err := c.negotiateCodec(req, resp)
214+
if err != nil {
215+
api.RespondFromGRPCError(c.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
216+
return
217+
}
218+
219+
b, err := codec.Encode(resp)
220+
if err != nil {
221+
level.Error(c.logger).Log("error marshaling response", "url", req.URL, "err", err)
222+
http.Error(w, err.Error(), http.StatusInternalServerError)
223+
return
224+
}
225+
226+
w.Header().Set("Content-Type", codec.ContentType().String())
227+
w.WriteHeader(http.StatusOK)
228+
if n, err := w.Write(b); err != nil {
229+
level.Error(c.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
230+
}
231+
}
232+
233+
func (c *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
234+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
235+
for _, codec := range c.codecs {
236+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
237+
return codec, nil
238+
}
239+
}
240+
}
241+
242+
defaultCodec := c.codecs[0]
243+
if !defaultCodec.CanEncode(resp) {
244+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
245+
}
246+
247+
return defaultCodec, nil
248+
}

0 commit comments

Comments
 (0)