Skip to content

Commit 9d9d4bf

Browse files
authored
Use slice pooling to populate the query stream response (#6466)
Signed-off-by: alanprot <[email protected]>
1 parent 5d593f5 commit 9d9d4bf

File tree

2 files changed

+57
-25
lines changed

2 files changed

+57
-25
lines changed

pkg/ingester/ingester.go

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/prometheus/prometheus/tsdb/chunkenc"
3434
"github.com/prometheus/prometheus/tsdb/chunks"
3535
"github.com/prometheus/prometheus/tsdb/wlog"
36+
"github.com/prometheus/prometheus/util/zeropool"
3637
"github.com/thanos-io/objstore"
3738
"github.com/thanos-io/thanos/pkg/block/metadata"
3839
"github.com/thanos-io/thanos/pkg/shipper"
@@ -95,6 +96,8 @@ const (
9596
var (
9697
errExemplarRef = errors.New("exemplars not ingested because series not already present")
9798
errIngesterStopping = errors.New("ingester stopping")
99+
100+
tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
98101
)
99102

100103
// Config for an Ingester.
@@ -2055,7 +2058,8 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
20552058
return 0, 0, 0, 0, ss.Err()
20562059
}
20572060

2058-
chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
2061+
chunkSeries := getTimeSeriesChunksSlice()
2062+
defer putTimeSeriesChunksSlice(chunkSeries)
20592063
batchSizeBytes := 0
20602064
var it chunks.Iterator
20612065
for ss.Next() {
@@ -3072,6 +3076,31 @@ func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
30723076
_, _ = w.Write([]byte(respMsg))
30733077
}
30743078

3079+
func (i *Ingester) getInstanceLimits() *InstanceLimits {
3080+
// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
3081+
if i.State() == services.Starting {
3082+
return nil
3083+
}
3084+
3085+
if i.cfg.InstanceLimitsFn == nil {
3086+
return defaultInstanceLimits
3087+
}
3088+
3089+
l := i.cfg.InstanceLimitsFn()
3090+
if l == nil {
3091+
return defaultInstanceLimits
3092+
}
3093+
3094+
return l
3095+
}
3096+
3097+
// stopIncomingRequests is called during the shutdown process.
3098+
func (i *Ingester) stopIncomingRequests() {
3099+
i.stoppedMtx.Lock()
3100+
defer i.stoppedMtx.Unlock()
3101+
i.stopped = true
3102+
}
3103+
30753104
// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
30763105
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
30773106
if queryIngestersWithin > 0 {
@@ -3129,27 +3158,16 @@ func wrappedTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesL
31293158
)
31303159
}
31313160

3132-
func (i *Ingester) getInstanceLimits() *InstanceLimits {
3133-
// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
3134-
if i.State() == services.Starting {
3135-
return nil
3136-
}
3137-
3138-
if i.cfg.InstanceLimitsFn == nil {
3139-
return defaultInstanceLimits
3161+
func getTimeSeriesChunksSlice() []client.TimeSeriesChunk {
3162+
if p := tsChunksPool.Get(); p != nil {
3163+
return p
31403164
}
31413165

3142-
l := i.cfg.InstanceLimitsFn()
3143-
if l == nil {
3144-
return defaultInstanceLimits
3145-
}
3146-
3147-
return l
3166+
return make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
31483167
}
31493168

3150-
// stopIncomingRequests is called during the shutdown process.
3151-
func (i *Ingester) stopIncomingRequests() {
3152-
i.stoppedMtx.Lock()
3153-
defer i.stoppedMtx.Unlock()
3154-
i.stopped = true
3169+
func putTimeSeriesChunksSlice(p []client.TimeSeriesChunk) {
3170+
if p != nil {
3171+
tsChunksPool.Put(p[:0])
3172+
}
31553173
}

pkg/ingester/ingester_test.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3445,10 +3445,25 @@ func (m *mockQueryStreamServer) Context() context.Context {
34453445
}
34463446

34473447
func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
3448-
benchmarkQueryStream(b)
3448+
tc := []struct {
3449+
samplesCount, seriesCount int
3450+
}{
3451+
{samplesCount: 10, seriesCount: 10},
3452+
{samplesCount: 10, seriesCount: 50},
3453+
{samplesCount: 10, seriesCount: 100},
3454+
{samplesCount: 50, seriesCount: 10},
3455+
{samplesCount: 50, seriesCount: 50},
3456+
{samplesCount: 50, seriesCount: 100},
3457+
}
3458+
3459+
for _, c := range tc {
3460+
b.Run(fmt.Sprintf("samplesCount=%v; seriesCount=%v", c.samplesCount, c.seriesCount), func(b *testing.B) {
3461+
benchmarkQueryStream(b, c.samplesCount, c.seriesCount)
3462+
})
3463+
}
34493464
}
34503465

3451-
func benchmarkQueryStream(b *testing.B) {
3466+
func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
34523467
cfg := defaultIngesterTestConfig(b)
34533468

34543469
// Create ingester.
@@ -3465,7 +3480,6 @@ func benchmarkQueryStream(b *testing.B) {
34653480
// Push series.
34663481
ctx := user.InjectOrgID(context.Background(), userID)
34673482

3468-
const samplesCount = 1000
34693483
samples := make([]cortexpb.Sample, 0, samplesCount)
34703484

34713485
for i := 0; i < samplesCount; i++ {
@@ -3475,15 +3489,14 @@ func benchmarkQueryStream(b *testing.B) {
34753489
})
34763490
}
34773491

3478-
const seriesCount = 100
34793492
for s := 0; s < seriesCount; s++ {
34803493
_, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}, samples))
34813494
require.NoError(b, err)
34823495
}
34833496

34843497
req := &client.QueryRequest{
34853498
StartTimestampMs: 0,
3486-
EndTimestampMs: samplesCount + 1,
3499+
EndTimestampMs: int64(samplesCount + 1),
34873500

34883501
Matchers: []*client.LabelMatcher{{
34893502
Type: client.EQUAL,
@@ -3495,6 +3508,7 @@ func benchmarkQueryStream(b *testing.B) {
34953508
mockStream := &mockQueryStreamServer{ctx: ctx}
34963509

34973510
b.ResetTimer()
3511+
b.ReportAllocs()
34983512

34993513
for ix := 0; ix < b.N; ix++ {
35003514
err := i.QueryStream(req, mockStream)

0 commit comments

Comments
 (0)