diff --git a/CHANGELOG.md b/CHANGELOG.md
index 515a909b2e0..ecb8d412c9f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 * `cortex_ruler_clients`
 * `cortex_ruler_client_request_duration_seconds`
 * [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
+* [ENHANCEMENT] Query-frontend: reduced memory allocations when serializing query response. #3964
 * [ENHANCEMENT] Ingester: reduce CPU and memory when an high number of errors are returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go
index 065d7fdca6e..45cfb22225c 100644
--- a/pkg/frontend/transport/roundtripper.go
+++ b/pkg/frontend/transport/roundtripper.go
@@ -36,9 +36,10 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
 	}
 
 	httpResp := &http.Response{
-		StatusCode: int(resp.Code),
-		Body:       ioutil.NopCloser(bytes.NewReader(resp.Body)),
-		Header:     http.Header{},
+		StatusCode:    int(resp.Code),
+		Body:          ioutil.NopCloser(bytes.NewReader(resp.Body)),
+		Header:        http.Header{},
+		ContentLength: int64(len(resp.Body)),
 	}
 	for _, h := range resp.Headers {
 		httpResp.Header[h.Key] = h.Values
diff --git a/pkg/querier/queryrange/marshaling_test.go b/pkg/querier/queryrange/marshaling_test.go
index 9e58f35edc3..83e849e49c0 100644
--- a/pkg/querier/queryrange/marshaling_test.go
+++ b/pkg/querier/queryrange/marshaling_test.go
@@ -4,29 +4,88 @@ import (
 	"bytes"
 	"context"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
-	"os"
 	"testing"
 
 	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/cortexpb"
 )
 
-func BenchmarkMarshalling(b *testing.B) {
-	jsonfile := os.Getenv("JSON")
-	buf, err := ioutil.ReadFile(jsonfile)
+func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) {
+	const (
+		numSeries           = 1000
+		numSamplesPerSeries = 1000
+	)
+
+	// Generate a mocked response and marshal it.
+	res := mockPrometheusResponse(numSeries, numSamplesPerSeries)
+	encodedRes, err := json.Marshal(res)
 	require.NoError(b, err)
+	b.Log("test prometheus response size:", len(encodedRes))
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
 	for n := 0; n < b.N; n++ {
-		apiResp, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
-			StatusCode: 200,
-			Body:       ioutil.NopCloser(bytes.NewReader(buf)),
+		_, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
+			StatusCode:    200,
+			Body:          ioutil.NopCloser(bytes.NewReader(encodedRes)),
+			ContentLength: int64(len(encodedRes)),
 		}, nil)
 		require.NoError(b, err)
+	}
+}
 
-		resp, err := PrometheusCodec.EncodeResponse(context.Background(), apiResp)
-		require.NoError(b, err)
+func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
+	const (
+		numSeries           = 1000
+		numSamplesPerSeries = 1000
+	)
+
+	// Generate a mocked response to encode.
+	res := mockPrometheusResponse(numSeries, numSamplesPerSeries)
+
+	b.ResetTimer()
+	b.ReportAllocs()
 
-		buf2, err := ioutil.ReadAll(resp.Body)
+	for n := 0; n < b.N; n++ {
+		_, err := PrometheusCodec.EncodeResponse(context.Background(), res)
 		require.NoError(b, err)
-		require.Equal(b, string(buf), string(buf2))
+	}
+}
+
+func mockPrometheusResponse(numSeries, numSamplesPerSeries int) *PrometheusResponse {
+	stream := make([]SampleStream, numSeries)
+	for s := 0; s < numSeries; s++ {
+		// Generate random samples.
+		samples := make([]cortexpb.Sample, numSamplesPerSeries)
+		for i := 0; i < numSamplesPerSeries; i++ {
+			samples[i] = cortexpb.Sample{
+				Value:       rand.Float64(),
+				TimestampMs: int64(i),
+			}
+		}
+
+		// Generate random labels.
+		lbls := make([]cortexpb.LabelAdapter, 10)
+		for i := range lbls {
+			lbls[i].Name = "a_medium_size_label_name"
+			lbls[i].Value = "a_medium_size_label_value_that_is_used_to_benchmark_marshalling"
+		}
+
+		stream[s] = SampleStream{
+			Labels:  lbls,
+			Samples: samples,
+		}
+	}
+
+	return &PrometheusResponse{
+		Status: "success",
+		Data: PrometheusData{
+			ResultType: "vector",
+			Result:     stream,
+		},
 	}
 }
diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go
index 2223121cd32..d68d2e595ff 100644
--- a/pkg/querier/queryrange/query_range.go
+++ b/pkg/querier/queryrange/query_range.go
@@ -31,8 +31,12 @@ import (
 const StatusSuccess = "success"
 
 var (
-	matrix            = model.ValMatrix.String()
-	json              = jsoniter.ConfigCompatibleWithStandardLibrary
+	matrix = model.ValMatrix.String()
+	json   = jsoniter.Config{
+		EscapeHTML:             false, // No HTML in our responses.
+		SortMapKeys:            true,
+		ValidateJsonRawMessage: true,
+	}.Froze()
 	errEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "end timestamp must not be before start time")
 	errNegativeStep   = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
 	errStepTooSmall   = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
@@ -262,16 +266,20 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
 	log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
 	defer log.Finish()
 
-	buf, err := ioutil.ReadAll(r.Body)
-	if err != nil {
+	// Preallocate the buffer with the exact size so we don't waste allocations
+	// while progressively growing an initial small buffer. The buffer capacity
+	// is increased by MinRead to avoid extra allocations due to how ReadFrom()
+	// internally works.
+	buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead))
+	if _, err := buf.ReadFrom(r.Body); err != nil {
 		log.Error(err)
 		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
 	}
 
-	log.LogFields(otlog.Int("bytes", len(buf)))
+	log.LogFields(otlog.Int("bytes", buf.Len()))
 
 	var resp PrometheusResponse
-	if err := json.Unmarshal(buf, &resp); err != nil {
+	if err := json.Unmarshal(buf.Bytes(), &resp); err != nil {
 		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
 	}
 
@@ -303,8 +311,9 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
 		Header: http.Header{
 			"Content-Type": []string{"application/json"},
 		},
-		Body:       ioutil.NopCloser(bytes.NewBuffer(b)),
-		StatusCode: http.StatusOK,
+		Body:          ioutil.NopCloser(bytes.NewBuffer(b)),
+		StatusCode:    http.StatusOK,
+		ContentLength: int64(len(b)),
 	}
 	return &resp, nil
 }
diff --git a/pkg/querier/queryrange/query_range_test.go b/pkg/querier/queryrange/query_range_test.go
index 4bba489326b..5c1b616f9f5 100644
--- a/pkg/querier/queryrange/query_range_test.go
+++ b/pkg/querier/queryrange/query_range_test.go
@@ -97,9 +97,10 @@ func TestResponse(t *testing.T) {
 
 			// Reset response, as the above call will have consumed the body reader.
 			response = &http.Response{
-				StatusCode: 200,
-				Header:     http.Header{"Content-Type": []string{"application/json"}},
-				Body:       ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
+				StatusCode:    200,
+				Header:        http.Header{"Content-Type": []string{"application/json"}},
+				Body:          ioutil.NopCloser(bytes.NewBuffer([]byte(tc.body))),
+				ContentLength: int64(len(tc.body)),
 			}
 			resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp)
 			require.NoError(t, err)
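
The core of the patch is that the gRPC round tripper and EncodeResponse now set ContentLength on the synthesized *http.Response, which lets DecodeResponse size its read buffer up front instead of letting ioutil.ReadAll grow a small buffer repeatedly. The standalone sketch below illustrates that pattern outside Cortex; the readResponseBody helper and the ContentLength < 0 fallback are illustrative additions and are not part of the patch.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
)

// readResponseBody reads the full body into a buffer sized from ContentLength.
// The extra bytes.MinRead of capacity keeps Buffer.ReadFrom from growing (and
// therefore reallocating) the buffer one more time when it probes for EOF.
func readResponseBody(r *http.Response) ([]byte, error) {
	// ContentLength is -1 when the length is unknown; fall back to zero so the
	// buffer simply grows on demand in that case.
	size := r.ContentLength
	if size < 0 {
		size = 0
	}

	buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
	if _, err := buf.ReadFrom(r.Body); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	body := `{"status":"success"}`
	resp := &http.Response{
		StatusCode:    http.StatusOK,
		Body:          ioutil.NopCloser(strings.NewReader(body)),
		ContentLength: int64(len(body)),
	}

	b, err := readResponseBody(resp)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes: %s\n", len(b), b)
}

The extra bytes.MinRead matters because bytes.Buffer.ReadFrom reserves at least MinRead (512) bytes of spare capacity before every read, so without it the final read that returns io.EOF would still trigger one last reallocation even when the body fits exactly.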
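The var block in query_range.go also replaces jsoniter.ConfigCompatibleWithStandardLibrary with a frozen config that keeps SortMapKeys and ValidateJsonRawMessage but turns off EscapeHTML, skipping the HTML-escaping pass during marshalling. A minimal sketch of the behavioural difference, using a made-up payload; the expected outputs in the comments assume jsoniter's documented escaping behaviour.

package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

var (
	compat = jsoniter.ConfigCompatibleWithStandardLibrary // EscapeHTML: true, like encoding/json.
	fast   = jsoniter.Config{
		EscapeHTML:             false, // No HTML in our responses.
		SortMapKeys:            true,
		ValidateJsonRawMessage: true,
	}.Froze()
)

func main() {
	v := map[string]string{"expr": `rate(http_requests_total{job="<api>"}[5m])`}

	a, _ := compat.Marshal(v) // '<' and '>' are escaped as \u003c and \u003e.
	b, _ := fast.Marshal(v)   // '<' and '>' are emitted verbatim.

	fmt.Println(string(a))
	fmt.Println(string(b))
}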
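The two benchmarks added in marshaling_test.go can be exercised with standard Go tooling, for example `go test -bench=BenchmarkPrometheusCodec -benchmem ./pkg/querier/queryrange/`; -benchmem reports the per-operation allocations this change is meant to reduce. The command is shown for convenience and is not part of the patch.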