Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;

import java.util.Iterator;
import java.util.List;

import static io.trino.operator.SyntheticAddress.decodePosition;
import static io.trino.operator.SyntheticAddress.decodeSliceIndex;
import static java.util.Objects.requireNonNull;

public class PagesIndexPageSorter
Expand All @@ -37,24 +36,12 @@ public PagesIndexPageSorter(PagesIndex.Factory pagesIndexFactory)
}

@Override
public long[] sort(List<Type> types, List<Page> pages, List<Integer> sortChannels, List<SortOrder> sortOrders, int expectedPositions)
public Iterator<Page> sort(List<Type> types, List<Page> pages, List<Integer> sortChannels, List<SortOrder> sortOrders, int expectedPositions)
{
PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(types, expectedPositions);
pages.forEach(pagesIndex::addPage);
pagesIndex.sort(sortChannels, sortOrders);

return pagesIndex.getValueAddresses().toLongArray();
}

@Override
public int decodePageIndex(long address)
{
return decodeSliceIndex(address);
}

@Override
public int decodePositionIndex(long address)
{
return decodePosition(address);
return pagesIndex.getSortedPages();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.openjdk.jmh.runner.RunnerException;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand All @@ -55,8 +56,13 @@ public class BenchmarkPagesIndexPageSorter
public int runBenchmark(BenchmarkData data)
{
PageSorter pageSorter = new PagesIndexPageSorter(new PagesIndex.TestingFactory(false));
long[] addresses = pageSorter.sort(data.types, data.pages, data.sortChannels, nCopies(data.sortChannels.size(), ASC_NULLS_FIRST), 10_000);
return addresses.length;
Iterator<Page> outputPages = pageSorter.sort(data.types, data.pages, data.sortChannels, nCopies(data.sortChannels.size(), ASC_NULLS_FIRST), 10_000);
int totalPositions = 0;
while (outputPages.hasNext()) {
Page page = outputPages.next();
totalPositions += page.getPositionCount();
}
return totalPositions;
}

private static List<Page> createPages(int pageCount, int channelCount, Type type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import io.trino.operator.PagesIndex;
import io.trino.operator.PagesIndexPageSorter;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;
import io.trino.testing.MaterializedResult;
Expand Down Expand Up @@ -150,29 +148,10 @@ public void testPageSorterForceExpansion()

private static void assertSorted(List<Page> inputPages, List<Page> expectedPages, List<Type> types, List<Integer> sortChannels, List<SortOrder> sortOrders, int expectedPositions)
{
long[] sortedAddresses = sorter.sort(types, inputPages, sortChannels, sortOrders, expectedPositions);
List<Page> outputPages = createOutputPages(types, inputPages, sortedAddresses);
List<Page> outputPages = ImmutableList.copyOf(sorter.sort(types, inputPages, sortChannels, sortOrders, expectedPositions));

MaterializedResult expected = toMaterializedResult(TEST_SESSION, types, expectedPages);
MaterializedResult actual = toMaterializedResult(TEST_SESSION, types, outputPages);
assertThat(actual.getMaterializedRows()).isEqualTo(expected.getMaterializedRows());
}

private static List<Page> createOutputPages(List<Type> types, List<Page> inputPages, long[] sortedAddresses)
{
PageBuilder pageBuilder = new PageBuilder(types);
pageBuilder.reset();
for (long address : sortedAddresses) {
int index = sorter.decodePageIndex(address);
int position = sorter.decodePositionIndex(address);

Page page = inputPages.get(index);
for (int i = 0; i < types.size(); i++) {
Block block = page.getBlock(i);
pageBuilder.getBlockBuilder(i).append(block.getUnderlyingValueBlock(), block.getUnderlyingValuePosition(position));
}
pageBuilder.declarePosition();
}
return ImmutableList.of(pageBuilder.build());
}
}
13 changes: 13 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@
<old>method void io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle)</old>
<new>method java.util.Map&lt;java.lang.String, java.lang.Long&gt; io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle)</new>
</item>
<item>
<code>java.method.removed</code>
<old>method int io.trino.spi.PageSorter::decodePageIndex(long)</old>
</item>
<item>
<code>java.method.removed</code>
<old>method int io.trino.spi.PageSorter::decodePositionIndex(long)</old>
</item>
<item>
<code>java.method.returnTypeChanged</code>
<old>method long[] io.trino.spi.PageSorter::sort(java.util.List&lt;io.trino.spi.type.Type&gt;, java.util.List&lt;io.trino.spi.Page&gt;, java.util.List&lt;java.lang.Integer&gt;, java.util.List&lt;io.trino.spi.connector.SortOrder&gt;, int)</old>
<new>method java.util.Iterator&lt;io.trino.spi.Page&gt; io.trino.spi.PageSorter::sort(java.util.List&lt;io.trino.spi.type.Type&gt;, java.util.List&lt;io.trino.spi.Page&gt;, java.util.List&lt;java.lang.Integer&gt;, java.util.List&lt;io.trino.spi.connector.SortOrder&gt;, int)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
10 changes: 3 additions & 7 deletions core/trino-spi/src/main/java/io/trino/spi/PageSorter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;

import java.util.Iterator;
import java.util.List;

public interface PageSorter
{
/**
* @return Sorted synthetic addresses for pages. A synthetic address is encoded as a long with
* the high 32 bits containing the page index and the low 32 bits containing position index
* @return Iterator of sorted pages.
*/
long[] sort(List<Type> types, List<Page> pages, List<Integer> sortChannels, List<SortOrder> sortOrders, int expectedPositions);

int decodePageIndex(long address);

int decodePositionIndex(long address);
Iterator<Page> sort(List<Type> types, List<Page> pages, List<Integer> sortChannels, List<SortOrder> sortOrders, int expectedPositions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import io.trino.spi.type.Type;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.instanceSize;
import static java.lang.Math.addExact;
import static java.util.Objects.requireNonNull;
Expand All @@ -44,7 +44,6 @@ public class SortBuffer
private final List<SortOrder> sortOrders;
private final PageSorter pageSorter;
private final List<Page> pages = new ArrayList<>();
private final PageBuilder pageBuilder;

private long usedMemoryBytes;
private int rowCount;
Expand All @@ -62,7 +61,6 @@ public SortBuffer(
this.sortFields = ImmutableList.copyOf(requireNonNull(sortFields, "sortFields is null"));
this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null"));
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageBuilder = new PageBuilder(types);
}

public long getRetainedBytes()
Expand Down Expand Up @@ -103,32 +101,8 @@ public void flushTo(Consumer<Page> consumer)
{
checkState(!pages.isEmpty(), "page buffer is empty");

long[] addresses = pageSorter.sort(types, pages, sortFields, sortOrders, rowCount);

int[] pageIndex = new int[addresses.length];
int[] positionIndex = new int[addresses.length];
for (int i = 0; i < addresses.length; i++) {
pageIndex[i] = pageSorter.decodePageIndex(addresses[i]);
positionIndex[i] = pageSorter.decodePositionIndex(addresses[i]);
}

verify(pageBuilder.isEmpty());

for (int i = 0; i < pageIndex.length; i++) {
Page page = pages.get(pageIndex[i]);
int position = positionIndex[i];
appendPositionTo(page, position, pageBuilder);

if (pageBuilder.isFull()) {
consumer.accept(pageBuilder.build());
pageBuilder.reset();
}
}

if (!pageBuilder.isEmpty()) {
consumer.accept(pageBuilder.build());
pageBuilder.reset();
}
Iterator<Page> sortedPages = pageSorter.sort(types, pages, sortFields, sortOrders, rowCount);
sortedPages.forEachRemaining(consumer);

pages.clear();
rowCount = 0;
Expand Down