diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index d7b9908481b..5173affb197 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -25,6 +25,7 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/middleware" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/codec" "github.com/cortexproject/cortex/pkg/querier/stats" @@ -195,10 +196,13 @@ func NewQuerierHandler( Help: "Current number of inflight requests to the querier.", }, []string{"method", "route"}) + statsRenderer := querier.StatsRenderer + corsOrigin := regexp.MustCompile(".*") + translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable) api := v1.NewAPI( engine, - querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API. - nil, // No remote write support. + translateSampleAndChunkQueryable, // Translate errors to errors expected by API. + nil, // No remote write support. exemplarQueryable, func(ctx context.Context) v1.ScrapePoolsRetriever { return nil }, func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} }, @@ -214,7 +218,7 @@ func NewQuerierHandler( func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} }, 0, 0, 0, // Remote read samples and concurrency limit. false, - regexp.MustCompile(".*"), + corsOrigin, func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") }, &v1.PrometheusVersion{ Version: version.Version, @@ -229,7 +233,7 @@ func NewQuerierHandler( // This is used for the stats API which we should not support. Or find other ways to. prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }), reg, - querier.StatsRenderer, + statsRenderer, false, nil, false, @@ -240,11 +244,18 @@ func NewQuerierHandler( api.ClearCodecs() cm := codec.NewInstrumentedCodecMetrics(reg) - api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm)) - // Install Protobuf codec to give the option for using either. - api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm)) - // Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation. - api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm)) + codecs := []v1.Codec{ + codec.NewInstrumentedCodec(v1.JSONCodec{}, cm), + // Protobuf codec to give the option for using either. + codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm), + // Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation. + codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm), + } + + // Install codecs + for _, c := range codecs { + api.InstallCodec(c) + } router := mux.NewRouter() @@ -269,13 +280,15 @@ func NewQuerierHandler( legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1")) api.Register(legacyPromRouter) + queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) + // TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in: // https://github.com/prometheus/prometheus/pull/7125/files router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier)) router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger)) router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter) - router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter) - router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter) + router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) + router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter) router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter) router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter) @@ -287,8 +300,8 @@ func NewQuerierHandler( router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier)) router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger)) router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter) - router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter) - router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter) + router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) + router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter) router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter) router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter) diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go new file mode 100644 index 00000000000..f50a30beb8f --- /dev/null +++ b/pkg/api/queryapi/query_api.go @@ -0,0 +1,249 @@ +package queryapi + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/regexp" + "github.com/munnerz/goautoneg" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/prometheus/prometheus/util/httputil" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/api" +) + +type QueryAPI struct { + queryable storage.SampleAndChunkQueryable + queryEngine promql.QueryEngine + now func() time.Time + statsRenderer v1.StatsRenderer + logger log.Logger + codecs []v1.Codec + CORSOrigin *regexp.Regexp +} + +func NewQueryAPI( + qe promql.QueryEngine, + q storage.SampleAndChunkQueryable, + statsRenderer v1.StatsRenderer, + logger log.Logger, + codecs []v1.Codec, + CORSOrigin *regexp.Regexp, +) *QueryAPI { + return &QueryAPI{ + queryEngine: qe, + queryable: q, + statsRenderer: statsRenderer, + logger: logger, + codecs: codecs, + CORSOrigin: CORSOrigin, + now: time.Now, + } +} + +func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { + // TODO(Sungjin1212): Change to emit basic error (not gRPC) + start, err := util.ParseTime(r.FormValue("start")) + if err != nil { + return invalidParamError(err, "start") + } + end, err := util.ParseTime(r.FormValue("end")) + if err != nil { + return invalidParamError(err, "end") + } + if end < start { + return invalidParamError(ErrEndBeforeStart, "end") + } + + step, err := util.ParseDurationMs(r.FormValue("step")) + if err != nil { + return invalidParamError(err, "step") + } + + if step <= 0 { + return invalidParamError(ErrNegativeStep, "step") + } + + // For safety, limit the number of returned points per timeseries. + // This is sufficient for 60s resolution for a week or 1h resolution for a year. + if (end-start)/step > 11000 { + return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil} + } + + ctx := r.Context() + if to := r.FormValue("timeout"); to != "" { + var cancel context.CancelFunc + timeout, err := util.ParseDurationMs(to) + if err != nil { + return invalidParamError(err, "timeout") + } + + ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout)) + defer cancel() + } + + opts, err := extractQueryOpts(r) + if err != nil { + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} + } + qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step)) + if err != nil { + return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + } + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() + } + }() + + ctx = httputil.ContextFromRequest(ctx, r) + + res := qry.Exec(ctx) + if res.Err != nil { + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} + } + + warnings := res.Warnings + qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats")) + + return apiFuncResult{&v1.QueryData{ + ResultType: res.Value.Type(), + Result: res.Value, + Stats: qs, + }, nil, warnings, qry.Close} +} + +func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { + // TODO(Sungjin1212): Change to emit basic error (not gRPC) + ts, err := util.ParseTimeParam(r, "time", q.now().Unix()) + if err != nil { + return invalidParamError(err, "time") + } + + ctx := r.Context() + if to := r.FormValue("timeout"); to != "" { + var cancel context.CancelFunc + timeout, err := util.ParseDurationMs(to) + if err != nil { + return invalidParamError(err, "timeout") + } + + ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout))) + defer cancel() + } + + opts, err := extractQueryOpts(r) + if err != nil { + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} + } + qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts)) + if err != nil { + return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + } + + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() + } + }() + + ctx = httputil.ContextFromRequest(ctx, r) + + res := qry.Exec(ctx) + if res.Err != nil { + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} + } + + warnings := res.Warnings + qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats")) + + return apiFuncResult{&v1.QueryData{ + ResultType: res.Value.Type(), + Result: res.Value, + Stats: qs, + }, nil, warnings, qry.Close} +} + +func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + httputil.SetCORS(w, q.CORSOrigin, r) + + result := f(r) + if result.finalizer != nil { + defer result.finalizer() + } + + if result.err != nil { + api.RespondFromGRPCError(q.logger, w, result.err.err) + return + } + + if result.data != nil { + q.respond(w, r, result.data, result.warnings, r.FormValue("query")) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + +func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) { + warn, info := warnings.AsStrings(query, 10, 10) + + resp := &v1.Response{ + Status: statusSuccess, + Data: data, + Warnings: warn, + Infos: info, + } + + codec, err := q.negotiateCodec(req, resp) + if err != nil { + api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err})) + return + } + + b, err := codec.Encode(resp) + if err != nil { + level.Error(q.logger).Log("error marshaling response", "url", req.URL, "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", codec.ContentType().String()) + w.WriteHeader(http.StatusOK) + if n, err := w.Write(b); err != nil { + level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err) + } +} + +func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) { + for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) { + for _, codec := range q.codecs { + if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) { + return codec, nil + } + } + } + + defaultCodec := q.codecs[0] + if !defaultCodec.CanEncode(resp) { + return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType()) + } + + return defaultCodec, nil +} diff --git a/pkg/api/queryapi/query_api_test.go b/pkg/api/queryapi/query_api_test.go new file mode 100644 index 00000000000..028184a12b8 --- /dev/null +++ b/pkg/api/queryapi/query_api_test.go @@ -0,0 +1,287 @@ +package queryapi + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/gorilla/mux" + "github.com/grafana/regexp" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/querier/stats" +) + +type mockSampleAndChunkQueryable struct { + queryableFn func(mint, maxt int64) (storage.Querier, error) + chunkQueryableFn func(mint, maxt int64) (storage.ChunkQuerier, error) +} + +func (m mockSampleAndChunkQueryable) Querier(mint, maxt int64) (storage.Querier, error) { + return m.queryableFn(mint, maxt) +} + +func (m mockSampleAndChunkQueryable) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { + return m.chunkQueryableFn(mint, maxt) +} + +type mockQuerier struct { + matrix model.Matrix +} + +func (m mockQuerier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + if sp == nil { + panic(fmt.Errorf("select params must be set")) + } + return series.MatrixToSeriesSet(sortSeries, m.matrix) +} + +func (m mockQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (m mockQuerier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, nil +} + +func (mockQuerier) Close() error { + return nil +} + +func Test_CustomAPI(t *testing.T) { + engine := promql.NewEngine(promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }) + mockQueryable := &mockSampleAndChunkQueryable{ + queryableFn: func(_, _ int64) (storage.Querier, error) { + return mockQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"__name__": "test", "foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 1536673665000, Value: 0}, + {Timestamp: 1536673670000, Value: 1}, + }, + }, + }, + }, nil + }, + } + + tests := []struct { + name string + path string + expectedCode int + expectedBody string + }{ + { + name: "[Range Query] empty start", + path: "/api/v1/query_range?end=1536673680&query=test&step=5", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\"; cannot parse \\\"\\\" to a valid timestamp\"}", + }, + { + name: "[Range Query] empty end", + path: "/api/v1/query_range?query=test&start=1536673665&step=5", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; cannot parse \\\"\\\" to a valid timestamp\"}", + }, + { + name: "[Range Query] start is greater than end", + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673681&step=5", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; end timestamp must not be before start time\"}", + }, + { + name: "[Range Query] negative step", + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=-1", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\"; zero or negative query resolution step widths are not accepted. Try a positive integer\"}", + }, + { + name: "[Range Query] returned points are over 11000", + path: "/api/v1/query_range?end=1536700000&query=test&start=1536673665&step=1", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)\"}", + }, + { + name: "[Range Query] empty query", + path: "/api/v1/query_range?end=1536673680&start=1536673665&step=5", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + }, + { + name: "[Range Query] invalid lookback delta", + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&lookback_delta=dummy", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}", + }, + { + name: "[Range Query] invalid timeout delta", + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&timeout=dummy", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}", + }, + { + name: "[Range Query] normal case", + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", + expectedCode: http.StatusOK, + expectedBody: "{\"status\":\"success\",\"data\":{\"resultType\":\"matrix\",\"result\":[{\"metric\":{\"__name__\":\"test\",\"foo\":\"bar\"},\"values\":[[1536673665,\"0\"],[1536673670,\"1\"],[1536673675,\"1\"],[1536673680,\"1\"]]}]}}", + }, + { + name: "[Instant Query] empty query", + path: "/api/v1/query", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + }, + { + name: "[Instant Query] invalid lookback delta", + path: "/api/v1/query?lookback_delta=dummy", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}", + }, + { + name: "[Instant Query] invalid timeout", + path: "/api/v1/query?timeout=dummy", + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}", + }, + { + name: "[Instant Query] normal case", + path: "/api/v1/query?query=test&time=1536673670", + expectedCode: http.StatusOK, + expectedBody: "{\"status\":\"success\",\"data\":{\"resultType\":\"vector\",\"result\":[{\"metric\":{\"__name__\":\"test\",\"foo\":\"bar\"},\"value\":[1536673670,\"1\"]}]}}", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + + router := mux.NewRouter() + router.Path("/api/v1/query").Methods("GET").Handler(c.Wrap(c.InstantQueryHandler)) + router.Path("/api/v1/query_range").Methods("GET").Handler(c.Wrap(c.RangeQueryHandler)) + + req := httptest.NewRequest(http.MethodGet, test.path, nil) + ctx := context.Background() + _, ctx = stats.ContextWithEmptyStats(ctx) + req = req.WithContext(user.InjectOrgID(ctx, "user1")) + + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, test.expectedCode, rec.Code) + body, err := io.ReadAll(rec.Body) + require.NoError(t, err) + require.Equal(t, test.expectedBody, string(body)) + }) + } +} + +type mockCodec struct{} + +func (m *mockCodec) ContentType() v1.MIMEType { + return v1.MIMEType{Type: "application", SubType: "mock"} +} + +func (m *mockCodec) CanEncode(_ *v1.Response) bool { + return false +} + +func (m *mockCodec) Encode(_ *v1.Response) ([]byte, error) { + return nil, errors.New("encode err") +} + +func Test_InvalidCodec(t *testing.T) { + engine := promql.NewEngine(promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }) + mockQueryable := &mockSampleAndChunkQueryable{ + queryableFn: func(_, _ int64) (storage.Querier, error) { + return mockQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"__name__": "test", "foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 1536673665000, Value: 0}, + {Timestamp: 1536673670000, Value: 1}, + }, + }, + }, + }, nil + }, + } + + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*")) + router := mux.NewRouter() + router.Path("/api/v1/query").Methods("GET").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=test", nil) + ctx := context.Background() + _, ctx = stats.ContextWithEmptyStats(ctx) + req = req.WithContext(user.InjectOrgID(ctx, "user1")) + + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotAcceptable, rec.Code) +} + +func Test_CustomAPI_StatsRenderer(t *testing.T) { + engine := promql.NewEngine(promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }) + mockQueryable := &mockSampleAndChunkQueryable{ + queryableFn: func(_, _ int64) (storage.Querier, error) { + return mockQuerier{ + matrix: model.Matrix{ + { + Metric: model.Metric{"__name__": "test", "foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 1536673665000, Value: 0}, + {Timestamp: 1536673670000, Value: 1}, + {Timestamp: 1536673675000, Value: 2}, + {Timestamp: 1536673680000, Value: 3}, + }, + }, + }, + }, nil + }, + } + + queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + + router := mux.NewRouter() + router.Path("/api/v1/query_range").Methods("GET").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", nil) + ctx := context.Background() + _, ctx = stats.ContextWithEmptyStats(ctx) + req = req.WithContext(user.InjectOrgID(ctx, "user1")) + + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + queryStats := stats.FromContext(ctx) + require.NotNil(t, queryStats) + require.Equal(t, uint64(4), queryStats.LoadPeakSamples()) + require.Equal(t, uint64(4), queryStats.LoadScannedSamples()) +} diff --git a/pkg/api/queryapi/util.go b/pkg/api/queryapi/util.go new file mode 100644 index 00000000000..9d85b8a96c7 --- /dev/null +++ b/pkg/api/queryapi/util.go @@ -0,0 +1,120 @@ +package queryapi + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/gogo/status" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/annotations" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/util" +) + +var ( + ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") + ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") + ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") +) + +func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) { + var duration time.Duration + + if strDuration := r.FormValue("lookback_delta"); strDuration != "" { + parsedDuration, err := util.ParseDurationMs(strDuration) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "error parsing lookback delta duration: %v", err) + } + duration = convertMsToDuration(parsedDuration) + } + + return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil +} + +const ( + statusSuccess = "success" + + // Non-standard status code (originally introduced by nginx) for the case when a client closes + // the connection while the server is still processing the request. + statusClientClosedConnection = 499 +) + +type errorType string + +const ( + errorTimeout errorType = "timeout" + errorCanceled errorType = "canceled" + errorExec errorType = "execution" + errorBadData errorType = "bad_data" + errorInternal errorType = "internal" + errorNotAcceptable errorType = "not_acceptable" +) + +type apiError struct { + typ errorType + err error +} + +func (e *apiError) Error() string { + return fmt.Sprintf("%s: %s", e.typ, e.err) +} + +func returnAPIError(err error) *apiError { + if err == nil { + return nil + } + + var eqc promql.ErrQueryCanceled + var eqt promql.ErrQueryTimeout + var es promql.ErrStorage + + switch { + case errors.As(err, &eqc): + return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)} + case errors.As(err, &eqt): + return &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, "%v", err)} + case errors.As(err, &es): + return &apiError{errorInternal, httpgrpc.Errorf(http.StatusInternalServerError, "%v", err)} + } + + if errors.Is(err, context.Canceled) { + return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)} + } + + return &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%v", err)} +} + +type apiFuncResult struct { + data interface{} + err *apiError + warnings annotations.Annotations + finalizer func() +} + +type apiFunc func(r *http.Request) apiFuncResult + +func invalidParamError(err error, parameter string) apiFuncResult { + return apiFuncResult{nil, &apiError{ + errorBadData, DecorateWithParamName(err, parameter), + }, nil, nil} +} + +func convertMsToTime(unixMs int64) time.Time { + return time.Unix(0, unixMs*int64(time.Millisecond)) +} + +func convertMsToDuration(unixMs int64) time.Duration { + return time.Duration(unixMs) * time.Millisecond +} + +func DecorateWithParamName(err error, field string) error { + errTmpl := "invalid parameter %q; %v" + if status, ok := status.FromError(err); ok { + return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message()) + } + return fmt.Errorf(errTmpl, field, err) +} diff --git a/pkg/api/queryapi/util_test.go b/pkg/api/queryapi/util_test.go new file mode 100644 index 00000000000..f3caf3ec8e7 --- /dev/null +++ b/pkg/api/queryapi/util_test.go @@ -0,0 +1,15 @@ +package queryapi + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_Convert(t *testing.T) { + time := time.Now().UnixMilli() + + require.Equal(t, time, convertMsToTime(time).UnixMilli()) + require.Equal(t, time, convertMsToDuration(time).Milliseconds()) +} diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index cf11258493b..54fe4aeba0d 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -3,7 +3,6 @@ package instantquery import ( "bytes" "context" - "fmt" "io" "net/http" "net/url" @@ -17,8 +16,8 @@ import ( "github.com/prometheus/common/model" v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/httpgrpc" - "google.golang.org/grpc/status" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" @@ -67,7 +66,7 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for var err error result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix()) if err != nil { - return nil, decorateWithParamName(err, "time") + return nil, queryapi.DecorateWithParamName(err, "time") } result.Query = r.FormValue("query") @@ -228,14 +227,6 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ return tripperware.MergeResponse(ctx, true, req, responses...) } -func decorateWithParamName(err error, field string) error { - errTmpl := "invalid parameter %q; %v" - if status, ok := status.FromError(err); ok { - return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message()) - } - return fmt.Errorf(errTmpl, field, err) -} - func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) { for _, clause := range goautoneg.ParseAccept(acceptHeader) { if jsonMIMEType.Satisfies(clause) { diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 3cc4b83dcfe..9d82031fc0b 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -3,7 +3,6 @@ package queryrange import ( "bytes" "context" - "fmt" "io" "net/http" "net/url" @@ -11,13 +10,13 @@ import ( "strings" "time" - "github.com/gogo/status" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" @@ -36,9 +35,6 @@ var ( SortMapKeys: true, ValidateJsonRawMessage: false, }.Froze() - errEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") - errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") - errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") // Name of the cache control header. cacheControlHeader = "Cache-Control" @@ -104,31 +100,31 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa var err error result.Start, err = util.ParseTime(r.FormValue("start")) if err != nil { - return nil, decorateWithParamName(err, "start") + return nil, queryapi.DecorateWithParamName(err, "start") } result.End, err = util.ParseTime(r.FormValue("end")) if err != nil { - return nil, decorateWithParamName(err, "end") + return nil, queryapi.DecorateWithParamName(err, "end") } if result.End < result.Start { - return nil, errEndBeforeStart + return nil, queryapi.ErrEndBeforeStart } result.Step, err = util.ParseDurationMs(r.FormValue("step")) if err != nil { - return nil, decorateWithParamName(err, "step") + return nil, queryapi.DecorateWithParamName(err, "step") } if result.Step <= 0 { - return nil, errNegativeStep + return nil, queryapi.ErrNegativeStep } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if (result.End-result.Start)/result.Step > 11000 { - return nil, errStepTooSmall + return nil, queryapi.ErrStepTooSmall } result.Query = r.FormValue("query") @@ -272,11 +268,3 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, _ *http.Request, res func encodeDurationMs(d int64) string { return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64) } - -func decorateWithParamName(err error, field string) error { - errTmpl := "invalid parameter %q; %v" - if status, ok := status.FromError(err); ok { - return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message()) - } - return fmt.Errorf(errTmpl, field, err) -} diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 83529de6fc0..1f3ebb137d8 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -20,6 +20,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -55,7 +56,7 @@ func TestRequest(t *testing.T) { }, { url: "api/v1/query_range?start=123&end=0", - expectedErr: errEndBeforeStart, + expectedErr: queryapi.ErrEndBeforeStart, }, { url: "api/v1/query_range?start=123&end=456&step=baz", @@ -63,11 +64,11 @@ func TestRequest(t *testing.T) { }, { url: "api/v1/query_range?start=123&end=456&step=-1", - expectedErr: errNegativeStep, + expectedErr: queryapi.ErrNegativeStep, }, { url: "api/v1/query_range?start=0&end=11001&step=1", - expectedErr: errStepTooSmall, + expectedErr: queryapi.ErrStepTooSmall, }, } { tc := tc