diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index c44084571cff..bb2c877e91b2 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -107,6 +107,7 @@ func (c *crossJoiner) Next() coldata.Batch { } c.output, _ = c.unlimitedAllocator.ResetMaybeReallocate( c.outputTypes, c.output, willEmit, c.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ ) if willEmit > c.output.Capacity() { willEmit = c.output.Capacity() diff --git a/pkg/sql/colexec/colexecjoin/hashjoiner.go b/pkg/sql/colexec/colexecjoin/hashjoiner.go index eed7a551ff7b..4c76e13a0639 100644 --- a/pkg/sql/colexec/colexecjoin/hashjoiner.go +++ b/pkg/sql/colexec/colexecjoin/hashjoiner.go @@ -725,6 +725,7 @@ func (hj *hashJoiner) resetOutput(nResults int) { const maxOutputBatchMemSize = math.MaxInt64 hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate( hj.outputTypes, hj.output, nResults, maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ ) } diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go index b3eeb7007a04..785f2eb886db 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_exceptall.eg.go @@ -14219,6 +14219,7 @@ func (o *mergeJoinExceptAllOp) buildFromBufferedGroup() (bufferedGroupComplete b func (o *mergeJoinExceptAllOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go index a4b80dc6dc37..e36363718a5c 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_fullouter.eg.go @@ -15366,6 +15366,7 @@ func (o *mergeJoinFullOuterOp) buildFromBufferedGroup() (bufferedGroupComplete b func (o *mergeJoinFullOuterOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go index e6800eb3cead..67b4da51c57b 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_inner.eg.go @@ -10900,6 +10900,7 @@ func (o *mergeJoinInnerOp) buildFromBufferedGroup() (bufferedGroupComplete bool) func (o *mergeJoinInnerOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go index 759f3c619d42..081238597e5c 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_intersectall.eg.go @@ -11610,6 +11610,7 @@ func (o *mergeJoinIntersectAllOp) buildFromBufferedGroup() (bufferedGroupComplet func (o *mergeJoinIntersectAllOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go index 09cb19892d15..98699d35cd34 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_leftanti.eg.go @@ -13129,6 +13129,7 @@ func (o *mergeJoinLeftAntiOp) buildFromBufferedGroup() (bufferedGroupComplete bo func (o *mergeJoinLeftAntiOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go index ba18ed6e7bc1..71f708345c82 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_leftouter.eg.go @@ -13156,6 +13156,7 @@ func (o *mergeJoinLeftOuterOp) buildFromBufferedGroup() (bufferedGroupComplete b func (o *mergeJoinLeftOuterOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go index 7b0f7d529e0b..c770e70e7b9b 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_leftsemi.eg.go @@ -10853,6 +10853,7 @@ func (o *mergeJoinLeftSemiOp) buildFromBufferedGroup() (bufferedGroupComplete bo func (o *mergeJoinLeftSemiOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go index ba4c19f044f1..eec58431c897 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go @@ -13060,6 +13060,7 @@ func (o *mergeJoinRightAntiOp) buildFromBufferedGroup() (bufferedGroupComplete b func (o *mergeJoinRightAntiOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go index f628e0495d35..54722dc88882 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go @@ -13110,6 +13110,7 @@ func (o *mergeJoinRightOuterOp) buildFromBufferedGroup() (bufferedGroupComplete func (o *mergeJoinRightOuterOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go b/pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go index bccc21907e56..046f95311dee 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go @@ -10813,6 +10813,7 @@ func (o *mergeJoinRightSemiOp) buildFromBufferedGroup() (bufferedGroupComplete b func (o *mergeJoinRightSemiOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go b/pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go index 97beb827907a..af606bb76ed2 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_tmpl.go @@ -1355,6 +1355,7 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}} func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next() coldata.Batch { o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate( o.outputTypes, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) o.outputCapacity = o.output.Capacity() o.bufferedGroup.helper.output = o.output diff --git a/pkg/sql/colexec/colexecutils/deselector.go b/pkg/sql/colexec/colexecutils/deselector.go index 9d3e87e3ef41..1c8fdbd8a379 100644 --- a/pkg/sql/colexec/colexecutils/deselector.go +++ b/pkg/sql/colexec/colexecutils/deselector.go @@ -62,6 +62,7 @@ func (p *deselectorOp) Next() coldata.Batch { const maxBatchMemSize = math.MaxInt64 p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate( p.inputTypes, p.output, batch.Length(), maxBatchMemSize, + true, /* desiredCapacitySufficient */ ) sel := batch.Selection() p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() { diff --git a/pkg/sql/colexec/colexecutils/spilling_queue.go b/pkg/sql/colexec/colexecutils/spilling_queue.go index 193a5fb2d783..6dae386d0d05 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue.go @@ -193,6 +193,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) { const maxBatchMemSize = math.MaxInt64 q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate( q.typs, q.diskQueueDeselectionScratch, n, maxBatchMemSize, + true, /* desiredCapacitySufficient */ ) q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() { for i := range q.typs { @@ -295,6 +296,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) { // attention to the memory registered with the unlimited allocator, and // we will stop adding tuples into this batch and spill when needed. math.MaxInt64, /* maxBatchMemSize */ + true, /* desiredCapacitySufficient */ ) q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() { for i := range q.typs { diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index a5f1cf83c27d..d5425789eca7 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -251,6 +251,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch { const maxBatchMemSize = math.MaxInt64 b.currentBatch, _ = b.allocator.ResetMaybeReallocate( b.outputTypes, b.currentBatch, batch.Length(), maxBatchMemSize, + true, /* desiredCapacitySufficient */ ) b.allocator.PerformOperation(b.currentBatch.ColVecs(), func() { for colIdx, vec := range batch.ColVecs() { diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 9029e1dd61fa..ef9bf388ab2e 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -198,7 +198,8 @@ func (c *Columnarizer) Next() coldata.Batch { switch c.mode { case columnarizerBufferingMode: c.batch, reallocated = c.allocator.ResetMaybeReallocate( - c.typs, c.batch, 1 /* minDesiredCapacity */, c.maxBatchMemSize, + c.typs, c.batch, 1, /* minDesiredCapacity */ + c.maxBatchMemSize, false, /* desiredCapacitySufficient */ ) case columnarizerStreamingMode: // Note that we're not using ResetMaybeReallocate because we will diff --git a/pkg/sql/colexec/hash_aggregator.eg.go b/pkg/sql/colexec/hash_aggregator.eg.go index 83530f9519fe..40dc0d6fca90 100644 --- a/pkg/sql/colexec/hash_aggregator.eg.go +++ b/pkg/sql/colexec/hash_aggregator.eg.go @@ -459,6 +459,7 @@ func getNext_true(op *hashAggregator) coldata.Batch { // len(op.buckets) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 for curOutputIdx < op.output.Capacity() && @@ -606,6 +607,7 @@ func getNext_false(op *hashAggregator) coldata.Batch { // len(op.buckets) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 for curOutputIdx < op.output.Capacity() && diff --git a/pkg/sql/colexec/hash_aggregator_tmpl.go b/pkg/sql/colexec/hash_aggregator_tmpl.go index f9c86e41e246..02b44dab0976 100644 --- a/pkg/sql/colexec/hash_aggregator_tmpl.go +++ b/pkg/sql/colexec/hash_aggregator_tmpl.go @@ -345,6 +345,7 @@ func getNext(op *hashAggregator, partialOrder bool) coldata.Batch { // len(op.buckets) capacity. op.output, _ = op.accountingHelper.ResetMaybeReallocate( op.outputTypes, op.output, len(op.buckets), op.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ ) curOutputIdx := 0 for curOutputIdx < op.output.Capacity() && diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index c99d9a986060..56cc0024288e 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -285,14 +285,16 @@ func (a *orderedAggregator) Next() coldata.Batch { a.scratch.Batch = a.allocator.NewMemBatchWithFixedCapacity(a.outputTypes, newMinCapacity) } else { a.scratch.Batch, _ = a.allocator.ResetMaybeReallocate( - a.outputTypes, a.scratch.Batch, newMinCapacity, maxBatchMemSize, + a.outputTypes, a.scratch.Batch, newMinCapacity, + maxBatchMemSize, true, /* desiredCapacitySufficient */ ) } // We will never copy more than coldata.BatchSize() into the // temporary buffer, so a half of the scratch's capacity will always // be sufficient. a.scratch.tempBuffer, _ = a.allocator.ResetMaybeReallocate( - a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2, maxBatchMemSize, + a.outputTypes, a.scratch.tempBuffer, newMinCapacity/2, + maxBatchMemSize, true, /* desiredCapacitySufficient */ ) for fnIdx, fn := range a.bucket.fns { fn.SetOutput(a.scratch.ColVec(fnIdx)) diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index 0aa24eb8ccaa..041c58dd1c12 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -270,6 +270,7 @@ func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index f68da4c57c1b..c11e831eef24 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -220,6 +220,7 @@ func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( o.typs, o.output, 1 /* minDesiredCapacity */, o.memoryLimit, + false, /* desiredCapacitySufficient */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index 37ff8570c685..b2c10115fdda 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -287,7 +287,10 @@ func (p *sortOp) Next() coldata.Batch { p.state = sortDone continue } - p.output, _ = p.allocator.ResetMaybeReallocate(p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize) + p.output, _ = p.allocator.ResetMaybeReallocate( + p.inputTypes, p.output, toEmit, p.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ + ) if toEmit > p.output.Capacity() { toEmit = p.output.Capacity() } diff --git a/pkg/sql/colexec/sorttopk.go b/pkg/sql/colexec/sorttopk.go index 96b69f1af7eb..7bc269da030f 100644 --- a/pkg/sql/colexec/sorttopk.go +++ b/pkg/sql/colexec/sorttopk.go @@ -200,7 +200,10 @@ func (t *topKSorter) emit() coldata.Batch { // We're done. return coldata.ZeroBatch } - t.output, _ = t.allocator.ResetMaybeReallocate(t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize) + t.output, _ = t.allocator.ResetMaybeReallocate( + t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize, + true, /* desiredCapacitySufficient */ + ) if toEmit > t.output.Capacity() { toEmit = t.output.Capacity() } diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index e50731f36424..0ffa7619ae8b 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -325,6 +325,7 @@ func (cf *cFetcher) resetBatch() { } cf.machine.batch, reallocated = cf.accountingHelper.ResetMaybeReallocate( cf.table.typs, cf.machine.batch, minDesiredCapacity, cf.memoryLimit, + false, /* desiredCapacitySufficient */ ) if reallocated { cf.machine.colvecs.SetBatch(cf.machine.batch) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index db642a1e19cc..67d7ea657a52 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -429,7 +429,10 @@ func (i *Inbox) Next() coldata.Batch { } // We rely on the outboxes to produce reasonably sized batches. const maxBatchMemSize = math.MaxInt64 - i.scratch.b, _ = i.allocator.ResetMaybeReallocate(i.typs, i.scratch.b, batchLength, maxBatchMemSize) + i.scratch.b, _ = i.allocator.ResetMaybeReallocate( + i.typs, i.scratch.b, batchLength, maxBatchMemSize, + true, /* desiredCapacitySufficient */ + ) i.allocator.PerformOperation(i.scratch.b.ColVecs(), func() { if err := i.converter.ArrowToBatch(i.scratch.data, batchLength, i.scratch.b); err != nil { colexecerror.InternalError(err) diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index e00867a0ed2e..3f4316a56248 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -156,7 +156,10 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat // // The method will grow the allocated capacity of the batch exponentially // (possibly incurring a reallocation), until the batch reaches -// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint. +// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint if +// desiredCapacitySufficient is false. When that parameter is true and the +// capacity of old batch is at least minDesiredCapacity, then the old batch is +// reused. // // NOTE: if the reallocation occurs, then the memory under the old batch is // released, so it is expected that the caller will lose the references to the @@ -164,7 +167,11 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat // Note: the method assumes that minDesiredCapacity is at least 0 and will clamp // minDesiredCapacity to be between 1 and coldata.BatchSize() inclusive. func (a *Allocator) ResetMaybeReallocate( - typs []*types.T, oldBatch coldata.Batch, minDesiredCapacity int, maxBatchMemSize int64, + typs []*types.T, + oldBatch coldata.Batch, + minDesiredCapacity int, + maxBatchMemSize int64, + desiredCapacitySufficient bool, ) (newBatch coldata.Batch, reallocated bool) { if minDesiredCapacity < 0 { colexecerror.InternalError(errors.AssertionFailedf("invalid minDesiredCapacity %d", minDesiredCapacity)) @@ -179,6 +186,11 @@ func (a *Allocator) ResetMaybeReallocate( } else { // If old batch is already of the largest capacity, we will reuse it. useOldBatch := oldBatch.Capacity() == coldata.BatchSize() + // If the old batch already satisfies the desired capacity which is + // sufficient, we will reuse it too. + if desiredCapacitySufficient && oldBatch.Capacity() >= minDesiredCapacity { + useOldBatch = true + } // Avoid calculating the memory footprint if possible. var oldBatchMemSize int64 if !useOldBatch { @@ -568,10 +580,14 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { // Allocator.ResetMaybeReallocate (and thus has the same contract) with an // additional logic for memory tracking purposes. func (h *SetAccountingHelper) ResetMaybeReallocate( - typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64, + typs []*types.T, + oldBatch coldata.Batch, + minCapacity int, + maxBatchMemSize int64, + desiredCapacitySufficient bool, ) (newBatch coldata.Batch, reallocated bool) { newBatch, reallocated = h.Allocator.ResetMaybeReallocate( - typs, oldBatch, minCapacity, maxBatchMemSize, + typs, oldBatch, minCapacity, maxBatchMemSize, desiredCapacitySufficient, ) if reallocated && !h.allFixedLength { // Allocator.ResetMaybeReallocate has released the precise memory diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index ed9cb524ac9f..ed819efd2c6c 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -119,13 +119,13 @@ func TestResetMaybeReallocate(t *testing.T) { typs := []*types.T{types.Bytes} // Allocate a new batch and modify it. - b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64) + b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */) b.SetSelection(true) b.Selection()[0] = 1 b.ColVec(0).Bytes().Set(1, []byte("foo")) oldBatch := b - b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64) + b, _ = testAllocator.ResetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */) // We should have used the same batch, and now it should be in a "reset" // state. require.Equal(t, oldBatch, b) @@ -152,15 +152,21 @@ func TestResetMaybeReallocate(t *testing.T) { // Allocate a new batch attempting to use the batch with too small of a // capacity - new batch should **not** be allocated because the memory // limit is already exceeded. - b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize) + b, _ = testAllocator.ResetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize, false /* desiredCapacitySufficient */) require.Equal(t, smallBatch, b) require.Equal(t, minDesiredCapacity/2, b.Capacity()) oldBatch := b + // Reset the batch asking for the same small desired capacity when it is + // sufficient - the same batch should be returned. + b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity/2, smallMemSize, true /* desiredCapacitySufficient */) + require.Equal(t, smallBatch, b) + require.Equal(t, minDesiredCapacity/2, b.Capacity()) + // Reset the batch and confirm that a new batch is allocated because we // have given larger memory limit. - b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize) + b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */) require.NotEqual(t, oldBatch, b) require.Equal(t, minDesiredCapacity, b.Capacity()) @@ -171,7 +177,7 @@ func TestResetMaybeReallocate(t *testing.T) { // ResetMaybeReallocate truncates the capacity at // coldata.BatchSize(), so we run this part of the test only when // doubled capacity will not be truncated. - b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize) + b, _ = testAllocator.ResetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */) require.NotEqual(t, oldBatch, b) require.Equal(t, 2*minDesiredCapacity, b.Capacity()) } @@ -300,7 +306,7 @@ func TestSetAccountingHelper(t *testing.T) { // new batch with larger capacity might be allocated. maxBatchMemSize = largeMemSize } - batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize) + batch, _ = helper.ResetMaybeReallocate(typs, batch, numRows, maxBatchMemSize, false /* desiredCapacitySufficient */) for rowIdx := 0; rowIdx < batch.Capacity(); rowIdx++ { for vecIdx, typ := range typs {