diff --git a/core/trino-main/src/main/java/io/trino/operator/PagesIndexPageSorter.java b/core/trino-main/src/main/java/io/trino/operator/PagesIndexPageSorter.java index 03760a796e49..d4f7e42e300b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PagesIndexPageSorter.java +++ b/core/trino-main/src/main/java/io/trino/operator/PagesIndexPageSorter.java @@ -19,10 +19,9 @@ import io.trino.spi.connector.SortOrder; import io.trino.spi.type.Type; +import java.util.Iterator; import java.util.List; -import static io.trino.operator.SyntheticAddress.decodePosition; -import static io.trino.operator.SyntheticAddress.decodeSliceIndex; import static java.util.Objects.requireNonNull; public class PagesIndexPageSorter @@ -37,24 +36,12 @@ public PagesIndexPageSorter(PagesIndex.Factory pagesIndexFactory) } @Override - public long[] sort(List types, List pages, List sortChannels, List sortOrders, int expectedPositions) + public Iterator sort(List types, List pages, List sortChannels, List sortOrders, int expectedPositions) { PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(types, expectedPositions); pages.forEach(pagesIndex::addPage); pagesIndex.sort(sortChannels, sortOrders); - return pagesIndex.getValueAddresses().toLongArray(); - } - - @Override - public int decodePageIndex(long address) - { - return decodeSliceIndex(address); - } - - @Override - public int decodePositionIndex(long address) - { - return decodePosition(address); + return pagesIndex.getSortedPages(); } } diff --git a/core/trino-main/src/test/java/io/trino/BenchmarkPagesIndexPageSorter.java b/core/trino-main/src/test/java/io/trino/BenchmarkPagesIndexPageSorter.java index c4021a505e93..c1d6e056327b 100644 --- a/core/trino-main/src/test/java/io/trino/BenchmarkPagesIndexPageSorter.java +++ b/core/trino-main/src/test/java/io/trino/BenchmarkPagesIndexPageSorter.java @@ -33,6 +33,7 @@ import org.openjdk.jmh.runner.RunnerException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -55,8 +56,13 @@ public class BenchmarkPagesIndexPageSorter public int runBenchmark(BenchmarkData data) { PageSorter pageSorter = new PagesIndexPageSorter(new PagesIndex.TestingFactory(false)); - long[] addresses = pageSorter.sort(data.types, data.pages, data.sortChannels, nCopies(data.sortChannels.size(), ASC_NULLS_FIRST), 10_000); - return addresses.length; + Iterator outputPages = pageSorter.sort(data.types, data.pages, data.sortChannels, nCopies(data.sortChannels.size(), ASC_NULLS_FIRST), 10_000); + int totalPositions = 0; + while (outputPages.hasNext()) { + Page page = outputPages.next(); + totalPositions += page.getPositionCount(); + } + return totalPositions; } private static List createPages(int pageCount, int channelCount, Type type) diff --git a/core/trino-main/src/test/java/io/trino/TestPagesIndexPageSorter.java b/core/trino-main/src/test/java/io/trino/TestPagesIndexPageSorter.java index f715e40f74e9..8b0530428379 100644 --- a/core/trino-main/src/test/java/io/trino/TestPagesIndexPageSorter.java +++ b/core/trino-main/src/test/java/io/trino/TestPagesIndexPageSorter.java @@ -18,8 +18,6 @@ import io.trino.operator.PagesIndex; import io.trino.operator.PagesIndexPageSorter; import io.trino.spi.Page; -import io.trino.spi.PageBuilder; -import io.trino.spi.block.Block; import io.trino.spi.connector.SortOrder; import io.trino.spi.type.Type; import io.trino.testing.MaterializedResult; @@ -150,29 +148,10 @@ public void testPageSorterForceExpansion() private static void assertSorted(List inputPages, List expectedPages, List types, List sortChannels, List sortOrders, int expectedPositions) { - long[] sortedAddresses = sorter.sort(types, inputPages, sortChannels, sortOrders, expectedPositions); - List outputPages = createOutputPages(types, inputPages, sortedAddresses); + List outputPages = ImmutableList.copyOf(sorter.sort(types, inputPages, sortChannels, sortOrders, expectedPositions)); MaterializedResult expected = toMaterializedResult(TEST_SESSION, types, expectedPages); MaterializedResult actual = toMaterializedResult(TEST_SESSION, types, outputPages); assertThat(actual.getMaterializedRows()).isEqualTo(expected.getMaterializedRows()); } - - private static List createOutputPages(List types, List inputPages, long[] sortedAddresses) - { - PageBuilder pageBuilder = new PageBuilder(types); - pageBuilder.reset(); - for (long address : sortedAddresses) { - int index = sorter.decodePageIndex(address); - int position = sorter.decodePositionIndex(address); - - Page page = inputPages.get(index); - for (int i = 0; i < types.size(); i++) { - Block block = page.getBlock(i); - pageBuilder.getBlockBuilder(i).append(block.getUnderlyingValueBlock(), block.getUnderlyingValuePosition(position)); - } - pageBuilder.declarePosition(); - } - return ImmutableList.of(pageBuilder.build()); - } } diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 51d081a8d650..56a634fc7bf6 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -306,6 +306,19 @@ method void io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle) method java.util.Map<java.lang.String, java.lang.Long> io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle) + + java.method.removed + method int io.trino.spi.PageSorter::decodePageIndex(long) + + + java.method.removed + method int io.trino.spi.PageSorter::decodePositionIndex(long) + + + java.method.returnTypeChanged + method long[] io.trino.spi.PageSorter::sort(java.util.List<io.trino.spi.type.Type>, java.util.List<io.trino.spi.Page>, java.util.List<java.lang.Integer>, java.util.List<io.trino.spi.connector.SortOrder>, int) + method java.util.Iterator<io.trino.spi.Page> io.trino.spi.PageSorter::sort(java.util.List<io.trino.spi.type.Type>, java.util.List<io.trino.spi.Page>, java.util.List<java.lang.Integer>, java.util.List<io.trino.spi.connector.SortOrder>, int) + diff --git a/core/trino-spi/src/main/java/io/trino/spi/PageSorter.java b/core/trino-spi/src/main/java/io/trino/spi/PageSorter.java index 06f23c21ab62..f87951743e0c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/PageSorter.java +++ b/core/trino-spi/src/main/java/io/trino/spi/PageSorter.java @@ -16,17 +16,13 @@ import io.trino.spi.connector.SortOrder; import io.trino.spi.type.Type; +import java.util.Iterator; import java.util.List; public interface PageSorter { /** - * @return Sorted synthetic addresses for pages. A synthetic address is encoded as a long with - * the high 32 bits containing the page index and the low 32 bits containing position index + * @return Iterator of sorted pages. */ - long[] sort(List types, List pages, List sortChannels, List sortOrders, int expectedPositions); - - int decodePageIndex(long address); - - int decodePositionIndex(long address); + Iterator sort(List types, List pages, List sortChannels, List sortOrders, int expectedPositions); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/SortBuffer.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/SortBuffer.java index 06d36e88ae3e..8f4a34e3c9ed 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/SortBuffer.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/SortBuffer.java @@ -24,12 +24,12 @@ import io.trino.spi.type.Type; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; import static io.airlift.slice.SizeOf.instanceSize; import static java.lang.Math.addExact; import static java.util.Objects.requireNonNull; @@ -44,7 +44,6 @@ public class SortBuffer private final List sortOrders; private final PageSorter pageSorter; private final List pages = new ArrayList<>(); - private final PageBuilder pageBuilder; private long usedMemoryBytes; private int rowCount; @@ -62,7 +61,6 @@ public SortBuffer( this.sortFields = ImmutableList.copyOf(requireNonNull(sortFields, "sortFields is null")); this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null")); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); - this.pageBuilder = new PageBuilder(types); } public long getRetainedBytes() @@ -103,32 +101,8 @@ public void flushTo(Consumer consumer) { checkState(!pages.isEmpty(), "page buffer is empty"); - long[] addresses = pageSorter.sort(types, pages, sortFields, sortOrders, rowCount); - - int[] pageIndex = new int[addresses.length]; - int[] positionIndex = new int[addresses.length]; - for (int i = 0; i < addresses.length; i++) { - pageIndex[i] = pageSorter.decodePageIndex(addresses[i]); - positionIndex[i] = pageSorter.decodePositionIndex(addresses[i]); - } - - verify(pageBuilder.isEmpty()); - - for (int i = 0; i < pageIndex.length; i++) { - Page page = pages.get(pageIndex[i]); - int position = positionIndex[i]; - appendPositionTo(page, position, pageBuilder); - - if (pageBuilder.isFull()) { - consumer.accept(pageBuilder.build()); - pageBuilder.reset(); - } - } - - if (!pageBuilder.isEmpty()) { - consumer.accept(pageBuilder.build()); - pageBuilder.reset(); - } + Iterator sortedPages = pageSorter.sort(types, pages, sortFields, sortOrders, rowCount); + sortedPages.forEachRemaining(consumer); pages.clear(); rowCount = 0;