Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private enum State
private final LocalMemoryContext localUserMemoryContext;

private final PagesIndex pageIndex;
private final PagesIndexOrdering pagesIndexOrdering;

private final List<Type> sourceTypes;

Expand Down Expand Up @@ -186,6 +187,7 @@ public OrderByOperator(
this.revocableMemoryContext = operatorContext.localRevocableMemoryContext();

this.pageIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions);
this.pagesIndexOrdering = pageIndex.createPagesIndexComparator(this.sortChannels, this.sortOrder);
this.spillEnabled = spillEnabled;
this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
this.orderingCompiler = requireNonNull(orderingCompiler, "orderingCompiler is null");
Expand Down Expand Up @@ -228,7 +230,7 @@ public void finish()
}
}

pageIndex.sort(sortChannels, sortOrder);
pageIndex.sort(pagesIndexOrdering);
Iterator<Page> sortedPagesIndex = pageIndex.getSortedPages();

List<WorkProcessor<Page>> spilledPages = getSpilledPages();
Expand Down Expand Up @@ -315,7 +317,7 @@ private ListenableFuture<Void> spillToDisk()
operatorContext.newAggregateUserMemoryContext()));
}

pageIndex.sort(sortChannels, sortOrder);
pageIndex.sort(pagesIndexOrdering);
spillInProgress = asVoid(spiller.get().spill(pageIndex.getSortedPages()));
finishMemoryRevoke = Optional.of(() -> {
pageIndex.clear();
Expand Down
14 changes: 10 additions & 4 deletions core/trino-main/src/main/java/io/trino/operator/PagesIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,19 @@ public int getRawBlockPosition(int position)

public void sort(List<Integer> sortChannels, List<SortOrder> sortOrders)
{
sort(sortChannels, sortOrders, 0, getPositionCount());
sort(createPagesIndexComparator(sortChannels, sortOrders), 0, getPositionCount());
}

public void sort(List<Integer> sortChannels, List<SortOrder> sortOrders, int startPosition, int endPosition)
public void sort(PagesIndexOrdering pagesIndexOrdering)
{
sort(pagesIndexOrdering, 0, getPositionCount());
}

public void sort(PagesIndexOrdering pagesIndexOrdering, int startPosition, int endPosition)
{
requireNonNull(pagesIndexOrdering, "pagesIndexOrdering is null");
modificationCount++;
createPagesIndexComparator(sortChannels, sortOrders).sort(this, startPosition, endPosition);
pagesIndexOrdering.sort(this, startPosition, endPosition);
}

public boolean positionIdenticalToPosition(PagesHashStrategy partitionHashStrategy, int leftPosition, int rightPosition)
Expand All @@ -445,7 +451,7 @@ public boolean positionIdenticalToRow(PagesHashStrategy pagesHashStrategy, int i
return pagesHashStrategy.positionIdenticalToRow(pageIndex, pagePosition, rightPosition, rightPage);
}

private PagesIndexOrdering createPagesIndexComparator(List<Integer> sortChannels, List<SortOrder> sortOrders)
public PagesIndexOrdering createPagesIndexComparator(List<Integer> sortChannels, List<SortOrder> sortOrders)
{
List<Type> sortTypes = sortChannels.stream()
.map(types::get)
Expand Down
41 changes: 25 additions & 16 deletions core/trino-main/src/main/java/io/trino/operator/WindowOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.spiller.SpillerFactory;
import io.trino.sql.gen.OrderingCompiler;
import io.trino.sql.planner.plan.PlanNodeId;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -525,8 +526,8 @@ private class PagesToPagesIndexes
implements Transformation<Page, PagesIndexWithHashStrategies>
{
final PagesIndexWithHashStrategies pagesIndexWithHashStrategies;
final List<Integer> orderChannels;
final List<SortOrder> ordering;
@Nullable
final PagesIndexOrdering pagesIndexOrdering; // null when ordering channels is empty
final LocalMemoryContext memoryContext;

boolean resetPagesIndex;
Expand All @@ -538,8 +539,12 @@ private class PagesToPagesIndexes
List<SortOrder> ordering)
{
this.pagesIndexWithHashStrategies = pagesIndexWithHashStrategies;
this.orderChannels = orderChannels;
this.ordering = ordering;
if (orderChannels.isEmpty()) {
pagesIndexOrdering = null;
}
else {
pagesIndexOrdering = pagesIndexWithHashStrategies.pagesIndex.createPagesIndexComparator(orderChannels, ordering);
}
this.memoryContext = operatorContext.aggregateUserMemoryContext().newLocalMemoryContext(PagesToPagesIndexes.class.getSimpleName());
}

Expand All @@ -565,7 +570,7 @@ public TransformationState<PagesIndexWithHashStrategies> process(Page pendingInp

// If we have unused input or are finishing, then we have buffered a full group
if (finishing || pendingInputPosition < pendingInput.getPositionCount()) {
sortPagesIndexIfNecessary(pagesIndexWithHashStrategies, orderChannels, ordering);
sortPagesIndexIfNecessary(pagesIndexWithHashStrategies.pagesIndex, pagesIndexWithHashStrategies.preSortedPartitionHashStrategy, pagesIndexOrdering);
resetPagesIndex = true;
return TransformationState.ofResult(pagesIndexWithHashStrategies, false);
}
Expand Down Expand Up @@ -662,9 +667,9 @@ private class SpillablePagesToPagesIndexes
{
final PagesIndexWithHashStrategies inMemoryPagesIndexWithHashStrategies;
final PagesIndexWithHashStrategies mergedPagesIndexWithHashStrategies;
@Nullable
final PagesIndexOrdering inMemoryPagesIndexOrdering; // null when ordering channels is empty
final List<Type> sourceTypes;
final List<Integer> orderChannels;
final List<SortOrder> ordering;
final LocalMemoryContext localRevocableMemoryContext;
final LocalMemoryContext localUserMemoryContext;
final SpillerFactory spillerFactory;
Expand All @@ -691,8 +696,12 @@ private class SpillablePagesToPagesIndexes
this.inMemoryPagesIndexWithHashStrategies = inMemoryPagesIndexWithHashStrategies;
this.mergedPagesIndexWithHashStrategies = mergedPagesIndexWithHashStrategies;
this.sourceTypes = sourceTypes;
this.orderChannels = orderChannels;
this.ordering = ordering;
if (orderChannels.isEmpty()) {
this.inMemoryPagesIndexOrdering = null;
}
else {
this.inMemoryPagesIndexOrdering = inMemoryPagesIndexWithHashStrategies.pagesIndex.createPagesIndexComparator(orderChannels, ordering);
}
this.localUserMemoryContext = operatorContext.aggregateUserMemoryContext().newLocalMemoryContext(SpillablePagesToPagesIndexes.class.getSimpleName());
this.localRevocableMemoryContext = operatorContext.aggregateRevocableMemoryContext().newLocalMemoryContext(SpillablePagesToPagesIndexes.class.getSimpleName());
this.spillerFactory = spillerFactory;
Expand Down Expand Up @@ -770,7 +779,7 @@ TransformationState<WorkProcessor<PagesIndexWithHashStrategies>> fullGroupBuffer
}
}

sortPagesIndexIfNecessary(inMemoryPagesIndexWithHashStrategies, orderChannels, ordering);
sortPagesIndexIfNecessary(inMemoryPagesIndexWithHashStrategies.pagesIndex, inMemoryPagesIndexWithHashStrategies.preSortedPartitionHashStrategy, inMemoryPagesIndexOrdering);
resetPagesIndex = true;
return TransformationState.ofResult(unspill(), false);
}
Expand All @@ -796,7 +805,7 @@ ListenableFuture<Void> spill()
}

verify(inMemoryPagesIndexWithHashStrategies.pagesIndex.getPositionCount() > 0);
sortPagesIndexIfNecessary(inMemoryPagesIndexWithHashStrategies, orderChannels, ordering);
sortPagesIndexIfNecessary(inMemoryPagesIndexWithHashStrategies.pagesIndex, inMemoryPagesIndexWithHashStrategies.preSortedPartitionHashStrategy, inMemoryPagesIndexOrdering);
PeekingIterator<Page> sortedPages = peekingIterator(inMemoryPagesIndexWithHashStrategies.pagesIndex.getSortedPages());
Page anyPage = sortedPages.peek();
verify(anyPage.getPositionCount() != 0, "PagesIndex.getSortedPages returned an empty page");
Expand Down Expand Up @@ -896,13 +905,13 @@ private int updatePagesIndex(PagesIndexWithHashStrategies pagesIndexWithHashStra
return startPosition;
}

private void sortPagesIndexIfNecessary(PagesIndexWithHashStrategies pagesIndexWithHashStrategies, List<Integer> orderChannels, List<SortOrder> ordering)
private static void sortPagesIndexIfNecessary(PagesIndex pagesIndex, PagesHashStrategy preSortedPartitionHashStrategy, @Nullable PagesIndexOrdering pagesIndexOrdering)
{
if (pagesIndexWithHashStrategies.pagesIndex.getPositionCount() > 1 && !orderChannels.isEmpty()) {
if (pagesIndex.getPositionCount() > 1 && pagesIndexOrdering != null) {
int startPosition = 0;
while (startPosition < pagesIndexWithHashStrategies.pagesIndex.getPositionCount()) {
int endPosition = findGroupEnd(pagesIndexWithHashStrategies.pagesIndex, pagesIndexWithHashStrategies.preSortedPartitionHashStrategy, startPosition);
pagesIndexWithHashStrategies.pagesIndex.sort(orderChannels, ordering, startPosition, endPosition);
while (startPosition < pagesIndex.getPositionCount()) {
int endPosition = findGroupEnd(pagesIndex, preSortedPartitionHashStrategy, startPosition);
pagesIndex.sort(pagesIndexOrdering, startPosition, endPosition);
startPosition = endPosition;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.primitives.Ints;
import io.trino.operator.PagesIndex;
import io.trino.operator.PagesIndex.Factory;
import io.trino.operator.PagesIndexOrdering;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
Expand Down Expand Up @@ -109,8 +110,7 @@ private static class OrderedAccumulator
{
private final Accumulator accumulator;
private final int[] argumentChannels;
private final List<Integer> orderByChannels;
private final List<SortOrder> orderings;
private final PagesIndexOrdering pagesIndexOrdering;
@Nullable
private PagesIndex pagesIndex; // null after evaluateFinal() is called

Expand All @@ -124,9 +124,10 @@ private OrderedAccumulator(
{
this.accumulator = requireNonNull(accumulator, "accumulator is null");
this.argumentChannels = Ints.toArray(argumentChannels);
this.orderByChannels = ImmutableList.copyOf(requireNonNull(orderByChannels, "orderByChannels is null"));
this.orderings = ImmutableList.copyOf(requireNonNull(orderings, "orderings is null"));
this.pagesIndex = pagesIndexFactory.newPagesIndex(aggregationSourceTypes, 10_000);
requireNonNull(orderByChannels, "orderByChannels is null");
requireNonNull(orderings, "orderings is null");
this.pagesIndexOrdering = pagesIndex.createPagesIndexComparator(orderByChannels, orderings);
}

@Override
Expand Down Expand Up @@ -168,7 +169,7 @@ public void evaluateIntermediate(BlockBuilder blockBuilder)
public void evaluateFinal(BlockBuilder blockBuilder)
{
checkState(pagesIndex != null, "evaluateFinal() already called");
pagesIndex.sort(orderByChannels, orderings);
pagesIndex.sort(pagesIndexOrdering);
Iterator<Page> pagesIterator = pagesIndex.getSortedPages();
AggregationMask mask = AggregationMask.createSelectAll(0);
pagesIterator.forEachRemaining(arguments -> {
Expand All @@ -186,8 +187,7 @@ private static class OrderingGroupedAccumulator
{
private final GroupedAccumulator accumulator;
private final int[] argumentChannels;
private final List<Integer> orderByChannels;
private final List<SortOrder> orderings;
private final PagesIndexOrdering pagesIndexOrdering;
@Nullable
private PagesIndex pagesIndex; // null after prepareFinal() is called
private long groupCount;
Expand All @@ -203,12 +203,13 @@ private OrderingGroupedAccumulator(
this.accumulator = requireNonNull(accumulator, "accumulator is null");
this.argumentChannels = Ints.toArray(argumentChannels);
requireNonNull(aggregationSourceTypes, "aggregationSourceTypes is null");
this.orderByChannels = ImmutableList.copyOf(requireNonNull(orderByChannels, "orderByChannels is null"));
this.orderings = ImmutableList.copyOf(requireNonNull(orderings, "orderings is null"));
List<Type> pageIndexTypes = new ArrayList<>(aggregationSourceTypes);
// Add group id column
pageIndexTypes.add(INTEGER);
this.pagesIndex = pagesIndexFactory.newPagesIndex(pageIndexTypes, 10_000);
requireNonNull(orderByChannels, "orderByChannels is null");
requireNonNull(orderings, "orderings is null");
this.pagesIndexOrdering = pagesIndex.createPagesIndexComparator(orderByChannels, orderings);
this.groupCount = 0;
}

Expand Down Expand Up @@ -266,7 +267,7 @@ public void evaluateFinal(int groupId, BlockBuilder output)
public void prepareFinal()
{
checkState(pagesIndex != null, "prepareFinal() already called");
pagesIndex.sort(orderByChannels, orderings);
pagesIndex.sort(pagesIndexOrdering);
Iterator<Page> pagesIterator = pagesIndex.getSortedPages();
AggregationMask mask = AggregationMask.createSelectAll(0);
pagesIterator.forEachRemaining(page -> {
Expand Down
Loading