diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 1d3fc2c5fcc..6298c3de361 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -33,6 +33,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/wlog"
+	"github.com/prometheus/prometheus/util/zeropool"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/shipper"
@@ -95,6 +96,8 @@ const (
 var (
 	errExemplarRef      = errors.New("exemplars not ingested because series not already present")
 	errIngesterStopping = errors.New("ingester stopping")
+
+	tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
 )

 // Config for an Ingester.
@@ -2045,7 +2048,8 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
 		return 0, 0, 0, 0, ss.Err()
 	}

-	chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
+	chunkSeries := getTimeSeriesChunksSlice()
+	defer putTimeSeriesChunksSlice(chunkSeries)
 	batchSizeBytes := 0
 	var it chunks.Iterator
 	for ss.Next() {
@@ -3062,6 +3066,31 @@ func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
 	_, _ = w.Write([]byte(respMsg))
 }

+func (i *Ingester) getInstanceLimits() *InstanceLimits {
+	// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
+	if i.State() == services.Starting {
+		return nil
+	}
+
+	if i.cfg.InstanceLimitsFn == nil {
+		return defaultInstanceLimits
+	}
+
+	l := i.cfg.InstanceLimitsFn()
+	if l == nil {
+		return defaultInstanceLimits
+	}
+
+	return l
+}
+
+// stopIncomingRequests is called during the shutdown process.
+func (i *Ingester) stopIncomingRequests() {
+	i.stoppedMtx.Lock()
+	defer i.stoppedMtx.Unlock()
+	i.stopped = true
+}
+
 // metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
 func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
 	if queryIngestersWithin > 0 {
@@ -3119,27 +3148,16 @@ func wrappedTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesL
 	)
 }

-func (i *Ingester) getInstanceLimits() *InstanceLimits {
-	// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
-	if i.State() == services.Starting {
-		return nil
-	}
-
-	if i.cfg.InstanceLimitsFn == nil {
-		return defaultInstanceLimits
+func getTimeSeriesChunksSlice() []client.TimeSeriesChunk {
+	if p := tsChunksPool.Get(); p != nil {
+		return p
 	}

-	l := i.cfg.InstanceLimitsFn()
-	if l == nil {
-		return defaultInstanceLimits
-	}
-
-	return l
+	return make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
 }

-// stopIncomingRequests is called during the shutdown process.
-func (i *Ingester) stopIncomingRequests() {
-	i.stoppedMtx.Lock()
-	defer i.stoppedMtx.Unlock()
-	i.stopped = true
+func putTimeSeriesChunksSlice(p []client.TimeSeriesChunk) {
+	if p != nil {
+		tsChunksPool.Put(p[:0])
+	}
 }
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index ad84c57be4c..38f5a980785 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -3424,10 +3424,25 @@ func (m *mockQueryStreamServer) Context() context.Context {
 }

 func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
-	benchmarkQueryStream(b)
+	tc := []struct {
+		samplesCount, seriesCount int
+	}{
+		{samplesCount: 10, seriesCount: 10},
+		{samplesCount: 10, seriesCount: 50},
+		{samplesCount: 10, seriesCount: 100},
+		{samplesCount: 50, seriesCount: 10},
+		{samplesCount: 50, seriesCount: 50},
+		{samplesCount: 50, seriesCount: 100},
+	}
+
+	for _, c := range tc {
+		b.Run(fmt.Sprintf("samplesCount=%v; seriesCount=%v", c.samplesCount, c.seriesCount), func(b *testing.B) {
+			benchmarkQueryStream(b, c.samplesCount, c.seriesCount)
+		})
+	}
 }

-func benchmarkQueryStream(b *testing.B) {
+func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
 	cfg := defaultIngesterTestConfig(b)

 	// Create ingester.
@@ -3444,7 +3459,6 @@ func benchmarkQueryStream(b *testing.B) {
 	// Push series.
 	ctx := user.InjectOrgID(context.Background(), userID)

-	const samplesCount = 1000
 	samples := make([]cortexpb.Sample, 0, samplesCount)

 	for i := 0; i < samplesCount; i++ {
@@ -3454,7 +3468,6 @@ func benchmarkQueryStream(b *testing.B) {
 		})
 	}

-	const seriesCount = 100
 	for s := 0; s < seriesCount; s++ {
 		_, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}, samples))
 		require.NoError(b, err)
@@ -3462,7 +3475,7 @@ func benchmarkQueryStream(b *testing.B) {

 	req := &client.QueryRequest{
 		StartTimestampMs: 0,
-		EndTimestampMs:   samplesCount + 1,
+		EndTimestampMs:   int64(samplesCount + 1),

 		Matchers: []*client.LabelMatcher{{
 			Type:  client.EQUAL,
@@ -3474,6 +3487,7 @@ func benchmarkQueryStream(b *testing.B) {
 	mockStream := &mockQueryStreamServer{ctx: ctx}

 	b.ResetTimer()
+	b.ReportAllocs()

 	for ix := 0; ix < b.N; ix++ {
 		err := i.QueryStream(req, mockStream)
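A note on the pooling pattern for reviewers unfamiliar with zeropool: the change relies on a zero-value zeropool.Pool[T] being usable without a constructor, with Get returning the zero value of T (here, a nil slice) when the pool is empty, which is why getTimeSeriesChunksSlice nil-checks the result before falling back to make. Below is a minimal, self-contained sketch of the same get/put shape; the bufPool, getBuffer, and putBuffer names, the []byte element type, and the capacity of 1024 are illustrative stand-ins, not part of this change.

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/util/zeropool"
    )

    // Zero-value pool, mirroring tsChunksPool above: no constructor, so
    // Get returns a nil slice when the pool has nothing to hand out.
    var bufPool zeropool.Pool[[]byte]

    // getBuffer mirrors getTimeSeriesChunksSlice: reuse a pooled slice
    // if one is available, otherwise allocate with the expected capacity.
    func getBuffer() []byte {
        if b := bufPool.Get(); b != nil {
            return b
        }
        return make([]byte, 0, 1024)
    }

    // putBuffer mirrors putTimeSeriesChunksSlice: truncate the length to
    // zero (the capacity is retained) before returning it to the pool.
    func putBuffer(b []byte) {
        if b != nil {
            bufPool.Put(b[:0])
        }
    }

    func main() {
        b := getBuffer()
        defer putBuffer(b) // same shape as the deferred put in queryStreamChunks
        b = append(b, "reused across calls"...)
        fmt.Println(len(b), cap(b))
    }

Putting back p[:0] resets the length while keeping the backing array, so the next Get hands out an empty slice that can be appended to without reallocating. The usual pool caveat applies: the slice must not be retained or aliased past the deferred put, since a later query may overwrite its backing array.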
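On measuring the effect: with b.ReportAllocs() in place, each sub-benchmark reports B/op and allocs/op, so a before/after comparison is just standard Go tooling, e.g. `go test -run='^$' -bench=BenchmarkIngester_QueryStream_Chunks -count=10 ./pkg/ingester/` on both the base branch and this one, then `benchstat old.txt new.txt` over the saved outputs. The pooled batch slice should show up as a drop in allocs/op.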