Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class IndexBuildDriverFactoryProvider
{
private static final String INDEX_BUILDER = "IndexBuilder";
private final int pipelineId;
private final int outputOperatorId;
private final PlanNodeId planNodeId;
Expand Down Expand Up @@ -82,7 +83,7 @@ public DriverFactory createSnapshot(int pipelineId, IndexSnapshotBuilder indexSn
false,
ImmutableList.<OperatorFactory>builder()
.addAll(coreOperatorFactories)
.add(new PagesIndexBuilderOperatorFactory(outputOperatorId, planNodeId, indexSnapshotBuilder))
.add(new PagesIndexBuilderOperatorFactory(outputOperatorId, planNodeId, indexSnapshotBuilder, INDEX_BUILDER))
.build(),
OptionalInt.empty(),
UNGROUPED_EXECUTION,
Expand All @@ -99,7 +100,7 @@ public DriverFactory createStreaming(PageBuffer pageBuffer, Page indexKeyTuple)
operatorFactories.add(dynamicTupleFilterFactory.get().filterWithTuple(indexKeyTuple));
}

operatorFactories.add(new PageBufferOperatorFactory(outputOperatorId, planNodeId, pageBuffer));
operatorFactories.add(new PageBufferOperatorFactory(outputOperatorId, planNodeId, pageBuffer, INDEX_BUILDER));

return new DriverFactory(pipelineId, inputDriver, false, operatorFactories.build(), OptionalInt.empty(), UNGROUPED_EXECUTION, Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ public static class PageBufferOperatorFactory
private final int operatorId;
private final PlanNodeId planNodeId;
private final PageBuffer pageBuffer;
private final String operatorType;

public PageBufferOperatorFactory(int operatorId, PlanNodeId planNodeId, PageBuffer pageBuffer)
public PageBufferOperatorFactory(int operatorId, PlanNodeId planNodeId, PageBuffer pageBuffer, String operatorType)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.pageBuffer = requireNonNull(pageBuffer, "pageBuffer is null");
this.operatorType = requireNonNull(operatorType, "operatorType is null");
}

@Override
public Operator createOperator(DriverContext driverContext)
{
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, PageBufferOperator.class.getSimpleName());
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, operatorType);
return new PageBufferOperator(operatorContext, pageBuffer);
}

Expand All @@ -56,7 +58,7 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new PageBufferOperatorFactory(operatorId, planNodeId, pageBuffer);
return new PageBufferOperatorFactory(operatorId, planNodeId, pageBuffer, operatorType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,23 @@ public static class PagesIndexBuilderOperatorFactory
private final int operatorId;
private final PlanNodeId planNodeId;
private final IndexSnapshotBuilder indexSnapshotBuilder;
private final String operatorType;
private boolean closed;

public PagesIndexBuilderOperatorFactory(int operatorId, PlanNodeId planNodeId, IndexSnapshotBuilder indexSnapshotBuilder)
public PagesIndexBuilderOperatorFactory(int operatorId, PlanNodeId planNodeId, IndexSnapshotBuilder indexSnapshotBuilder, String operatorType)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.indexSnapshotBuilder = requireNonNull(indexSnapshotBuilder, "indexSnapshotBuilder is null");
this.operatorType = requireNonNull(operatorType, "operatorType is null");
}

@Override
public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");

OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, PagesIndexBuilderOperator.class.getSimpleName());
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, operatorType);
return new PagesIndexBuilderOperator(operatorContext, indexSnapshotBuilder);
}

Expand All @@ -62,7 +64,7 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new PagesIndexBuilderOperatorFactory(operatorId, planNodeId, indexSnapshotBuilder);
return new PagesIndexBuilderOperatorFactory(operatorId, planNodeId, indexSnapshotBuilder, operatorType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class TestHashJoinOperator
private static final LookupJoinOperators LOOKUP_JOIN_OPERATORS = new LookupJoinOperators();
private static final SingleStreamSpillerFactory SINGLE_STREAM_SPILLER_FACTORY = new DummySpillerFactory();
private static final PartitioningSpillerFactory PARTITIONING_SPILLER_FACTORY = new GenericPartitioningSpillerFactory(SINGLE_STREAM_SPILLER_FACTORY);
private static final String PAGE_BUFFER = "PageBuffer";

private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
Expand Down Expand Up @@ -444,7 +445,7 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List<WhenSpill> whenSp
ValuesOperatorFactory valuesOperatorFactory = new ValuesOperatorFactory(17, new PlanNodeId("values"), probePages.build());

PageBuffer pageBuffer = new PageBuffer(10);
PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(18, new PlanNodeId("pageBuffer"), pageBuffer);
PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(18, new PlanNodeId(PAGE_BUFFER), pageBuffer, PAGE_BUFFER);

Driver joinDriver = Driver.createDriver(
joinDriverContext,
Expand Down Expand Up @@ -561,7 +562,7 @@ public void testInnerJoinWithSpillWithEarlyTermination()
ValuesOperatorFactory valuesOperatorFactory2 = new ValuesOperatorFactory(18, new PlanNodeId("values2"), probe2Pages.build());
ValuesOperatorFactory valuesOperatorFactory3 = new ValuesOperatorFactory(18, new PlanNodeId("values3"), ImmutableList.of());
PageBuffer pageBuffer = new PageBuffer(10);
PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(19, new PlanNodeId("pageBuffer"), pageBuffer);
PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(19, new PlanNodeId(PAGE_BUFFER), pageBuffer, PAGE_BUFFER);

Driver joinDriver1 = Driver.createDriver(
joinDriverContext1,
Expand Down