diff --git a/core/trino-main/src/main/java/io/trino/operator/exchange/PartitioningExchanger.java b/core/trino-main/src/main/java/io/trino/operator/exchange/PartitioningExchanger.java index b02a1976a574..98a3f4b51fad 100644 --- a/core/trino-main/src/main/java/io/trino/operator/exchange/PartitioningExchanger.java +++ b/core/trino-main/src/main/java/io/trino/operator/exchange/PartitioningExchanger.java @@ -70,20 +70,27 @@ private synchronized void partitionPage(Page page, Page partitionPage) // build a page for each partition for (int partition = 0; partition < partitionAssignments.length; partition++) { - IntArrayList positions = partitionAssignments[partition]; - int partitionSize = positions.size(); + IntArrayList positionsList = partitionAssignments[partition]; + int partitionSize = positionsList.size(); if (partitionSize == 0) { continue; } + // clear the assigned positions list size for the next iteration to start empty. This + // only resets the size() to 0 which controls the index where subsequent calls to add() + // will store new values, but does not modify the positions array + int[] positions = positionsList.elements(); + positionsList.clear(); + Page pageSplit; if (partitionSize == page.getPositionCount()) { - pageSplit = page; // entire page will be sent to this partition, no copies necessary + // entire page will be sent to this partition, just compact the page to avoid over-retaining + // memory and match the behavior of the sub-partitioned case + page.compact(); + pageSplit = page; } else { - pageSplit = page.copyPositions(positions.elements(), 0, partitionSize); + pageSplit = page.copyPositions(positions, 0, partitionSize); } - // clear the assigned positions list for the next iteration to start empty - positions.clear(); memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes()); buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased)); }