Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ public LocalExecutionPlan plan(
outputTypes,
pagePreprocessor,
createExchangePagesSerdeFactory(plannerContext.getBlockEncodingSerde(), session)),
ImmutableMap.of(),
physicalOperation),
context);

Expand Down Expand Up @@ -2560,8 +2561,7 @@ public PhysicalOperation visitJoin(JoinNode node, LocalExecutionPlanContext cont
List<Symbol> rightSymbols = Lists.transform(clauses, JoinNode.EquiJoinClause::getRight);

return switch (node.getType()) {
case INNER, LEFT, RIGHT, FULL ->
createLookupJoin(node, node.getLeft(), leftSymbols, node.getRight(), rightSymbols, localDynamicFilters, context);
case INNER, LEFT, RIGHT, FULL -> createLookupJoin(node, node.getLeft(), leftSymbols, node.getRight(), rightSymbols, localDynamicFilters, context);
};
}

Expand Down Expand Up @@ -2737,7 +2737,7 @@ private PhysicalOperation createNestedLoopJoin(JoinNode node, Set<DynamicFilterI

context.addDriverFactory(
false,
new PhysicalOperation(nestedLoopBuildOperatorFactory, buildSource),
new PhysicalOperation(nestedLoopBuildOperatorFactory, ImmutableMap.of(), buildSource),
buildContext);

// build output mapping
Expand Down Expand Up @@ -2874,7 +2874,7 @@ private PagesSpatialIndexFactory createPagesSpatialIndexFactory(

context.addDriverFactory(
false,
new PhysicalOperation(builderOperatorFactory, buildSource),
new PhysicalOperation(builderOperatorFactory, ImmutableMap.of(), buildSource),
buildContext);

return builderOperatorFactory.getPagesSpatialIndexFactory();
Expand Down Expand Up @@ -3010,7 +3010,7 @@ private PhysicalOperation createLookupJoin(

context.addDriverFactory(
false,
new PhysicalOperation(hashBuilderOperatorFactory, buildSource),
new PhysicalOperation(hashBuilderOperatorFactory, ImmutableMap.of(), buildSource),
buildContext);

JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild);
Expand Down Expand Up @@ -3059,7 +3059,7 @@ private PhysicalOperation createLookupJoin(

context.addDriverFactory(
false,
new PhysicalOperation(hashBuilderOperatorFactory, buildSource),
new PhysicalOperation(hashBuilderOperatorFactory, ImmutableMap.of(), buildSource),
buildContext);

JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild);
Expand Down Expand Up @@ -3292,7 +3292,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
SetSupplier setProvider = setBuilderOperatorFactory.getSetProvider();
context.addDriverFactory(
false,
new PhysicalOperation(setBuilderOperatorFactory, buildSource),
new PhysicalOperation(setBuilderOperatorFactory, ImmutableMap.of(), buildSource),
buildContext);

// Source channels are always laid out first, followed by the boolean output symbol
Expand Down Expand Up @@ -3726,6 +3726,7 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan
subContext.getNextOperatorId(),
node.getId(),
pagePreprocessor),
ImmutableMap.of(),
source),
subContext);
// the main driver is not an input... the exchange sources are the input for the plan
Expand Down Expand Up @@ -3806,6 +3807,7 @@ else if (context.getDriverInstanceCount().isPresent()) {
subContext.getNextOperatorId(),
node.getId(),
pagePreprocessor),
ImmutableMap.of(),
source),
subContext);
}
Expand Down Expand Up @@ -4279,36 +4281,26 @@ private static class PhysicalOperation
private final Map<Symbol, Integer> layout;
private final List<Type> types;

public PhysicalOperation(OperatorFactory operatorFactory, Map<Symbol, Integer> layout)
PhysicalOperation(OperatorFactory operatorFactory, Map<Symbol, Integer> layout)
{
this(operatorFactory, layout, Optional.empty());
this(ImmutableList.of(operatorFactory), layout);
}

public PhysicalOperation(OperatorFactory operatorFactory, Map<Symbol, Integer> layout, PhysicalOperation source)
PhysicalOperation(OperatorFactory operatorFactory, Map<Symbol, Integer> layout, PhysicalOperation source)
{
this(operatorFactory, layout, Optional.of(requireNonNull(source, "source is null")));
}

public PhysicalOperation(OperatorFactory outputOperatorFactory, PhysicalOperation source)
{
this(outputOperatorFactory, ImmutableMap.of(), Optional.of(requireNonNull(source, "source is null")));
this(
ImmutableList.<OperatorFactory>builder()
.addAll(source.getOperatorFactories())
.add(operatorFactory)
.build(),
layout);
}

private PhysicalOperation(
OperatorFactory operatorFactory,
Map<Symbol, Integer> layout,
Optional<PhysicalOperation> source)
PhysicalOperation(List<OperatorFactory> operatorFactories, Map<Symbol, Integer> layout)
{
requireNonNull(operatorFactory, "operatorFactory is null");
requireNonNull(layout, "layout is null");
requireNonNull(source, "source is null");

this.operatorFactories = ImmutableList.copyOf(requireNonNull(operatorFactories, "operatorFactories is null"));
this.layout = ImmutableMap.copyOf(requireNonNull(layout, "layout is null"));
Comment thread
findepi marked this conversation as resolved.
this.types = toTypes(layout);
this.operatorFactories = ImmutableList.<OperatorFactory>builder()
.addAll(source.map(PhysicalOperation::getOperatorFactories).orElse(ImmutableList.of()))
.add(operatorFactory)
.build();
this.layout = ImmutableMap.copyOf(layout);
}

private static List<Type> toTypes(Map<Symbol, Integer> layout)
Expand Down