diff --git a/CHANGELOG.md b/CHANGELOG.md index f186bedf2f5..b63addb95d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ * [ENHANCEMENT] When a tenant accesses the Alertmanager UI or its API, if we have valid `-alertmanager.configs.fallback` we'll use that to start the manager and avoid failing the request. #3073 * [ENHANCEMENT] Add `DELETE api/v1/rules/{namespace}` to the Ruler. It allows all the rule groups of a namespace to be deleted. #3120 * [ENHANCEMENT] Experimental Delete Series: Retry processing of Delete requests during failures. #2926 +* [ENHANCEMENT] Improve performance of QueryStream() in ingesters. #3177 * [ENHANCEMENT] Modules included in "All" target are now visible in output of `-modules` CLI flag. #3155 * [BUGFIX] Ruler: when loading rules from "local" storage, check for directory after resolving symlink. #3137 * [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5a682f60de4..16e9d5c1b51 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -723,6 +723,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ } numSeries, numChunks := 0, 0 + reuseWireChunks := [queryStreamBatchSize][]client.Chunk{} batch := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize) // We'd really like to have series in label order, not FP order, so we // can iteratively merge them with entries coming from the chunk store. But @@ -741,10 +742,12 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ } numSeries++ - wireChunks, err := toWireChunks(chunks, nil) + reusePos := len(batch) + wireChunks, err := toWireChunks(chunks, reuseWireChunks[reusePos]) if err != nil { return err } + reuseWireChunks[reusePos] = wireChunks numChunks += len(wireChunks) batch = append(batch, client.TimeSeriesChunk{ diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b193094b5f4..cb82865e8b1 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "math" + "math/rand" "net/http" "os" "path/filepath" @@ -843,19 +844,9 @@ func BenchmarkIngesterPushErrors(b *testing.B) { benchmarkIngesterPush(b, limits, true) } -func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) { - cfg := defaultIngesterTestConfig() - clientCfg := defaultClientTestConfig() - - const ( - series = 100 - samples = 100 - ) - - // Construct a set of realistic-looking samples, all with slightly different label sets - var allLabels []labels.Labels - var allSamples []client.Sample - for j := 0; j < series; j++ { +// Construct a set of realistic-looking samples, all with slightly different label sets +func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []client.Sample) { + for j := 0; j < nSeries; j++ { labels := chunk.BenchmarkLabels.Copy() for i := range labels { if labels[i].Name == "cpu" { @@ -865,6 +856,19 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte allLabels = append(allLabels, labels) allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)}) } + return +} + +func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) { + cfg := defaultIngesterTestConfig() + clientCfg := defaultClientTestConfig() + + const ( + series = 100 + samples = 100 + ) + + allLabels, allSamples := benchmarkData(series) ctx := user.InjectOrgID(context.Background(), "1") encodings := []struct { @@ -897,3 +901,48 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte } } + +func BenchmarkIngester_QueryStream(b *testing.B) { + cfg := defaultIngesterTestConfig() + clientCfg := defaultClientTestConfig() + limits := defaultLimitsTestConfig() + _, ing := newTestStore(b, cfg, clientCfg, limits, nil) + ctx := user.InjectOrgID(context.Background(), "1") + + const ( + series = 2000 + samples = 1000 + ) + + allLabels, allSamples := benchmarkData(series) + + // Bump the timestamp and set a random value on each of our test samples each time round the loop + for j := 0; j < samples; j++ { + for i := range allSamples { + allSamples[i].TimestampMs = int64(j + 1) + allSamples[i].Value = rand.Float64() + } + _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, nil, client.API)) + require.NoError(b, err) + } + + req := &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: samples + 1, + + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "container_cpu_usage_seconds_total", + }}, + } + + mockStream := &mockQueryStreamServer{ctx: ctx} + + b.ResetTimer() + + for ix := 0; ix < b.N; ix++ { + err := ing.QueryStream(req, mockStream) + require.NoError(b, err) + } +}