diff --git a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java index fd4e793d6a7b3..db9d811b2d57b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java @@ -211,7 +211,8 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges( Optional.of(partitioningScheme), Optional.empty(), Optional.empty(), - Optional.empty()), + Optional.empty(), + Optional.of(Boolean.TRUE)), Optional.of(insertReference), outputVar, Optional.empty(), @@ -348,7 +349,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( Optional.of(partitioningScheme), Optional.empty(), enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(), - Optional.empty())), + Optional.empty(), + Optional.of(Boolean.TRUE))), variableAllocator.newVariable("intermediaterows", BIGINT), variableAllocator.newVariable("intermediatefragments", VARBINARY), variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY), @@ -369,7 +371,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( Optional.of(partitioningScheme), Optional.empty(), enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(), - Optional.empty()); + Optional.empty(), + Optional.of(Boolean.TRUE)); } return new TableFinishNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/CanonicalPlanGenerator.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/CanonicalPlanGenerator.java index 84578460246b4..f262f992b86ed 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/CanonicalPlanGenerator.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/CanonicalPlanGenerator.java @@ -196,6 +196,7 @@ public Optional visitTableWriter(TableWriterNode node, Context context Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); context.addPlan(node, new CanonicalPlan(result, strategy)); return Optional.of(result); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index f03278306fbdb..2c08570a1d7cb 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -429,7 +429,8 @@ private RelationPlan createTableWriterPlan( // partial aggregation is run within the TableWriteOperator to calculate the statistics for // the data consumed by the TableWriteOperator Optional.of(aggregations.getPartialAggregation()), - Optional.empty()), + Optional.empty(), + Optional.of(Boolean.FALSE)), Optional.of(target), variableAllocator.newVariable("rows", BIGINT), // final aggregation is run within the TableFinishOperator to summarize collected statistics @@ -457,7 +458,8 @@ private RelationPlan createTableWriterPlan( tablePartitioningScheme, preferredShufflePartitioningScheme, Optional.empty(), - Optional.empty()), + Optional.empty(), + Optional.of(Boolean.FALSE)), Optional.of(target), variableAllocator.newVariable("rows", BIGINT), Optional.empty(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java index 7ab4d09a16767..5c277dea932c4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java @@ -51,6 +51,8 @@ public class PlanFragment private final List remoteSourceNodes; private final PartitioningScheme partitioningScheme; private final StageExecutionDescriptor stageExecutionDescriptor; + + // Only true for output table writer and false for temporary table writers private final boolean outputTableWriterFragment; private final StatsAndCosts statsAndCosts; private final Optional jsonRepresentation; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index ff2a643dc0d84..689809871ea3a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -37,7 +37,7 @@ import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID; import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan; -import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds; +import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static java.util.Objects.requireNonNull; @@ -81,7 +81,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod sqlParser, idAllocator, variableAllocator, - getTableWriterNodeIds(plan.getRoot())); + getOutputTableWriterNodeIds(plan.getRoot())); FragmentProperties properties = new FragmentProperties(new PartitioningScheme( Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 1296b2febf928..2c14807b7d003 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -245,6 +245,16 @@ public static Set getTableWriterNodeIds(PlanNode plan) .collect(toImmutableSet()); } + public static Set getOutputTableWriterNodeIds(PlanNode plan) + { + return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan)) + .filter(node -> node instanceof TableWriterNode) + .map(node -> (TableWriterNode) node) + .filter(tableWriterNode -> !tableWriterNode.getIsTemporaryTableWriter().orElse(false)) + .map(TableWriterNode::getId) + .collect(toImmutableSet()); + } + public static Optional getTableWriterTasks(PlanNode plan) { return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan)) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java index 00ff288819cd1..de3f9ec7ae669 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java @@ -578,7 +578,8 @@ public Result apply(TableWriterNode node, Captures captures, Context context) node.getTablePartitioningScheme(), node.getPreferredShufflePartitioningScheme(), rewrittenStatisticsAggregation, - node.getTaskCountIfScaledWriter())); + node.getTaskCountIfScaledWriter(), + node.getIsTemporaryTableWriter())); } return Result.empty(); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java index cec767bae7672..f2d65ba86a37b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java @@ -80,6 +80,7 @@ public Result apply(TableWriterNode node, Captures captures, Context context) node.getTablePartitioningScheme(), node.getPreferredShufflePartitioningScheme(), node.getStatisticsAggregation(), - Optional.of(initialTaskNumber))); + Optional.of(initialTaskNumber), + node.getIsTemporaryTableWriter())); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 019659096cdf4..5c36b366d7c30 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -578,7 +578,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getTablePartitioningScheme(), originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter()), + originalTableWriterNode.getTaskCountIfScaledWriter(), + originalTableWriterNode.getIsTemporaryTableWriter()), fixedParallelism(), fixedParallelism()); } @@ -603,7 +604,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getTablePartitioningScheme(), originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter()), + originalTableWriterNode.getTaskCountIfScaledWriter(), + originalTableWriterNode.getIsTemporaryTableWriter()), exchange.getProperties()); } } @@ -632,7 +634,8 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getTablePartitioningScheme(), originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter()), + originalTableWriterNode.getTaskCountIfScaledWriter(), + originalTableWriterNode.getIsTemporaryTableWriter()), exchange.getProperties()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index 65df155ae2ddc..ebe4219c83807 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -763,7 +763,8 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext canonicalize(partitioningScheme, source)), node.getPreferredShufflePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)), node.getStatisticsAggregation().map(this::map), - node.getTaskCountIfScaledWriter()); + node.getTaskCountIfScaledWriter(), + node.getIsTemporaryTableWriter()); } public StatisticsWriterNode map(StatisticsWriterNode node, PlanNode source) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java index e27e77037c050..4f171da9fd8aa 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/TableWriterNode.java @@ -57,6 +57,7 @@ public class TableWriterNode private final Optional statisticsAggregation; private final List outputs; private final Optional taskCountIfScaledWriter; + private final Optional isTemporaryTableWriter; @JsonCreator public TableWriterNode( @@ -73,9 +74,10 @@ public TableWriterNode( @JsonProperty("partitioningScheme") Optional tablePartitioningScheme, @JsonProperty("preferredShufflePartitioningScheme") Optional preferredShufflePartitioningScheme, @JsonProperty("statisticsAggregation") Optional statisticsAggregation, - @JsonProperty("taskCountIfScaledWriter") Optional taskCountIfScaledWriter) + @JsonProperty("taskCountIfScaledWriter") Optional taskCountIfScaledWriter, + @JsonProperty("isTemporaryTableWriter") Optional isTemporaryTableWriter) { - this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter); + this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter); } public TableWriterNode( @@ -93,7 +95,8 @@ public TableWriterNode( Optional tablePartitioningScheme, Optional preferredShufflePartitioningScheme, Optional statisticsAggregation, - Optional taskCountIfScaledWriter) + Optional taskCountIfScaledWriter, + Optional isTemporaryTableWriter) { super(sourceLocation, id, statsEquivalentPlanNode); @@ -126,6 +129,7 @@ public TableWriterNode( }); this.outputs = outputs.build(); this.taskCountIfScaledWriter = requireNonNull(taskCountIfScaledWriter, "taskCountIfScaledWriter is null"); + this.isTemporaryTableWriter = requireNonNull(isTemporaryTableWriter, "isTemporaryTableWriter is null"); } @JsonProperty @@ -212,6 +216,12 @@ public Optional getTaskCountIfScaledWriter() return taskCountIfScaledWriter; } + @JsonProperty + public Optional getIsTemporaryTableWriter() + { + return isTemporaryTableWriter; + } + @Override public R accept(InternalPlanVisitor visitor, C context) { @@ -236,7 +246,7 @@ public PlanNode replaceChildren(List newChildren) tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, - taskCountIfScaledWriter); + taskCountIfScaledWriter, isTemporaryTableWriter); } @Override @@ -257,7 +267,7 @@ public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalent tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, - taskCountIfScaledWriter); + taskCountIfScaledWriter, isTemporaryTableWriter); } // only used during planning -- will not be serialized diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index c44e5a2b2e2c8..2ac3c363ecaac 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -878,6 +878,7 @@ public TableWriterNode tableWriter(List columns, Li Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java index ec25dc99176ef..fb4b03e7a9b3f 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java @@ -66,7 +66,7 @@ import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.sql.planner.PlanFragmenterUtils.ROOT_FRAGMENT_ID; import static com.facebook.presto.sql.planner.PlanFragmenterUtils.finalizeSubPlan; -import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getTableWriterNodeIds; +import static com.facebook.presto.sql.planner.PlanFragmenterUtils.getOutputTableWriterNodeIds; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED; @@ -152,7 +152,7 @@ public PlanAndFragments createReadySubPlans(PlanNode plan) sqlParser, idAllocator, variableAllocator, - getTableWriterNodeIds(plan)); + getOutputTableWriterNodeIds(plan)); FragmentProperties properties = new FragmentProperties(new PartitioningScheme( Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getOutputVariables()));