Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
Optional.empty(),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.TRUE)),
Optional.of(insertReference),
outputVar,
Optional.empty(),
Expand Down Expand Up @@ -348,7 +349,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty())),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
Expand All @@ -369,7 +371,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty());
Optional.empty(),
Optional.of(Boolean.TRUE));
}

return new TableFinishNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public Optional<PlanNode> visitTableWriter(TableWriterNode node, Context context
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
context.addPlan(node, new CanonicalPlan(result, strategy));
return Optional.of(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ private RelationPlan createTableWriterPlan(
// partial aggregation is run within the TableWriteOperator to calculate the statistics for
// the data consumed by the TableWriteOperator
Optional.of(aggregations.getPartialAggregation()),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
// final aggregation is run within the TableFinishOperator to summarize collected statistics
Expand Down Expand Up @@ -457,7 +458,8 @@ private RelationPlan createTableWriterPlan(
tablePartitioningScheme,
preferredShufflePartitioningScheme,
Optional.empty(),
Optional.empty()),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class PlanFragment
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;

// Only true for output table writer and false for temporary table writers
private final boolean outputTableWriterFragment;
private final StatsAndCosts statsAndCosts;
private final Optional<String> jsonRepresentation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -81,7 +81,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
sqlParser,
idAllocator,
variableAllocator,
getTableWriterNodeIds(plan.getRoot()));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main fix

getOutputTableWriterNodeIds(plan.getRoot()));

FragmentProperties properties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ public static Set<PlanNodeId> getTableWriterNodeIds(PlanNode plan)
.collect(toImmutableSet());
}

public static Set<PlanNodeId> getOutputTableWriterNodeIds(PlanNode plan)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone of the above function with a filter on the temporaryTableWriters

{
return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
.filter(node -> node instanceof TableWriterNode)
.map(node -> (TableWriterNode) node)
.filter(tableWriterNode -> !tableWriterNode.getIsTemporaryTableWriter().orElse(false))
.map(TableWriterNode::getId)
.collect(toImmutableSet());
}

public static Optional<Integer> getTableWriterTasks(PlanNode plan)
{
return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
rewrittenStatisticsAggregation,
node.getTaskCountIfScaledWriter()));
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter()));
}
return Result.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
Optional.of(initialTaskNumber)));
Optional.of(initialTaskNumber),
node.getIsTemporaryTableWriter()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
fixedParallelism(),
fixedParallelism());
}
Expand All @@ -603,7 +604,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
exchange.getProperties());
}
}
Expand Down Expand Up @@ -632,7 +634,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
originalTableWriterNode.getTablePartitioningScheme(),
originalTableWriterNode.getPreferredShufflePartitioningScheme(),
statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation),
originalTableWriterNode.getTaskCountIfScaledWriter()),
originalTableWriterNode.getTaskCountIfScaledWriter(),
originalTableWriterNode.getIsTemporaryTableWriter()),
exchange.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,8 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<Set<Variab
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
node.getTaskCountIfScaledWriter());
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId new
node.getTablePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
node.getPreferredShufflePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)),
node.getStatisticsAggregation().map(this::map),
node.getTaskCountIfScaledWriter());
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter());
}

public StatisticsWriterNode map(StatisticsWriterNode node, PlanNode source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TableWriterNode
private final Optional<StatisticAggregations> statisticsAggregation;
private final List<VariableReferenceExpression> outputs;
private final Optional<Integer> taskCountIfScaledWriter;
private final Optional<Boolean> isTemporaryTableWriter;

@JsonCreator
public TableWriterNode(
Expand All @@ -73,9 +74,10 @@ public TableWriterNode(
@JsonProperty("partitioningScheme") Optional<PartitioningScheme> tablePartitioningScheme,
@JsonProperty("preferredShufflePartitioningScheme") Optional<PartitioningScheme> preferredShufflePartitioningScheme,
@JsonProperty("statisticsAggregation") Optional<StatisticAggregations> statisticsAggregation,
@JsonProperty("taskCountIfScaledWriter") Optional<Integer> taskCountIfScaledWriter)
@JsonProperty("taskCountIfScaledWriter") Optional<Integer> taskCountIfScaledWriter,
@JsonProperty("isTemporaryTableWriter") Optional<Boolean> isTemporaryTableWriter)
{
this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter);
this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter);
}

public TableWriterNode(
Expand All @@ -93,7 +95,8 @@ public TableWriterNode(
Optional<PartitioningScheme> tablePartitioningScheme,
Optional<PartitioningScheme> preferredShufflePartitioningScheme,
Optional<StatisticAggregations> statisticsAggregation,
Optional<Integer> taskCountIfScaledWriter)
Optional<Integer> taskCountIfScaledWriter,
Optional<Boolean> isTemporaryTableWriter)
{
super(sourceLocation, id, statsEquivalentPlanNode);

Expand Down Expand Up @@ -126,6 +129,7 @@ public TableWriterNode(
});
this.outputs = outputs.build();
this.taskCountIfScaledWriter = requireNonNull(taskCountIfScaledWriter, "taskCountIfScaledWriter is null");
this.isTemporaryTableWriter = requireNonNull(isTemporaryTableWriter, "isTemporaryTableWriter is null");
}

@JsonProperty
Expand Down Expand Up @@ -212,6 +216,12 @@ public Optional<Integer> getTaskCountIfScaledWriter()
return taskCountIfScaledWriter;
}

@JsonProperty
public Optional<Boolean> getIsTemporaryTableWriter()
{
return isTemporaryTableWriter;
}

@Override
public <R, C> R accept(InternalPlanVisitor<R, C> visitor, C context)
{
Expand All @@ -236,7 +246,7 @@ public PlanNode replaceChildren(List<PlanNode> newChildren)
tablePartitioningScheme,
preferredShufflePartitioningScheme,
statisticsAggregation,
taskCountIfScaledWriter);
taskCountIfScaledWriter, isTemporaryTableWriter);
}

@Override
Expand All @@ -257,7 +267,7 @@ public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalent
tablePartitioningScheme,
preferredShufflePartitioningScheme,
statisticsAggregation,
taskCountIfScaledWriter);
taskCountIfScaledWriter, isTemporaryTableWriter);
}

// only used during planning -- will not be serialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ public TableWriterNode tableWriter(List<VariableReferenceExpression> columns, Li
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds;
import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED;
Expand Down Expand Up @@ -152,7 +152,7 @@ public PlanAndFragments createReadySubPlans(PlanNode plan)
sqlParser,
idAllocator,
variableAllocator,
getTableWriterNodeIds(plan));
getOutputTableWriterNodeIds(plan));
FragmentProperties properties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
plan.getOutputVariables()));
Expand Down