Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 8 additions & 48 deletions core/trino-main/src/main/java/io/trino/operator/TopNOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@

import com.google.common.collect.ImmutableList;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.trino.operator.WorkProcessor.TransformationState;
import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator;
import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.trino.spi.Page;
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.operator.WorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.trino.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static java.util.Objects.requireNonNull;

/**
* Returns the top N rows from the source sorted according to the specified ordering in the keyChannelIndex channel.
*/
public class TopNOperator
implements AdapterWorkProcessorOperator
implements WorkProcessorOperator
{
public static OperatorFactory createOperatorFactory(
int operatorId,
Expand All @@ -50,7 +48,7 @@ public static OperatorFactory createOperatorFactory(
}

private static class Factory
implements AdapterWorkProcessorOperatorFactory
implements BasicAdapterWorkProcessorOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
Expand Down Expand Up @@ -87,21 +85,7 @@ public WorkProcessorOperator create(
checkState(!closed, "Factory is already closed");
return new TopNOperator(
processorContext.getMemoryTrackingContext(),
Optional.of(sourcePages),
sourceTypes,
n,
sortChannels,
sortOrders,
typeOperators);
}

@Override
public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext)
{
checkState(!closed, "Factory is already closed");
return new TopNOperator(
processorContext.getMemoryTrackingContext(),
Optional.empty(),
sourcePages,
sourceTypes,
n,
sortChannels,
Expand Down Expand Up @@ -142,11 +126,10 @@ public Factory duplicate()

private final TopNProcessor topNProcessor;
private final WorkProcessor<Page> pages;
private final PageBuffer pageBuffer = new PageBuffer();

private TopNOperator(
MemoryTrackingContext memoryTrackingContext,
Optional<WorkProcessor<Page>> sourcePages,
WorkProcessor<Page> sourcePages,
List<Type> types,
int n,
List<Integer> sortChannels,
Expand All @@ -165,7 +148,7 @@ private TopNOperator(
pages = WorkProcessor.of();
}
else {
pages = sourcePages.orElse(pageBuffer.pages()).transform(new TopNPages());
pages = sourcePages.transform(new TopNPages());
}
}

Expand All @@ -175,37 +158,14 @@ public WorkProcessor<Page> getOutputPages()
return pages;
}

@Override
public boolean needsInput()
{
return pageBuffer.isEmpty() && !pageBuffer.isFinished();
}

@Override
public void addInput(Page page)
{
addPage(page);
}

@Override
public void finish()
{
pageBuffer.finish();
}

private void addPage(Page page)
{
topNProcessor.addInput(page);
}

private class TopNPages
implements WorkProcessor.Transformation<Page, Page>
{
@Override
public TransformationState<Page> process(Page inputPage)
{
if (inputPage != null) {
addPage(inputPage);
topNProcessor.addInput(inputPage);
return TransformationState.needsMoreData();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public void testExceedMemoryLimit()
ImmutableList.of(0),
ImmutableList.of(ASC_NULLS_LAST));
Operator operator = operatorFactory.createOperator(smallDiverContext);
assertThatThrownBy(() -> operator.addInput(input.get(0)))
operator.addInput(input.get(0));
assertThatThrownBy(() -> operator.getOutput())
.isInstanceOf(ExceededMemoryLimitException.class)
.hasMessageStartingWith("Query exceeded per-node memory limit of ");
}
Expand Down