diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java index ad38d00d03b3e..f59ceabb7b4a8 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageExecutionInfo.java @@ -16,7 +16,6 @@ import com.facebook.airlift.stats.Distribution.DistributionSnapshot; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.operator.BlockedReason; -import com.facebook.presto.operator.NativeExecutionInfo; import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PipelineStats; import com.facebook.presto.operator.TaskStats; @@ -131,21 +130,7 @@ public static StageExecutionInfo create( } TaskStats taskStats = taskInfo.getStats(); - - boolean isNativeTask = false; - for (PipelineStats pipeline : taskStats.getPipelines()) { - for (OperatorStats operatorStats : pipeline.getOperatorSummaries()) { - if (operatorStats.getInfo() instanceof NativeExecutionInfo) { - isNativeTask = true; - allTaskStats.addAll(((NativeExecutionInfo) operatorStats.getInfo()).getTaskStats()); - } - } - } - - // Prefer statistics from the native process. 
- if (!isNativeTask) { - allTaskStats.add(taskStats); - } + allTaskStats.add(taskStats); if (state == FINISHED && taskInfo.getTaskStatus().getState() == TaskState.FAILED) { retriedCpuTime += taskStats.getTotalCpuTimeInNanos(); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java b/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java deleted file mode 100644 index e30c23fa5f969..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/operator/NativeExecutionInfo.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.operator; - -import com.facebook.drift.annotations.ThriftConstructor; -import com.facebook.drift.annotations.ThriftField; -import com.facebook.drift.annotations.ThriftStruct; -import com.facebook.presto.util.Mergeable; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; - -import java.util.List; - -import static java.util.Objects.requireNonNull; - -@ThriftStruct -public class NativeExecutionInfo - implements Mergeable, OperatorInfo -{ - /// Runtime statistics received from native process. 
- private final List taskStats; - - @JsonCreator - @ThriftConstructor - public NativeExecutionInfo(@JsonProperty("taskStats") List taskStats) - { - this.taskStats = requireNonNull(taskStats); - } - - @JsonProperty - @ThriftField(1) - public List getTaskStats() - { - return taskStats; - } - - @Override - public NativeExecutionInfo mergeWith(NativeExecutionInfo other) - { - return new NativeExecutionInfo(new ImmutableList.Builder() - .addAll(taskStats) - .addAll(other.taskStats) - .build()); - } - - @Override - public boolean isFinal() - { - return true; - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java index e516cef235700..d377ea874876a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorInfo.java @@ -26,7 +26,6 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = ExchangeClientStatus.class, name = "exchangeClientStatus"), @JsonSubTypes.Type(value = LocalExchangeBufferInfo.class, name = "localExchangeBuffer"), - @JsonSubTypes.Type(value = NativeExecutionInfo.class, name = "NativeExecutionInfo"), @JsonSubTypes.Type(value = TableFinishInfo.class, name = "tableFinish"), @JsonSubTypes.Type(value = SplitOperatorInfo.class, name = "splitOperator"), @JsonSubTypes.Type(value = HashCollisionsInfo.class, name = "hashCollisionsInfo"), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index 921cba5d43f3b..28fb50d56d0c1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -52,7 +52,6 @@ import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; 
import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; @@ -78,7 +77,6 @@ import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; -import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; @@ -179,35 +177,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan tableWriterNodeIds); } - // Only delegate non-coordinatorOnly plan fragment to native engine - if (isNativeExecutionEnabled(session) && !properties.getPartitioningHandle().isCoordinatorOnly()) { - if (root instanceof OutputNode) { - // OutputNode is special in that it can have duplicate output variables since it - // does not get converted to a PhysicalOperation for execution. - // Regular plan nodes, including NativeExecutionNode, must have unique output variables. - // Check if OutputNode has duplicate output variables and remove these. - // This is safe because OutputNode variables are not used by the workers. 
- OutputNode outputNode = (OutputNode) root; - List outputVariables = outputNode.getOutputVariables(); - List newOutputVariables = new ArrayList<>(); - List newColumnNames = new ArrayList<>(); - Set uniqueOutputVariables = new HashSet<>(); - for (int i = 0; i < outputVariables.size(); ++i) { - VariableReferenceExpression variable = outputVariables.get(i); - if (uniqueOutputVariables.add(variable)) { - newOutputVariables.add(variable); - newColumnNames.add(outputNode.getColumnNames().get(i)); - } - } - - if (uniqueOutputVariables.size() < outputVariables.size()) { - root = new OutputNode(outputNode.getSourceLocation(), outputNode.getId(), outputNode.getSource(), newColumnNames, newOutputVariables); - } - } - root = new NativeExecutionNode(root); - schedulingOrder = scheduleOrder(root); - } - PlanFragment fragment = new PlanFragment( fragmentId, root, diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java index c63fe06f920ea..5ee8b49d4fe97 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/InputExtractor.java @@ -29,7 +29,6 @@ import com.facebook.presto.sql.planner.plan.IndexSourceNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.SemiJoinNode; import com.facebook.presto.sql.planner.plan.SpatialJoinNode; import com.google.common.collect.ImmutableList; @@ -155,13 +154,6 @@ public Void visitIndexSource(IndexSourceNode node, Context context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Context context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Context context) 
{ diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java index e397b5cc487f3..6a6981110112d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java @@ -18,7 +18,6 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.google.common.base.VerifyException; @@ -62,13 +61,6 @@ public Void visitTableWriter(TableWriterNode node, Void context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java index 5afb4e548a973..7ab4d09a16767 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java @@ -20,7 +20,6 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.VariableReferenceExpression; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.fasterxml.jackson.annotation.JsonCreator; @@ -214,11 +213,6 @@ private static void findRemoteSourceNodes(PlanNode node, Builder schedulingOr schedulingOrder.accept(node.getId()); return null; 
} - - @Override - public Void visitNativeExecution(NativeExecutionNode node, Consumer schedulingOrder) - { - schedulingOrder.accept(node.getId()); - return null; - } } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java index 10838c01518b9..056caee1e720c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java @@ -48,7 +48,6 @@ import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; import com.facebook.presto.sql.planner.plan.SampleNode; @@ -398,12 +397,6 @@ public Map visitExchange(ExchangeNode node, Context con return processSources(node.getSources(), context); } - @Override - public Map visitNativeExecution(NativeExecutionNode node, Context context) - { - return processSources(ImmutableList.of(node.getSubPlan()), context); - } - private Map processSources(List sources, Context context) { ImmutableMap.Builder result = ImmutableMap.builder(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java index 26e05f5aad4a9..b3a75707178cf 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/Plans.java @@ -15,7 +15,6 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; 
import java.util.List; import java.util.stream.Collectors; @@ -55,13 +54,6 @@ public PlanNode visitGroupReference(GroupReference node, Void context) { return lookup.resolve(node).accept(this, context); } - - @Override - public PlanNode visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return node; - } } private Plans() {} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java index f31ec701d1ede..f2828854f6fbc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/InternalPlanVisitor.java @@ -176,9 +176,4 @@ public R visitStatsEquivalentPlanNodeWithLimit(StatsEquivalentPlanNodeWithLimit { return visitPlan(node, context); } - - public R visitNativeExecution(NativeExecutionNode node, C context) - { - return visitPlan(node, context); - } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java deleted file mode 100644 index 9e10dc12d9429..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/NativeExecutionNode.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.sql.planner.plan; - -import com.facebook.presto.spi.SourceLocation; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.PlanNodeId; -import com.facebook.presto.spi.relation.VariableReferenceExpression; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import javax.annotation.concurrent.Immutable; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -/** - * The NativeExecutionNode is a wrapper node encapsulating the actual logical plan nodes in the subPlan field which will be eventually executed on the native engine. - */ -@Immutable -public class NativeExecutionNode - extends InternalPlanNode -{ - private final PlanNode subPlan; - - @JsonCreator - public NativeExecutionNode(Optional sourceLocation, @JsonProperty("id") PlanNodeId id, @JsonProperty("subPlan") PlanNode subPlan) - { - this(sourceLocation, id, Optional.empty(), subPlan); - } - - public NativeExecutionNode(Optional sourceLocation, PlanNodeId id, Optional statsEquivalentPlanNode, PlanNode subPlan) - { - super(sourceLocation, id, statsEquivalentPlanNode); - this.subPlan = requireNonNull(subPlan, "subPlan is null"); - } - - public NativeExecutionNode(PlanNode subPlan) - { - this(subPlan.getSourceLocation(), subPlan.getId(), subPlan.getStatsEquivalentPlanNode(), subPlan); - } - - /* - * Since NativeExecutionNode will hide its subPlan away from outside viewer, the getSources() intended to - * return an empty list to avoid any Vistor visiting the subPlan. 
- */ - @Override - public List getSources() - { - return Collections.emptyList(); - } - - @Override - public List getOutputVariables() - { - return subPlan.getOutputVariables(); - } - - @JsonProperty - public PlanNode getSubPlan() - { - return subPlan; - } - - @Override - public PlanNode replaceChildren(List newChildren) - { - throw new UnsupportedOperationException("replaceChildren is not supported by NativeExecutionNode"); - } - - @Override - public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalentPlanNode) - { - return new NativeExecutionNode(getSourceLocation(), getId(), statsEquivalentPlanNode, subPlan.assignStatsEquivalentPlanNode(statsEquivalentPlanNode)); - } - - @Override - public R accept(InternalPlanVisitor visitor, C context) - { - return visitor.visitNativeExecution(this, context); - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java index cd5fb5e67a89c..515e9bdfea129 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanNodeStatsSummarizer.java @@ -16,7 +16,6 @@ import com.facebook.presto.execution.StageInfo; import com.facebook.presto.execution.TaskInfo; import com.facebook.presto.operator.HashCollisionsInfo; -import com.facebook.presto.operator.NativeExecutionInfo; import com.facebook.presto.operator.OperatorStats; import com.facebook.presto.operator.PipelineStats; import com.facebook.presto.operator.TaskStats; @@ -144,12 +143,6 @@ private static List getPlanNodeStats(TaskStats taskStats) windowNodeStats.merge(planNodeId, WindowOperatorStats.create(windowInfo), (left, right) -> left.mergeWith(right)); } - if (operatorStats.getInfo() instanceof NativeExecutionInfo) { - NativeExecutionInfo info = (NativeExecutionInfo) 
operatorStats.getInfo(); - nativeTaskStats.addAll(info.getTaskStats()); - nativePlanNodeIds.add(planNodeId); - } - planNodeInputPositions.merge(planNodeId, operatorStats.getInputPositions(), Long::sum); planNodeInputBytes.merge(planNodeId, operatorStats.getInputDataSize().toBytes(), Long::sum); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index e0c69c193f9a0..95d8784960fc2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -78,7 +78,6 @@ import com.facebook.presto.sql.planner.plan.LateralJoinNode; import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; @@ -1223,15 +1222,6 @@ public Void visitLateralJoin(LateralJoinNode node, Void context) return processChildren(node, context); } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - // Do not add 'node' as it shares the ID with the root node of the sub-plan. 
- node.getSubPlan().accept(this, context); - - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java index 0acd96251da57..51ac3a467ee37 100644 --- a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java @@ -49,7 +49,6 @@ import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.LateralJoinNode; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; @@ -328,13 +327,6 @@ public Void visitMarkDistinct(MarkDistinctNode node, Void context) return node.getSource().accept(this, context); } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - printNode(node, "NativeExecution", NODE_COLORS.get(NodeType.NATIVE_EXECUTION)); - return null; - } - @Override public Void visitWindow(WindowNode node, Void context) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index 2457f00d82b92..1abdbcb424ebe 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -69,7 +69,6 @@ import com.facebook.presto.sql.planner.plan.IndexSourceNode; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.LateralJoinNode; -import 
com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.OffsetNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; @@ -934,11 +933,6 @@ public UnnestNode unnest(PlanNode source, List repl ordinalityVariable); } - public NativeExecutionNode nativeExecution(PlanNode subPlan) - { - return new NativeExecutionNode(subPlan); - } - public static Expression expression(String sql) { return ExpressionUtils.rewriteIdentifiersToSymbolReferences(new SqlParser().createExpression(sql)); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java index 1256fc941ae1c..6ada71d24df91 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/planPrinter/TestPlanPrinter.java @@ -23,18 +23,13 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.TableHandle; -import com.facebook.presto.spi.plan.Assignments; -import com.facebook.presto.spi.plan.LimitNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; -import com.facebook.presto.spi.plan.ProjectNode; import com.facebook.presto.spi.plan.TableScanNode; -import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.Partitioning; import com.facebook.presto.sql.planner.PartitioningScheme; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.testing.TestingHandle; import 
com.facebook.presto.testing.TestingMetadata; @@ -191,38 +186,4 @@ public void testDomainTextFormatting() Domain.onlyNull(VARCHAR), "[NULL]"); } - - @Test - public void testPrintNativeExecutionNode() - { - ImmutableMap map = ImmutableMap.of( - COLUMN_VARIABLE, - COLUMN_VARIABLE); - TableScanNode scan = PLAN_BUILDER.tableScan( - TABLE_HANDLE_WITH_LAYOUT, - ImmutableList.of(COLUMN_VARIABLE), - ImmutableMap.of(COLUMN_VARIABLE, COLUMN_HANDLE)); - LimitNode limit = PLAN_BUILDER.limit(10, scan); - ProjectNode project = PLAN_BUILDER.project(new Assignments(map), limit); - NativeExecutionNode nativeExecution = PLAN_BUILDER.nativeExecution(project); - - PlanFragment testFragment = new PlanFragment( - new PlanFragmentId(0), - nativeExecution, - ImmutableSet.of(), - SOURCE_DISTRIBUTION, - ImmutableList.of(nativeExecution.getId()), - new PartitioningScheme(Partitioning.create(SOURCE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of()), - StageExecutionDescriptor.ungroupedExecution(), - false, - StatsAndCosts.empty(), - Optional.empty()); - - String textPlan = PlanPrinter.textPlanFragment(testFragment, FUNCTION_AND_TYPE_MANAGER, TEST_SESSION, true); - assertTrue(textPlan.contains("Project[projectLocality = UNKNOWN] => [column:varchar]")); - assertTrue(textPlan.contains("Limit[10] => [column:varchar]")); - assertTrue( - textPlan.matches( - "(?s).*TableScan\\[TableHandle \\{connectorId='testConnector',(?s).*\\[column:varchar\\](?s).*")); - } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 788dea643fe20..63daad315ecbf 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -111,6 +111,7 @@ import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory; import 
com.facebook.presto.spark.execution.PrestoSparkLocalShuffleReadInfo; import com.facebook.presto.spark.execution.PrestoSparkLocalShuffleWriteInfo; +import com.facebook.presto.spark.execution.PrestoSparkNativeTaskExecutorFactory; import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory; import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig; import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig; @@ -516,6 +517,7 @@ protected void setup(Binder binder) binder.bind(PrestoSparkAccessControlChecker.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkPlanFragmenter.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkRddFactory.class).in(Scopes.SINGLETON); + binder.bind(PrestoSparkNativeTaskExecutorFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkTaskExecutorFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkQueryExecutionFactory.class).in(Scopes.SINGLETON); binder.bind(PrestoSparkService.class).in(Scopes.SINGLETON); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java index cce500eeed02f..1de2e6172235f 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkService.java @@ -17,6 +17,7 @@ import com.facebook.presto.spark.classloader_interface.IPrestoSparkQueryExecutionFactory; import com.facebook.presto.spark.classloader_interface.IPrestoSparkService; import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory; +import com.facebook.presto.spark.execution.PrestoSparkNativeTaskExecutorFactory; import com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory; import javax.inject.Inject; @@ -28,16 +29,19 @@ public class PrestoSparkService { private final PrestoSparkQueryExecutionFactory queryExecutionFactory; private 
final PrestoSparkTaskExecutorFactory taskExecutorFactory; + private final PrestoSparkNativeTaskExecutorFactory prestoSparkNativeTaskExecutorFactory; private final LifeCycleManager lifeCycleManager; @Inject public PrestoSparkService( PrestoSparkQueryExecutionFactory queryExecutionFactory, PrestoSparkTaskExecutorFactory taskExecutorFactory, + PrestoSparkNativeTaskExecutorFactory prestoSparkNativeTaskExecutorFactory, LifeCycleManager lifeCycleManager) { this.queryExecutionFactory = requireNonNull(queryExecutionFactory, "queryExecutionFactory is null"); this.taskExecutorFactory = requireNonNull(taskExecutorFactory, "taskExecutorFactory is null"); + this.prestoSparkNativeTaskExecutorFactory = requireNonNull(prestoSparkNativeTaskExecutorFactory, "prestoSparkNativeTaskExecutorFactory is null"); this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); } @@ -53,9 +57,17 @@ public IPrestoSparkTaskExecutorFactory getTaskExecutorFactory() return taskExecutorFactory; } + @Override + public IPrestoSparkTaskExecutorFactory getNativeTaskExecutorFactory() + { + return prestoSparkNativeTaskExecutorFactory; + } + @Override public void close() { lifeCycleManager.stop(); + prestoSparkNativeTaskExecutorFactory.close(); + taskExecutorFactory.close(); } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java index a762e22e7bed1..0e84bcf23c1f6 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSourceStatsCollector.java @@ -22,7 +22,6 @@ import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spi.statistics.TableStatistics; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import 
com.google.common.collect.ImmutableList; import java.util.Iterator; @@ -97,13 +96,6 @@ public Void visitTableScan(TableScanNode node, Void context) return null; } - @Override - public Void visitNativeExecution(NativeExecutionNode node, Void context) - { - node.getSubPlan().accept(this, context); - return null; - } - @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java index 65f89747e42af..2c6bcf0bc7f7a 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/HttpNativeExecutionTaskInfoFetcher.java @@ -144,7 +144,7 @@ public void stop() public Optional getTaskInfo() throws RuntimeException { - if (scheduledFuture != null && scheduledFuture.isCancelled()) { + if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) { throw lastException.get(); } TaskInfo info = taskInfo.get(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java index d5fd58644949c..a634a9d719d41 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcess.java @@ -183,6 +183,10 @@ public int getPort() return port; } + public URI getLocation() + { + return location; + } private static URI getBaseUriWithPort(URI baseUri, int port) { return uriBuilderFrom(baseUri) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java 
b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java index 598a544153a05..f3d2ca6ee0cec 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionProcessFactory.java @@ -38,10 +38,8 @@ public class NativeExecutionProcessFactory { - // TODO add config - private static final int MAX_THREADS = 1000; private static final Duration MAX_ERROR_DURATION = new Duration(2, TimeUnit.MINUTES); - + public static final URI DEFAULT_URI = URI.create("http://127.0.0.1"); private final HttpClient httpClient; private final ExecutorService coreExecutor; private final ScheduledExecutorService errorRetryScheduledExecutor; diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java index b52dafe8dd335..0a215ef363b1b 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/NativeExecutionTask.java @@ -29,6 +29,7 @@ import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient; import com.facebook.presto.spi.page.SerializedPage; import com.facebook.presto.sql.planner.PlanFragment; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import java.net.URI; @@ -38,6 +39,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import static com.facebook.presto.execution.TaskState.ABORTED; +import static com.facebook.presto.execution.TaskState.CANCELED; +import static com.facebook.presto.execution.TaskState.FAILED; import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static 
com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static java.util.Objects.requireNonNull; @@ -158,7 +162,8 @@ public TaskInfo start() { TaskInfo taskInfo = sendUpdateRequest(); - if (!taskInfo.getTaskStatus().getState().isDone()) { + // We do not start taskInfo fetcher for failed tasks + if (!ImmutableList.of(CANCELED, FAILED, ABORTED).contains(taskInfo.getTaskStatus().getState())) { log.info("Starting TaskInfoFetcher and TaskResultFetcher."); taskResultFetcher.ifPresent(fetcher -> fetcher.start()); taskInfoFetcher.start(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java new file mode 100644 index 0000000000000..bdd9a2defcbbb --- /dev/null +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkNativeTaskExecutorFactory.java @@ -0,0 +1,513 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.spark.execution; + +import com.facebook.airlift.json.Codec; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.Session; +import com.facebook.presto.execution.ExecutionFailureInfo; +import com.facebook.presto.execution.Lifespan; +import com.facebook.presto.execution.Location; +import com.facebook.presto.execution.ScheduledSplit; +import com.facebook.presto.execution.StageExecutionId; +import com.facebook.presto.execution.StageId; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.execution.TaskInfo; +import com.facebook.presto.execution.TaskSource; +import com.facebook.presto.execution.TaskState; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.RemoteTransactionHandle; +import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.metadata.Split; +import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider; +import com.facebook.presto.spark.PrestoSparkTaskDescriptor; +import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor; +import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory; +import com.facebook.presto.spark.classloader_interface.MutablePartitionId; +import com.facebook.presto.spark.classloader_interface.PrestoSparkNativeTaskInputs; +import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage; +import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleReadDescriptor; +import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats; +import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs; +import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput; +import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor; +import 
com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource; +import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo; +import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; +import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.page.SerializedPage; +import com.facebook.presto.spi.plan.OutputNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.security.TokenAuthenticator; +import com.facebook.presto.split.RemoteSplit; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.PlanFragmentId; +import com.facebook.presto.sql.planner.plan.RemoteSourceNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.spark.util.CollectionAccumulator; +import scala.Tuple2; +import scala.collection.AbstractIterator; +import scala.collection.Iterator; + +import javax.inject.Inject; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; +import static com.facebook.presto.spark.execution.NativeExecutionProcessFactory.DEFAULT_URI; +import static com.facebook.presto.spark.util.PrestoSparkUtils.deserializeZstdCompressed; +import static com.facebook.presto.spark.util.PrestoSparkUtils.serializeZstdCompressed; +import static com.facebook.presto.spark.util.PrestoSparkUtils.toPrestoSparkSerializedPage; 
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static com.facebook.presto.sql.planner.SchedulingOrderVisitor.scheduleOrder; +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.units.DataSize.succinctBytes; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +/** + * PrestoSparkNativeTaskExecutorFactory is responsible for launching the external native process and managing the communication + * between Java process and native process (by using the {@link NativeExecutionTask}). + * It will send necessary metadata (e.g., plan fragment, session properties etc.) as a part of + * BatchTaskUpdateRequest. It will poll the remote CPP task for status and results (pages/data if applicable) + * and send these back to the Spark's RDD api + * + * PrestoSparkNativeTaskExecutorFactory is a singleton instantiated once per executor. + * + * For every task it receives, it does the following + * 1. Create the Native execution Process (NativeTaskExecutionFactory) ensure that it is created only once. + * 2. Serialize and pass the planFragment, source-metadata (taskSources), sink-metadata (tableWriteInfo or shuffleWriteInfo) + * and submit a nativeExecutionTask. + * 3. Return Iterator to sparkRDD layer. RDD execution will call the .next() methods, which will + * 3.a Call {@link NativeExecutionTask}'s pollResult() to retrieve {@link SerializedPage} back from external process.
+ * 3.b If no more output is available, then check if task has finished successfully or with exception + * If task finished with exception - fail the spark task (throw exception) + * IF task finished successfully - collect statistics through taskInfo object and add to accumulator + */ +public class PrestoSparkNativeTaskExecutorFactory + implements IPrestoSparkTaskExecutorFactory +{ + private static final Logger log = Logger.get(PrestoSparkNativeTaskExecutorFactory.class); + + // For Presto-on-Spark, we do not have remoteSourceTasks as the shuffle data is + // in persistent shuffle. + // Current protocol for Split mandates having a remoteSourceTaskId as the + // part of the split info. So for shuffleRead split we set it to a dummy + // value that is ignored by the shuffle-reader + private static final TaskId DUMMY_TASK_ID = TaskId.valueOf("remotesourcetaskid.0.0.0.0"); + + private final SessionPropertyManager sessionPropertyManager; + private final FunctionAndTypeManager functionAndTypeManager; + private final JsonCodec taskDescriptorJsonCodec; + private final Codec taskSourceCodec; + private final Codec taskInfoCodec; + private final PrestoSparkExecutionExceptionFactory executionExceptionFactory; + private final Set authenticatorProviders; + private final NativeExecutionProcessFactory nativeExecutionProcessFactory; + private final NativeExecutionTaskFactory nativeExecutionTaskFactory; + private final PrestoSparkShuffleInfoTranslator shuffleInfoTranslator; + private NativeExecutionProcess nativeExecutionProcess; + + @Inject + public PrestoSparkNativeTaskExecutorFactory( + SessionPropertyManager sessionPropertyManager, + FunctionAndTypeManager functionAndTypeManager, + JsonCodec taskDescriptorJsonCodec, + Codec taskSourceCodec, + Codec taskInfoCodec, + PrestoSparkExecutionExceptionFactory executionExceptionFactory, + Set authenticatorProviders, + PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, + NativeExecutionProcessFactory 
nativeExecutionProcessFactory, + NativeExecutionTaskFactory nativeExecutionTaskFactory, + PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + { + this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); + this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null"); + this.taskDescriptorJsonCodec = requireNonNull(taskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null"); + this.taskSourceCodec = requireNonNull(taskSourceCodec, "taskSourceCodec is null"); + this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null"); + this.executionExceptionFactory = requireNonNull(executionExceptionFactory, "executionExceptionFactory is null"); + this.authenticatorProviders = ImmutableSet.copyOf(requireNonNull(authenticatorProviders, "authenticatorProviders is null")); + this.nativeExecutionProcessFactory = requireNonNull(nativeExecutionProcessFactory, "processFactory is null"); + this.nativeExecutionTaskFactory = requireNonNull(nativeExecutionTaskFactory, "taskFactory is null"); + this.shuffleInfoTranslator = requireNonNull(shuffleInfoTranslator, "shuffleInfoFactory is null"); + } + + @Override + public IPrestoSparkTaskExecutor create( + int partitionId, + int attemptNumber, + SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, + Iterator serializedTaskSources, + PrestoSparkTaskInputs inputs, + CollectionAccumulator taskInfoCollector, + CollectionAccumulator shuffleStatsCollector, + Class outputType) + { + try { + return doCreate( + partitionId, + attemptNumber, + serializedTaskDescriptor, + serializedTaskSources, + inputs, + taskInfoCollector, + shuffleStatsCollector, + outputType); + } + catch (RuntimeException e) { + throw executionExceptionFactory.toPrestoSparkExecutionException(e); + } + } + + public IPrestoSparkTaskExecutor doCreate( + int partitionId, + int attemptNumber, + SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor, + Iterator 
serializedTaskSources, + PrestoSparkTaskInputs inputs, + CollectionAccumulator taskInfoCollector, + CollectionAccumulator shuffleStatsCollector, + Class outputType) + { + PrestoSparkTaskDescriptor taskDescriptor = taskDescriptorJsonCodec.fromJson(serializedTaskDescriptor.getBytes()); + ImmutableMap.Builder extraAuthenticators = ImmutableMap.builder(); + authenticatorProviders.forEach(provider -> extraAuthenticators.putAll(provider.getTokenAuthenticators())); + + Session session = taskDescriptor.getSession().toSession( + sessionPropertyManager, + taskDescriptor.getExtraCredentials(), + extraAuthenticators.build()); + PlanFragment fragment = taskDescriptor.getFragment(); + StageId stageId = new StageId(session.getQueryId(), fragment.getId().getId()); + TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId, attemptNumber); + + // TODO: Remove this once we can display the plan on Spark UI. + // Currently, `textPlanFragment` throws an exception if json-based UDFs are used in the query, which can only + // happen in native execution mode. To resolve this error, `JsonFileBasedFunctionNamespaceManager` must be + // loaded on the executors as well (which is actually not required for native execution). To do so, we need a + // mechanism to ship the JSON file containing the UDF metadata to workers, which does not exist as of today. + // TODO: Address this issue; more details in https://github.com/prestodb/presto/issues/19600 + log.info("Logging plan fragment is not supported for presto-on-spark native execution, yet"); + + if (fragment.getPartitioning().isCoordinatorOnly()) { + throw new UnsupportedOperationException("Coordinator only fragment execution is not supported by native task executor"); + } + + checkArgument( + inputs instanceof PrestoSparkNativeTaskInputs, + format("PrestoSparkNativeTaskInputs is required for native execution, but %s is provided", inputs.getClass().getName())); + + // 1. 
Start the native process if it hasn't already been started or dead + createAndStartNativeExecutionProcess(session); + + // 2. compute the task info to send to cpp process + PrestoSparkNativeTaskInputs nativeInputs = (PrestoSparkNativeTaskInputs) inputs; + // 2.a Populate Read info + List taskSources = getTaskSources(serializedTaskSources, fragment, session, nativeInputs); + + // 2.b Populate Write info + Optional shuffleWriteInfo = nativeInputs.getShuffleWriteDescriptor().isPresent() + && !findTableWriteNode(fragment.getRoot()).isPresent() + && !(fragment.getRoot() instanceof OutputNode) ? + Optional.of(shuffleInfoTranslator.createShuffleWriteInfo(session, nativeInputs.getShuffleWriteDescriptor().get())) : Optional.empty(); + Optional serializedShuffleWriteInfo = shuffleWriteInfo.map(shuffleInfoTranslator::createSerializedWriteInfo); + + // 3. Submit the task to cpp process for execution + log.info("Submitting native execution task "); + NativeExecutionTask task = nativeExecutionTaskFactory.createNativeExecutionTask( + session, + nativeExecutionProcess.getLocation(), + taskId, + fragment, + ImmutableList.copyOf(taskSources), + taskDescriptor.getTableWriteInfo(), + serializedShuffleWriteInfo); + + log.info("Creating task and will wait for remote task completion"); + TaskInfo taskInfo = task.start(); + + // task creation might have failed + processTaskInfoForErrors(taskInfo); + // 4. 
return output to spark RDD layer + return new PrestoSparkNativeTaskOutputIterator<>(task, outputType, taskInfoCollector, taskInfoCodec, executionExceptionFactory); + } + + @Override + public void close() + { + if (nativeExecutionProcess != null) { + nativeExecutionProcess.close(); + } + } + + private static void completeTask(CollectionAccumulator taskInfoCollector, NativeExecutionTask task, Codec taskInfoCodec) + { + // stop the task + task.stop(); + + // collect statistics (if available) + Optional taskInfoOptional = task.getTaskInfo(); + if (!taskInfoOptional.isPresent()) { + log.error("Missing taskInfo. Statistics might be inaccurate"); + return; + } + SerializedTaskInfo serializedTaskInfo = new SerializedTaskInfo(serializeZstdCompressed(taskInfoCodec, taskInfoOptional.get())); + taskInfoCollector.add(serializedTaskInfo); + + // Update Spark Accumulators for spark internal metrics + PrestoSparkStatsCollectionUtils.collectMetrics(taskInfoOptional.get()); + } + + private static void processTaskInfoForErrors(TaskInfo taskInfo) + { + if (!taskInfo.getTaskStatus().getState().isDone()) { + log.info("processTaskInfoForErrors: task is not done yet.. %s", taskInfo); + return; + } + + if (!taskInfo.getTaskStatus().getState().equals(TaskState.FINISHED)) { + // task failed with errors + RuntimeException failure = taskInfo.getTaskStatus().getFailures().stream() + .findFirst() + .map(ExecutionFailureInfo::toException) + .orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "Native task failed for an unknown reason")); + throw failure; + } + + log.info("processTaskInfoForErrors: task completed successfully = %s", taskInfo); + } + + private void createAndStartNativeExecutionProcess(Session session) + { + requireNonNull(nativeExecutionProcessFactory, "Trying to instantiate native process but factory is null"); + + try { + // create the CPP sidecar process if it doesn't exist. 
+ // We create this when the first task is scheduled + nativeExecutionProcess = nativeExecutionProcessFactory.getNativeExecutionProcess( + session, + DEFAULT_URI); + nativeExecutionProcess.start(); + } + catch (ExecutionException | InterruptedException | IOException e) { + throw new RuntimeException(e); + } + } + + private List getTaskSources( + Iterator serializedTaskSources, + PlanFragment fragment, + Session session, + PrestoSparkNativeTaskInputs nativeTaskInputs) + { + List taskSources = new ArrayList<>(); + + // Populate TableScan sources + long totalSerializedSizeInBytes = 0; + while (serializedTaskSources.hasNext()) { + SerializedPrestoSparkTaskSource serializedTaskSource = serializedTaskSources.next(); + taskSources.add(deserializeZstdCompressed(taskSourceCodec, serializedTaskSource.getBytes())); + totalSerializedSizeInBytes += serializedTaskSource.getBytes().length; + } + + // When joining bucketed table with a non-bucketed table with a filter on "$bucket", + // some tasks may not have splits for the bucketed table. In this case we still need + // to send no-more-splits message to Velox. 
+ Set planNodeIdsWithSources = taskSources.stream().map(TaskSource::getPlanNodeId).collect(Collectors.toSet()); + Set tableScanIds = Sets.newHashSet(scheduleOrder(fragment.getRoot())); + tableScanIds.stream() + .filter(id -> !planNodeIdsWithSources.contains(id)) + .forEach(id -> taskSources.add(new TaskSource(id, ImmutableSet.of(), true))); + + log.info("Total serialized size of all table scan task sources: %s", succinctBytes(totalSerializedSizeInBytes)); + + // Populate ShuffleRead sources + ImmutableList.Builder shuffleTaskSources = ImmutableList.builder(); + AtomicLong nextSplitId = new AtomicLong(); + taskSources.stream() + .flatMap(source -> source.getSplits().stream()) + .mapToLong(ScheduledSplit::getSequenceId) + .max() + .ifPresent(id -> nextSplitId.set(id + 1)); + + for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) { + for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) { + PrestoSparkShuffleReadDescriptor shuffleReadDescriptor = + nativeTaskInputs.getShuffleReadDescriptors().get(sourceFragmentId.toString()); + if (shuffleReadDescriptor != null) { + ScheduledSplit split = new ScheduledSplit(nextSplitId.getAndIncrement(), remoteSource.getId(), new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit( + new Location(format("batch://%s?shuffleInfo=%s", DUMMY_TASK_ID, + shuffleInfoTranslator.createSerializedReadInfo( + shuffleInfoTranslator.createShuffleReadInfo(session, shuffleReadDescriptor)))), + DUMMY_TASK_ID))); + TaskSource source = new TaskSource(remoteSource.getId(), ImmutableSet.of(split), ImmutableSet.of(Lifespan.taskWide()), true); + shuffleTaskSources.add(source); + } + } + } + + taskSources.addAll(shuffleTaskSources.build()); + + return taskSources; + } + + private Optional findTableWriteNode(PlanNode node) + { + return searchFrom(node) + .where(TableWriterNode.class::isInstance) + .findFirst(); + } + + private static class PrestoSparkNativeTaskOutputIterator + extends 
AbstractIterator> + implements IPrestoSparkTaskExecutor + { + private final NativeExecutionTask nativeExecutionTask; + private Optional next = Optional.empty(); + private final CollectionAccumulator taskInfoCollectionAccumulator; + private final Codec taskInfoCodec; + private final Class outputType; + private final PrestoSparkExecutionExceptionFactory executionExceptionFactory; + + public PrestoSparkNativeTaskOutputIterator( + NativeExecutionTask nativeExecutionTask, + Class outputType, + CollectionAccumulator taskInfoCollectionAccumulator, + Codec taskInfoCodec, + PrestoSparkExecutionExceptionFactory executionExceptionFactory) + { + this.nativeExecutionTask = nativeExecutionTask; + this.taskInfoCollectionAccumulator = taskInfoCollectionAccumulator; + this.taskInfoCodec = taskInfoCodec; + this.outputType = outputType; + this.executionExceptionFactory = executionExceptionFactory; + } + + /** + * This function is called by Spark's RDD layer to check if there are output pages + * There are 2 scenarios + * 1. ShuffleMap Task - Always returns false. But the internal function calls do all the work needed + * 2. Result Task - True until pages are available. False once all pages have been extracted + * @return if output is available + */ + @Override + public boolean hasNext() + { + next = computeNext(); + return next.isPresent(); + } + + /** This function returns the next available page fetched from CPP process + * + * Has 3 main responsibilities + * 1) Busy-wait-for-pages-or-completion + * + * Loop until either of the 3 conditions happen + * * 1. We get a page + * * 2. Task has finished successfully + * * 3. Task has finished with error + * + * For ShuffleMap Task, as of now, the CPP process returns no pages. 
+ * So the loop acts as a wait-for-completion loop and returns an Optional.empty() + * once the task has terminated + * + * For a Result Task, this function will return all the pages and Optional.empty() + * once all the pages have been read and the task has been terminated + * + * 2) Exception handling + * when there are no pages available, the function checks if the task has finished + * with exceptions and throws the appropriate exception back to spark's RDD processing + * layer + * + * 3) Statistics collection + * For both, when the task finished successfully or with exception, it tries to collect + * statistics if TaskInfo object is available + * + * @return Optional outputPage + */ + private Optional computeNext() + { + // A while(true) loop is not desirable, but in this case we cannot avoid + // it because of Spark's RDD contract, which is that this iterator either + // returns data or is complete. It CANNOT return null. + // While the remote task is still running and there are no output pages, + // we need to simulate a busy-loop to avoid returning null.
+ while (true) { + try { + // For ShuffleMap Task, this will always return Optional.empty() + Optional pageOptional = nativeExecutionTask.pollResult(); + + if (pageOptional.isPresent()) { + return pageOptional; + } + + try { + Optional taskInfo = nativeExecutionTask.getTaskInfo(); + + // Case1: Task is still running + if (!taskInfo.isPresent() || !taskInfo.get().getTaskStatus().getState().isDone()) { + continue; + } + + // Case 2: Task finished with errors captured inside taskInfo + processTaskInfoForErrors(taskInfo.get()); + } + catch (RuntimeException ex) { + // For a failed task, if taskInfo is present we still want to log the metrics + completeTask(taskInfoCollectionAccumulator, nativeExecutionTask, taskInfoCodec); + throw executionExceptionFactory.toPrestoSparkExecutionException(ex); + } + + // Case3: Task terminated with success + break; + } + catch (InterruptedException e) { + log.error(e); + throw new RuntimeException(e); + } + } + + // Reaching here marks the end of task processing + completeTask(taskInfoCollectionAccumulator, nativeExecutionTask, taskInfoCodec); + return Optional.empty(); + } + + @Override + public Tuple2 next() + { + // Result Tasks only have outputType of PrestoSparkSerializedPage. 
+ checkArgument(outputType == PrestoSparkSerializedPage.class, + format("PrestoSparkNativeTaskExecutorFactory only outputType=PrestoSparkSerializedPage" + + "But tried to extract outputType=%s", outputType)); + return new Tuple2<>(new MutablePartitionId(), (T) toPrestoSparkSerializedPage(next.get())); + } + } +} diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java index a8b52683563b7..543755245cef5 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecution.java @@ -113,8 +113,7 @@ public PrestoSparkTaskExecution( TaskExecutor taskExecutor, SplitMonitor splitMonitor, Executor notificationExecutor, - ScheduledExecutorService memoryUpdateExecutor, - boolean isNativeTask) + ScheduledExecutorService memoryUpdateExecutor) { this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null"); this.taskId = taskStateMachine.getTaskId(); @@ -149,7 +148,7 @@ public PrestoSparkTaskExecution( checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(tableScanSources), "Fragment is partitioned, but not all partitioned drivers were found"); - taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, isNativeTask); + taskHandle = createTaskHandle(taskStateMachine, taskContext, localExecutionPlan, taskExecutor); requireNonNull(memoryUpdateExecutor, "memoryUpdateExecutor is null"); memoryUpdateExecutor.schedule(taskContext::updatePeakMemory, 1, SECONDS); @@ -160,15 +159,14 @@ private static TaskHandle createTaskHandle( TaskStateMachine taskStateMachine, TaskContext taskContext, LocalExecutionPlan localExecutionPlan, - TaskExecutor taskExecutor, - boolean isNativeTask) + TaskExecutor taskExecutor) { TaskHandle 
taskHandle = taskExecutor.addTask( taskStateMachine.getTaskId(), () -> 0, getInitialSplitsPerNode(taskContext.getSession()), getSplitConcurrencyAdjustmentInterval(taskContext.getSession()), - isNativeTask ? OptionalInt.of(MAX_JAVA_DRIVERS_FOR_NATIVE_TASK) : getMaxDriversPerTask(taskContext.getSession())); + getMaxDriversPerTask(taskContext.getSession())); taskStateMachine.addStateChangeListener(state -> { if (state.isDone()) { taskExecutor.removeTask(taskHandle); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java index 38e5cb34ec938..d9fd85aeb5c87 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.java @@ -19,12 +19,9 @@ import com.facebook.airlift.stats.TestingGcMonitor; import com.facebook.presto.Session; import com.facebook.presto.common.block.BlockEncodingManager; -import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.io.DataOutput; import com.facebook.presto.event.SplitMonitor; import com.facebook.presto.execution.ExecutionFailureInfo; -import com.facebook.presto.execution.Lifespan; -import com.facebook.presto.execution.Location; import com.facebook.presto.execution.MemoryRevokingSchedulerUtils; import com.facebook.presto.execution.ScheduledSplit; import com.facebook.presto.execution.StageExecutionId; @@ -45,9 +42,7 @@ import com.facebook.presto.memory.TraversingQueryContextVisitor; import com.facebook.presto.memory.VoidTraversingQueryContextVisitor; import com.facebook.presto.metadata.FunctionAndTypeManager; -import com.facebook.presto.metadata.RemoteTransactionHandle; import com.facebook.presto.metadata.SessionPropertyManager; -import com.facebook.presto.metadata.Split; import 
com.facebook.presto.operator.OperatorContext; import com.facebook.presto.operator.OutputFactory; import com.facebook.presto.operator.TaskContext; @@ -61,9 +56,7 @@ import com.facebook.presto.spark.classloader_interface.MutablePartitionId; import com.facebook.presto.spark.classloader_interface.PrestoSparkJavaExecutionTaskInputs; import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow; -import com.facebook.presto.spark.classloader_interface.PrestoSparkNativeTaskInputs; import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage; -import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleReadDescriptor; import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats; import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle; import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs; @@ -75,14 +68,10 @@ import com.facebook.presto.spark.execution.PrestoSparkRowBatch.RowTupleSupplier; import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator.PreDeterminedPartitionFunction; import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory; -import com.facebook.presto.spark.execution.operator.NativeExecutionOperator; -import com.facebook.presto.spark.execution.shuffle.PrestoSparkShuffleInfoTranslator; import com.facebook.presto.spark.util.PrestoSparkStatsCollectionUtils; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.memory.MemoryPoolId; import com.facebook.presto.spi.page.PageDataOutput; -import com.facebook.presto.spi.plan.OutputNode; -import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.security.TokenAuthenticator; import com.facebook.presto.spi.storage.TempDataOperationContext; @@ -91,15 +80,12 @@ import com.facebook.presto.spi.storage.TempStorageHandle; import com.facebook.presto.spiller.NodeSpillConfig; 
import com.facebook.presto.spiller.SpillSpaceTracker; -import com.facebook.presto.split.RemoteSplit; import com.facebook.presto.sql.planner.LocalExecutionPlanner; import com.facebook.presto.sql.planner.LocalExecutionPlanner.LocalExecutionPlan; import com.facebook.presto.sql.planner.OutputPartitioning; import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; -import com.facebook.presto.sql.planner.plan.TableWriterNode; import com.facebook.presto.sql.planner.planPrinter.PlanPrinter; import com.facebook.presto.storage.TempStorageManager; import com.google.common.collect.ImmutableList; @@ -122,7 +108,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalInt; @@ -135,7 +120,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import java.util.zip.CRC32; import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalTotalMemoryLimit; @@ -146,14 +130,12 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxRevocableMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.isHeapDumpOnExceededMemoryLimitEnabled; -import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.execution.TaskState.FAILED; import static 
com.facebook.presto.execution.TaskStatus.STARTING_VERSION; import static com.facebook.presto.execution.buffer.BufferState.FINISHED; import static com.facebook.presto.metadata.MetadataUpdates.DEFAULT_METADATA_UPDATES; -import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMemoryRevokingTarget; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getMemoryRevokingThreshold; import static com.facebook.presto.spark.PrestoSparkSessionProperties.getShuffleOutputTargetAverageRowSize; @@ -166,7 +148,6 @@ import static com.facebook.presto.spark.util.PrestoSparkUtils.toPrestoSparkSerializedPage; import static com.facebook.presto.spi.ErrorCause.UNKNOWN; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; -import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.facebook.presto.util.Failures.toFailures; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -220,11 +201,6 @@ public class PrestoSparkTaskExecutorFactory private final AtomicBoolean memoryRevokePending = new AtomicBoolean(); private final AtomicBoolean memoryRevokeRequestInProgress = new AtomicBoolean(); - private final BlockEncodingSerde blockEncodingSerde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - private final PrestoSparkShuffleInfoTranslator shuffleInfoTranslator; - @Inject public PrestoSparkTaskExecutorFactory( SessionPropertyManager sessionPropertyManager, @@ -248,11 +224,7 @@ public PrestoSparkTaskExecutorFactory( NodeSpillConfig nodeSpillConfig, TempStorageManager tempStorageManager, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, - PrestoSparkConfig prestoSparkConfig, - BlockEncodingSerde blockEncodingSerde, - 
NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + PrestoSparkConfig prestoSparkConfig) { this( sessionPropertyManager, @@ -280,11 +252,7 @@ public PrestoSparkTaskExecutorFactory( requireNonNull(taskManagerConfig, "taskManagerConfig is null").isTaskAllocationTrackingEnabled(), tempStorageManager, requireNonNull(prestoSparkConfig, "prestoSparkConfig is null").getStorageBasedBroadcastJoinStorage(), - prestoSparkBroadcastTableCacheManager, - blockEncodingSerde, - processFactory, - taskFactory, - shuffleInfoTranslator); + prestoSparkBroadcastTableCacheManager); } public PrestoSparkTaskExecutorFactory( @@ -313,11 +281,7 @@ public PrestoSparkTaskExecutorFactory( boolean allocationTrackingEnabled, TempStorageManager tempStorageManager, String storageBasedBroadcastJoinStorage, - PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, - BlockEncodingSerde blockEncodingSerde, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - PrestoSparkShuffleInfoTranslator shuffleInfoTranslator) + PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager) { this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); this.blockEncodingManager = requireNonNull(blockEncodingManager, "blockEncodingManager is null"); @@ -346,10 +310,6 @@ public PrestoSparkTaskExecutorFactory( this.tempStorageManager = requireNonNull(tempStorageManager, "tempStorageManager is null"); this.storageBasedBroadcastJoinStorage = requireNonNull(storageBasedBroadcastJoinStorage, "storageBasedBroadcastJoinStorage is null"); this.prestoSparkBroadcastTableCacheManager = requireNonNull(prestoSparkBroadcastTableCacheManager, "prestoSparkBroadcastTableCacheManager is null"); - this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.processFactory = 
requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - this.shuffleInfoTranslator = requireNonNull(shuffleInfoTranslator, "shuffleInfoFactory is null"); } @Override @@ -379,6 +339,9 @@ public IPrestoSparkTaskExecutor create( } } + @Override + public void close() {} + public IPrestoSparkTaskExecutor doCreate( int partitionId, int attemptNumber, @@ -406,18 +369,7 @@ public IPrestoSparkTaskExecutor doCreate( TaskId taskId = new TaskId(new StageExecutionId(stageId, 0), partitionId, attemptNumber); - // TODO: Remove this once we can display the plan on Spark UI. - // Currently, `textPlanFragment` throws an exception if json-based UDFs are used in the query, which can only - // happen in native execution mode. To resolve this error, `JsonFileBasedFunctionNamespaceManager` must be - // loaded on the executors as well (which is actually not required for native execution). To do so, we need a - // mechanism to ship the JSON file containing the UDF metadata to workers, which does not exist as of today. 
- // TODO: Address this issue; more details in https://github.com/prestodb/presto/issues/19600 - if (isNativeExecutionEnabled(session)) { - log.info("Logging plan fragment is not supported for presto-on-spark native execution, yet"); - } - else { - log.info(PlanPrinter.textPlanFragment(fragment, functionAndTypeManager, session, true)); - } + log.info(PlanPrinter.textPlanFragment(fragment, functionAndTypeManager, session, true)); DataSize maxUserMemory = getQueryMaxMemoryPerNode(session); DataSize maxTotalMemory = getQueryMaxTotalMemoryPerNode(session); @@ -529,24 +481,12 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem List taskSources; Optional shuffleWriteInfo = Optional.empty(); - if (isNativeExecutionEnabled(session) && fragment.getRoot() instanceof NativeExecutionNode) { - checkArgument( - inputs instanceof PrestoSparkNativeTaskInputs, - format("PrestoSparkNativeTaskInputs is required for native execution, but %s is provided", inputs.getClass().getName())); - PrestoSparkNativeTaskInputs nativeInputs = (PrestoSparkNativeTaskInputs) inputs; - fillNativeExecutionTaskInputs(fragment, session, nativeInputs, shuffleReadInfos); - shuffleWriteInfo = needShuffleWriteInfo(nativeInputs, (NativeExecutionNode) fragment.getRoot()) ? 
- Optional.of(shuffleInfoTranslator.createShuffleWriteInfo(session, nativeInputs.getShuffleWriteDescriptor().get())) : Optional.empty(); - taskSources = getNativeExecutionShuffleSources(session, taskId, fragment, shuffleReadInfos.build(), getTaskSources(serializedTaskSources)); - } - else { - checkArgument( - inputs instanceof PrestoSparkJavaExecutionTaskInputs, - format("PrestoSparkJavaExecutionTaskInputs is required for java execution, but %s is provided", inputs.getClass().getName())); - PrestoSparkJavaExecutionTaskInputs taskInputs = (PrestoSparkJavaExecutionTaskInputs) inputs; - fillJavaExecutionTaskInputs(fragment, taskInputs, shuffleInputs, pageInputs, broadcastInputs); - taskSources = getTaskSources(serializedTaskSources); - } + checkArgument( + inputs instanceof PrestoSparkJavaExecutionTaskInputs, + format("PrestoSparkJavaExecutionTaskInputs is required for java execution, but %s is provided", inputs.getClass().getName())); + PrestoSparkJavaExecutionTaskInputs taskInputs = (PrestoSparkJavaExecutionTaskInputs) inputs; + fillJavaExecutionTaskInputs(fragment, taskInputs, shuffleInputs, pageInputs, broadcastInputs); + taskSources = getTaskSources(serializedTaskSources); OutputBufferMemoryManager memoryManager = new OutputBufferMemoryManager( sinkMaxBufferSize.toBytes(), @@ -600,13 +540,7 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem stageId), taskDescriptor.getTableWriteInfo(), true, - ImmutableList.of(new NativeExecutionOperator.NativeExecutionOperatorTranslator( - session, - fragment, - blockEncodingSerde, - processFactory, - taskFactory, - shuffleWriteInfo.map(shuffleInfoTranslator::createSerializedWriteInfo)))); + ImmutableList.of()); taskStateMachine.addStateChangeListener(state -> { if (state.isDone()) { @@ -621,8 +555,7 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem taskExecutor, splitMonitor, notificationExecutor, - memoryUpdateExecutor, - isNativeExecutionEnabled(session) && 
fragment.getRoot() instanceof NativeExecutionNode); + memoryUpdateExecutor); log.info("Task [%s] received %d splits.", taskId, @@ -688,27 +621,6 @@ private static OptionalLong computeAllSplitsSize(List taskSources) return OptionalLong.of(sum); } - private boolean needShuffleWriteInfo(PrestoSparkNativeTaskInputs nativeInputs, NativeExecutionNode node) - { - return nativeInputs.getShuffleWriteDescriptor().isPresent() && !findTableWriteNode(node).isPresent() && !(node.getSubPlan() instanceof OutputNode); - } - - private void fillNativeExecutionTaskInputs( - PlanFragment fragment, - Session session, - PrestoSparkNativeTaskInputs inputs, - ImmutableMap.Builder shuffleReadInfos) - { - for (RemoteSourceNode remoteSource : fragment.getRemoteSourceNodes()) { - for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) { - PrestoSparkShuffleReadDescriptor shuffleReadDescriptor = inputs.getShuffleReadDescriptors().get(sourceFragmentId.toString()); - if (shuffleReadDescriptor != null) { - shuffleReadInfos.put(remoteSource.getId(), shuffleInfoTranslator.createShuffleReadInfo(session, shuffleReadDescriptor)); - } - } - } - } - private void fillJavaExecutionTaskInputs( PlanFragment fragment, PrestoSparkJavaExecutionTaskInputs inputs, @@ -778,48 +690,6 @@ private List getTaskSources(Iterator getNativeExecutionShuffleSources( - Session session, TaskId taskId, PlanFragment fragment, Map shuffleReadInfos, List taskSources) - { - ImmutableSet.Builder result = ImmutableSet.builder(); - PlanNode root = fragment.getRoot(); - AtomicLong nextSplitId = new AtomicLong(); - taskSources.stream() - .flatMap(source -> source.getSplits().stream()) - .mapToLong(split -> split.getSequenceId()) - .max() - .ifPresent(id -> nextSplitId.set(id + 1)); - shuffleReadInfos.forEach((planNodeId, info) -> - result.add(new ScheduledSplit(nextSplitId.getAndIncrement(), planNodeId, new Split(REMOTE_CONNECTOR_ID, new RemoteTransactionHandle(), new RemoteSplit( - new 
Location(format("batch://%s?shuffleInfo=%s", taskId, shuffleInfoTranslator.createSerializedReadInfo(info))), - taskId))))); - - List nativeExecutionSources = taskSources.stream().filter(taskSource -> taskSource.getPlanNodeId().equals(root)).collect(Collectors.toList()); - checkState(nativeExecutionSources.size() <= 1, "At most 1 taskSource is expected for NativeExecutionNode but got %s", nativeExecutionSources.size()); - if (!nativeExecutionSources.isEmpty()) { - // Append the shuffle splits with original splits - TaskSource nativeExecutionSource = nativeExecutionSources.get(0); - result.addAll(nativeExecutionSource.getSplits()); - } - - TaskSource newTaskSource = new TaskSource(root.getId(), result.build(), ImmutableSet.of(Lifespan.taskWide()), true); - ImmutableList.Builder newTaskSources = ImmutableList.builder(); - // Combine the shuffle read taskSource and original sources - newTaskSources.add(newTaskSource) - .addAll(taskSources.stream() - .filter(taskSource -> !taskSource.getPlanNodeId().equals(root)) - .collect(Collectors.toList())); - return newTaskSources.build(); - } - - private Optional findTableWriteNode(PlanNode node) - { - PlanNode root = node instanceof NativeExecutionNode ? 
((NativeExecutionNode) node).getSubPlan() : node; - return searchFrom(root) - .where(TableWriterNode.class::isInstance) - .findFirst(); - } - @SuppressWarnings("unchecked") private static Output configureOutput( Class outputType, diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java deleted file mode 100644 index f0f4fbb8ce2e1..0000000000000 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/operator/NativeExecutionOperator.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.spark.execution.operator; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.Session; -import com.facebook.presto.common.Page; -import com.facebook.presto.common.block.BlockEncodingSerde; -import com.facebook.presto.execution.ExecutionFailureInfo; -import com.facebook.presto.execution.ScheduledSplit; -import com.facebook.presto.execution.TaskInfo; -import com.facebook.presto.execution.TaskSource; -import com.facebook.presto.execution.TaskState; -import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.PagesSerdeFactory; -import com.facebook.presto.execution.scheduler.TableWriteInfo; -import com.facebook.presto.memory.context.LocalMemoryContext; -import com.facebook.presto.operator.DriverContext; -import com.facebook.presto.operator.NativeExecutionInfo; -import com.facebook.presto.operator.OperatorContext; -import com.facebook.presto.operator.OperatorFactory; -import com.facebook.presto.operator.SourceOperator; -import com.facebook.presto.operator.SourceOperatorFactory; -import com.facebook.presto.spark.execution.NativeExecutionProcess; -import com.facebook.presto.spark.execution.NativeExecutionProcessFactory; -import com.facebook.presto.spark.execution.NativeExecutionTask; -import com.facebook.presto.spark.execution.NativeExecutionTaskFactory; -import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.spi.UpdatablePageSource; -import com.facebook.presto.spi.page.PagesSerde; -import com.facebook.presto.spi.page.SerializedPage; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.PlanNodeId; -import com.facebook.presto.sql.planner.LocalExecutionPlanner; -import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; -import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; -import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled; -import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled; -import static com.facebook.presto.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION; -import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; -import static com.facebook.presto.sql.planner.SchedulingOrderVisitor.scheduleOrder; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -/** - * NativeExecutionOperator is responsible for launching the external native process and managing the communication - * between Java process and native process (by using the {@Link NativeExecutionTask}). The NativeExecutionOperator will send necessary meta information - * (e.g, plan fragment, session properties etc.) will be sent to native process and collect the execution results (data, metrics etc) back and propagate out as - * the operator output through the operator's getOutput method. - * The lifecycle of the NativeExecutionOperator is: - * 1. Launch the native engine external process when initializing the operator. - * 2. Serialize and pass the planFragment, tableWriteInfo and taskSource to the external process through {@link NativeExecutionTask} APIs. - * 3. Call {@link NativeExecutionTask}'s pollResult() to retrieve {@link SerializedPage} back from external process. 
- * 4. Deserialize {@link SerializedPage} to {@link Page} and return it back to driver from the getOutput method. - * 5. The close() will be called by the driver when {@link NativeExecutionTask} completes and pollResult() returns an empty result. - * 6. Shut down the external process upon calling of close() method - *

- */ -public class NativeExecutionOperator - implements SourceOperator -{ - private static final Logger log = Logger.get(NativeExecutionOperator.class); - private static final String NATIVE_EXECUTION_SERVER_URI = "http://127.0.0.1"; - - private final PlanNodeId sourceId; - private final OperatorContext operatorContext; - private final LocalMemoryContext systemMemoryContext; - private final PlanFragment planFragment; - private final TableWriteInfo tableWriteInfo; - private final Optional shuffleWriteInfo; - private final PagesSerde serde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - - private NativeExecutionProcess process; - private NativeExecutionTask task; - private CompletableFuture taskStatusFuture; - private List taskSource = new ArrayList<>(); - private Map> splits = new HashMap<>(); - private boolean finished; - - private final AtomicReference info = new AtomicReference<>(null); - - public NativeExecutionOperator( - PlanNodeId sourceId, - OperatorContext operatorContext, - PlanFragment planFragment, - TableWriteInfo tableWriteInfo, - PagesSerde serde, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.sourceId = requireNonNull(sourceId, "sourceId is null"); - this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); - this.systemMemoryContext = operatorContext.localSystemMemoryContext(); - this.planFragment = requireNonNull(planFragment, "planFragment is null"); - this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.serde = requireNonNull(serde, "serde is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - - operatorContext.setInfoSupplier(info::get); 
- } - - @Override - public OperatorContext getOperatorContext() - { - return operatorContext; - } - - @Override - public boolean needsInput() - { - return false; - } - - @Override - public void addInput(Page page) - { - throw new UnsupportedOperationException(); - } - - /** - * The overall workflow of the getOutput method is: - * 1. Submit the plan to the external process - * 2. Call pollResult method to get latest buffered result. - * 3. Call getTaskInfo method to get the TaskInfo and propagate it - * 4. Deserialize the polled {@link SerializedPage} to {@link Page} and return it back - */ - @Override - public Page getOutput() - { - if (finished) { - return null; - } - - if (process == null) { - createProcess(); - checkState(process != null, "process is null"); - createTask(); - checkState(task != null, "task is null"); - TaskInfo taskInfo = task.start(); - if (processTaskInfo(taskInfo)) { - return null; - } - } - - try { - Optional page = task.pollResult(); - if (page.isPresent()) { - return processResult(page.get()); - } - - Optional taskInfo = task.getTaskInfo(); - if (taskInfo.isPresent() && processTaskInfo(taskInfo.get())) { - return null; - } - - return null; - } - catch (InterruptedException | RuntimeException e) { - String error = e.getMessage(); - if (!process.isAlive()) { - error = String.format("Native process has crashed. 
%s", e.getMessage()); - } - log.error(e); - throw new PrestoException(GENERIC_INTERNAL_ERROR, error, e); - } - } - - private boolean processTaskInfo(TaskInfo taskInfo) - { - TaskStatus taskStatus = taskInfo.getTaskStatus(); - if (!taskStatus.getState().isDone()) { - return false; - } - - if (taskStatus.getState() != TaskState.FINISHED) { - RuntimeException failure = taskStatus.getFailures().stream() - .findFirst() - .map(ExecutionFailureInfo::toException) - .orElse(new PrestoException(GENERIC_INTERNAL_ERROR, "Native task failed for an unknown reason")); - throw failure; - } - - info.set(new NativeExecutionInfo(ImmutableList.of(taskInfo.getStats()))); - finished = true; - return true; - } - - private void createProcess() - { - try { - this.process = processFactory.getNativeExecutionProcess( - operatorContext.getSession(), - URI.create(NATIVE_EXECUTION_SERVER_URI)); - log.info("Starting native execution process of task" + getOperatorContext().getDriverContext().getTaskId().toString()); - process.start(); - } - catch (ExecutionException | InterruptedException | IOException e) { - throw new RuntimeException(e); - } - } - - private void createTask() - { - checkState(taskSource != null, "taskSource is null"); - checkState(taskStatusFuture == null, "taskStatusFuture has already been set"); - checkState(task == null, "task has already been set"); - checkState(process != null, "process is null"); - this.task = taskFactory.createNativeExecutionTask( - operatorContext.getSession(), - uriBuilderFrom(URI.create(NATIVE_EXECUTION_SERVER_URI)).port(process.getPort()).build(), - operatorContext.getDriverContext().getTaskId(), - planFragment, - ImmutableList.copyOf(taskSource), - tableWriteInfo, - shuffleWriteInfo); - } - - private Page processResult(SerializedPage page) - { - operatorContext.recordRawInput(page.getSizeInBytes(), page.getPositionCount()); - Page deserializedPage = serde.deserialize(page); - operatorContext.recordProcessedInput(deserializedPage.getSizeInBytes(), 
page.getPositionCount()); - return deserializedPage; - } - - @Override - public void finish() {} - - @Override - public boolean isFinished() - { - return finished; - } - - @Override - public PlanNodeId getSourceId() - { - return sourceId; - } - - @Override - public Supplier> addSplit(ScheduledSplit split) - { - requireNonNull(split, "split is null"); - - if (finished) { - return Optional::empty; - } - splits.computeIfAbsent(split.getPlanNodeId(), key -> new ArrayList<>()).add(split); - - return Optional::empty; - } - - @Override - public void noMoreSplits() - { - // all splits belonging to a single planNodeId should be within a single taskSource - splits.forEach((planNodeId, split) -> taskSource.add(new TaskSource(planNodeId, ImmutableSet.copyOf(split), true))); - - // When joining bucketed table with a non-bucketed table with a filter on "$bucket", - // some tasks may not have splits for the bucketed table. In this case we still need - // to send no-more-splits message to Velox. - Set tableScanIds = Sets.newHashSet(scheduleOrder(planFragment.getRoot())); - tableScanIds.stream() - .filter(id -> !splits.containsKey(id)) - .forEach(id -> taskSource.add(new TaskSource(id, ImmutableSet.of(), true))); - } - - @Override - public void close() - { - systemMemoryContext.setBytes(0); - if (task != null) { - task.stop(); - } - } - - public static class NativeExecutionOperatorFactory - implements SourceOperatorFactory - { - private final int operatorId; - private final PlanNodeId planNodeId; - private final PlanFragment planFragment; - private final TableWriteInfo tableWriteInfo; - private final Optional shuffleWriteInfo; - private final PagesSerdeFactory serdeFactory; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - private boolean closed; - - public NativeExecutionOperatorFactory( - int operatorId, - PlanNodeId planNodeId, - PlanFragment planFragment, - TableWriteInfo tableWriteInfo, - PagesSerdeFactory 
serdeFactory, - NativeExecutionProcessFactory processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.operatorId = operatorId; - this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); - this.planFragment = requireNonNull(planFragment, "planFragment is null"); - this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.serdeFactory = requireNonNull(serdeFactory, "serdeFactory is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - } - - @Override - public PlanNodeId getSourceId() - { - return planNodeId; - } - - @Override - public SourceOperator createOperator(DriverContext driverContext) - { - checkState(!closed, "operator factory is closed"); - OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, NativeExecutionOperator.class.getSimpleName()); - return new NativeExecutionOperator( - planNodeId, - operatorContext, - planFragment, - tableWriteInfo, - serdeFactory.createPagesSerde(), - processFactory, - taskFactory, - shuffleWriteInfo); - } - - @Override - public void noMoreOperators() - { - closed = true; - } - - public PlanFragment getPlanFragment() - { - return planFragment; - } - } - - public static class NativeExecutionOperatorTranslator - extends LocalExecutionPlanner.CustomPlanTranslator - { - private final PlanFragment fragment; - private final Session session; - private final Optional shuffleWriteInfo; - private final BlockEncodingSerde blockEncodingSerde; - private final NativeExecutionProcessFactory processFactory; - private final NativeExecutionTaskFactory taskFactory; - - public NativeExecutionOperatorTranslator( - Session session, - PlanFragment fragment, - BlockEncodingSerde blockEncodingSerde, - NativeExecutionProcessFactory 
processFactory, - NativeExecutionTaskFactory taskFactory, - Optional shuffleWriteInfo) - { - this.fragment = requireNonNull(fragment, "fragment is null"); - this.session = requireNonNull(session, "session is null"); - this.shuffleWriteInfo = requireNonNull(shuffleWriteInfo, "shuffleWriteInfo is null"); - this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); - this.processFactory = requireNonNull(processFactory, "processFactory is null"); - this.taskFactory = requireNonNull(taskFactory, "taskFactory is null"); - } - - @Override - public Optional translate( - PlanNode node, - LocalExecutionPlanner.LocalExecutionPlanContext context, - InternalPlanVisitor visitor) - { - if (node instanceof NativeExecutionNode) { - OperatorFactory operatorFactory = new NativeExecutionOperator.NativeExecutionOperatorFactory( - context.getNextOperatorId(), - node.getId(), - fragment.withSubPlan(((NativeExecutionNode) node).getSubPlan()), - context.getTableWriteInfo(), - new PagesSerdeFactory(blockEncodingSerde, isExchangeCompressionEnabled(session), isExchangeChecksumEnabled(session)), - processFactory, - taskFactory, - shuffleWriteInfo); - return Optional.of( - new LocalExecutionPlanner.PhysicalOperation(operatorFactory, makeLayout(node), context, UNGROUPED_EXECUTION)); - } - return Optional.empty(); - } - } -} diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java index 5598efa067810..2605ce57ef767 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/PrestoSparkRddFactory.java @@ -45,7 +45,6 @@ import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.SplitSourceFactory; -import 
com.facebook.presto.sql.planner.plan.NativeExecutionNode; import com.facebook.presto.sql.planner.plan.PlanFragmentId; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.google.common.collect.ArrayListMultimap; @@ -364,24 +363,7 @@ private ListMultimap createTaskSources return result; } - /** - * If native execution is enabled, the task sources need to be associated with {@link NativeExecutionNode} rather than the original {@link TableScanNode} to allow the - * driver dispatching the task sources to the native process. To achieve that, in this method, we'll store the PlanNodeId from {@link NativeExecutionNode} - * as task source id when we encountered the {@link NativeExecutionNode},otherwise the PlanNodeId from {@link TableScanNode} will be used as the task source id. - */ private static List findTableScanNodes(PlanNode node) - { - return node instanceof NativeExecutionNode ? findTableScanNodesInternal((NativeExecutionNode) node) : findTableScanNodesInternal(node); - } - - private static List findTableScanNodesInternal(NativeExecutionNode node) - { - return searchFrom(node.getSubPlan()) - .where(TableScanNode.class::isInstance) - .findAll().stream().map(t -> new PrestoSparkSource(node.getId(), t)).collect(Collectors.toList()); - } - - private static List findTableScanNodesInternal(PlanNode node) { return searchFrom(node) .where(TableScanNode.class::isInstance) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java index c6904ee262c3b..7b0d549a4e5d2 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkStatsCollectionUtils.java @@ -15,19 +15,12 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.common.RuntimeMetric; -import 
com.facebook.presto.common.RuntimeStats; import com.facebook.presto.common.RuntimeUnit; import com.facebook.presto.execution.TaskInfo; -import com.facebook.presto.operator.NativeExecutionInfo; -import com.facebook.presto.operator.OperatorStats; -import com.facebook.presto.operator.PipelineStats; -import com.facebook.presto.operator.TaskStats; import org.apache.commons.text.CaseUtils; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.util.AccumulatorV2; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.TimeUnit; public class PrestoSparkStatsCollectionUtils @@ -41,38 +34,35 @@ private PrestoSparkStatsCollectionUtils() {} public static void collectMetrics(final TaskInfo taskInfo) { - int taskId = -1; - int stageId = -1; + if (taskInfo == null || taskInfo.getStats() == null) { + return; + } + try { - taskId = taskInfo.getTaskId().getId(); - stageId = taskInfo.getTaskId().getStageExecutionId().getStageId().getId(); - Set runtimeStatsSet = collectRuntimeStats(taskInfo); - collectMetrics(runtimeStatsSet); + taskInfo.getStats().getRuntimeStats().getMetrics() + .forEach(PrestoSparkStatsCollectionUtils::incSparkInternalAccumulator); } catch (Exception e) { - log.error("An error occurred while processing taskId=%s stageId=%s", taskId, stageId, e); + log.warn("An error occurred while updating Spark Internal metrics for task=%s", taskInfo, e); } } - public static void collectMetrics(Set runtimeStatsSet) + static void incSparkInternalAccumulator(final String prestoKey, final RuntimeMetric metric) { - runtimeStatsSet.forEach(runStats -> - { - runStats.getMetrics().entrySet().forEach(entry -> { - String prestoKey = entry.getKey(); - String sparkInternalAccumulatorKey = getSparkInternalAccumulatorKey(prestoKey); - collectMetric(sparkInternalAccumulatorKey, prestoKey, entry.getValue()); - }); - }); - } + TaskMetrics sparkTaskMetrics = org.apache.spark.TaskContext.get().taskMetrics(); + if (sparkTaskMetrics == null) { + return; + } - 
static void collectMetric(final String sparkInternalAccumulatorKey, - final String prestoKey, - final RuntimeMetric metric) - { - boolean isSparkUnitMs = sparkInternalAccumulatorKey.contains("Ms"); - long metricVal = getMetricValue(metric, isSparkUnitMs); - incSparkInternalAccumulator(sparkInternalAccumulatorKey, prestoKey, metricVal); + String sparkInternalAccumulatorName = getSparkInternalAccumulatorKey(prestoKey); + scala.Option accumulatorV2Optional = sparkTaskMetrics.nameToAccums().get(sparkInternalAccumulatorName); + if (accumulatorV2Optional.isEmpty()) { + return; + } + + AccumulatorV2 accumulatorV2 = (AccumulatorV2) accumulatorV2Optional.get(); + accumulatorV2.add( + getMetricLongValue(metric, sparkInternalAccumulatorName.contains("Ms"))); } static String getSparkInternalAccumulatorKey(final String prestoKey) @@ -81,12 +71,13 @@ static String getSparkInternalAccumulatorKey(final String prestoKey) int index = prestoKey.indexOf(PRESTO_NATIVE_OPERATOR_STATS_SEP); return prestoKey.substring(index); } - String[] strs = prestoKey.split("\\."); - if (strs == null || strs.length < 2) { + String[] prestoKeyParts = prestoKey.split("\\."); + int prestoKeyPartsLength = prestoKeyParts.length; + if (prestoKeyPartsLength < 2) { log.debug("Fail to build spark internal key for %s format not supported", prestoKey); return ""; } - String prestoNewKey = String.format("%1$s%2$s", strs[0], strs[strs.length - 1]); + String prestoNewKey = String.format("%1$s%2$s", prestoKeyParts[0], prestoKeyParts[prestoKeyPartsLength - 1]); if (prestoNewKey.contains("_")) { prestoNewKey = CaseUtils.toCamelCase(prestoKey, false, '_'); } @@ -94,35 +85,7 @@ static String getSparkInternalAccumulatorKey(final String prestoKey) PRESTO_NATIVE_OPERATOR_STATS_PREFIX, prestoNewKey); } - static Set collectRuntimeStats(TaskInfo taskInfo) - { - Set stats = new HashSet<>(); - if (taskInfo.getStats() == null) { - return stats; - } - for (PipelineStats pipelineStats : taskInfo.getStats().getPipelines()) { - if 
(pipelineStats != null) { - for (OperatorStats operatorStats : pipelineStats.getOperatorSummaries()) { - if (operatorStats != null) { - if (operatorStats.getOperatorType().equals("NativeExecutionOperator")) { - NativeExecutionInfo nativeExecutionInfo = (NativeExecutionInfo) operatorStats.getInfo(); - if (nativeExecutionInfo != null) { - for (TaskStats taskStats : nativeExecutionInfo.getTaskStats()) { - if (taskStats != null) { - RuntimeStats runtimeStat = taskStats.getRuntimeStats(); - stats.add(runtimeStat); - } - } - } - } - } - } - } - } - return stats; - } - - static long getMetricValue(RuntimeMetric metric, boolean isSparkUnitMs) + static long getMetricLongValue(RuntimeMetric metric, boolean isSparkUnitMs) { long sum = metric.getSum(); if (metric.getUnit().equals(RuntimeUnit.NANO) && isSparkUnitMs) { @@ -130,20 +93,4 @@ static long getMetricValue(RuntimeMetric metric, boolean isSparkUnitMs) } return sum; } - - static void incSparkInternalAccumulator(final String sparkInternalAccuName, final String prestoKey, final Object metric) - { - TaskMetrics tm = org.apache.spark.TaskContext.get().taskMetrics(); - if (tm != null) { - scala.Option acc2 = tm.nameToAccums().get(sparkInternalAccuName); - if (!acc2.isEmpty()) { - AccumulatorV2 acc = (AccumulatorV2) acc2.get(); - acc.add(metric); - } - else { - log.debug("Fail to find spark internal accumulator matching key:" + - " %s prestoKey = %s ", sparkInternalAccuName, prestoKey); - } - } - } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java index 4c57dc43146fe..c97cb6a060025 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java @@ -696,6 +696,8 @@ public void close() } if (instanceId != null) { + 
instances.get(instanceId).getPrestoSparkService().getTaskExecutorFactory().close(); + instances.get(instanceId).getPrestoSparkService().getNativeTaskExecutorFactory().close(); instances.remove(instanceId); } } @@ -715,6 +717,12 @@ public IPrestoSparkTaskExecutorFactory get() { return instances.get(instanceId).getPrestoSparkService().getTaskExecutorFactory(); } + + @Override + public IPrestoSparkTaskExecutorFactory getNative() + { + return instances.get(instanceId).getPrestoSparkService().getNativeTaskExecutorFactory(); + } } private static Database createDatabaseMetastoreObject(String name) diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java deleted file mode 100644 index 96f61cc72fd86..0000000000000 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeExecutionPlanRewrite.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.spark; - -import com.facebook.presto.Session; -import com.facebook.presto.cost.PlanNodeStatsEstimate; -import com.facebook.presto.cost.StatsAndCosts; -import com.facebook.presto.metadata.Metadata; -import com.facebook.presto.metadata.MetadataManager; -import com.facebook.presto.spi.plan.PlanNode; -import com.facebook.presto.spi.plan.TableScanNode; -import com.facebook.presto.sql.planner.Plan; -import com.facebook.presto.sql.planner.PlanFragment; -import com.facebook.presto.sql.planner.SubPlan; -import com.facebook.presto.sql.planner.TypeProvider; -import com.facebook.presto.sql.planner.assertions.PlanAssert; -import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; -import com.facebook.presto.sql.planner.plan.NativeExecutionNode; -import com.facebook.presto.testing.QueryRunner; -import com.facebook.presto.tests.AbstractTestQueryFramework; -import org.testng.annotations.Test; - -import java.util.List; - -import static com.facebook.presto.SystemSessionProperties.NATIVE_EXECUTION_ENABLED; -import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; -import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -public class TestPrestoSparkNativeExecutionPlanRewrite - extends AbstractTestQueryFramework -{ - private static final Metadata METADATA = MetadataManager.createTestMetadataManager(); - - private void assertPlanMatch(Session session, PlanNode actual, PlanMatchPattern expected) - { - PlanAssert.assertPlan( - session, - METADATA, - (node, sourceStats, lookup, session2, types) -> PlanNodeStatsEstimate.unknown(), - new Plan(actual, TypeProvider.empty(), StatsAndCosts.empty()), - expected); - } - - private void assertPlanNotMatch(Session session, PlanNode actual, PlanMatchPattern expected) - { - PlanAssert.assertPlanDoesNotMatch( - session, - 
METADATA, - (node, sourceStats, lookup, session2, types) -> PlanNodeStatsEstimate.unknown(), - new Plan(actual, TypeProvider.empty(), StatsAndCosts.empty()), - expected); - } - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - return PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner(); - } - - @Test - public void testSingleStagePlanFragment() - { - Session session = Session.builder(getSession()) - .setSystemProperty(NATIVE_EXECUTION_ENABLED, "true") - .build(); - - SubPlan subPlan = subplan( - "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment FROM orders_bucketed", - session); - PlanFragment fragment = subPlan.getFragment(); - PlanNode root = fragment.getRoot(); - - assertEquals(1, fragment.getTableScanSchedulingOrder().size()); - assertTrue(fragment.getTableScanSchedulingOrder().contains(root.getId())); - assertEquals(1, subPlan.getAllFragments().size()); - assertTrue(root instanceof NativeExecutionNode); - assertPlanMatch(session, ((NativeExecutionNode) root).getSubPlan(), anyTree(node(TableScanNode.class))); - } - - @Test - public void testMultiStagePlanFragmentsWithCoordinatorOnlyFragment() - { - Session session = Session.builder(getSession()) - .setSystemProperty(NATIVE_EXECUTION_ENABLED, "true") - .setSystemProperty("table_writer_merge_operator_enabled", "false") - .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false") - .build(); - - SubPlan subPlan = subplan("CREATE TABLE test_table_1 as SELECT orderkey, custkey FROM orders ", session); - List fragmentList = subPlan.getAllFragments(); - - assertEquals(2, fragmentList.size()); - - PlanFragment fragment1 = fragmentList.get(0); - assertTrue(fragment1.getPartitioning().isCoordinatorOnly()); - assertPlanNotMatch(session, fragment1.getRoot(), anyTree(node(NativeExecutionNode.class))); - - PlanFragment fragment2 = fragmentList.get(1); - PlanNode root = fragment2.getRoot(); - 
assertFalse(fragment2.getPartitioning().isCoordinatorOnly()); - assertEquals(1, fragment2.getTableScanSchedulingOrder().size()); - assertTrue(fragment2.getTableScanSchedulingOrder().contains(root.getId())); - assertTrue(root instanceof NativeExecutionNode); - assertPlanMatch(session, ((NativeExecutionNode) root).getSubPlan(), anyTree(node(TableScanNode.class))); - } -} diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java index 1c0d942f2e70e..1c394d451a6b4 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/task/TestPrestoSparkTaskExecution.java @@ -113,23 +113,17 @@ public void tearDown() taskNotificationExecutor.shutdown(); } - @Test - public void testNativeDriverInstanceCount() - { - testDriverCount(nativeTestSession, true, 1); - } - @Test public void testJavaDriverInstanceCount() { - testDriverCount(nonNativeTestSession, false, 3); + testDriverCount(nonNativeTestSession, 3); } - private void testDriverCount(Session session, boolean isNative, int expectedDriverCount) + private void testDriverCount(Session session, int expectedDriverCount) { TaskContext taskContext = TestingTaskContext.createTaskContext(taskNotificationExecutor, scheduledExecutor, session, new DataSize(2, GIGABYTE)); taskExecutor.start(); - PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, TaskTestUtils.createTestSplitMonitor(), taskNotificationExecutor, scheduledExecutor, isNative); + PrestoSparkTaskExecution taskExecution = new PrestoSparkTaskExecution(taskStateMachine, taskContext, localExecutionPlan, taskExecutor, TaskTestUtils.createTestSplitMonitor(), taskNotificationExecutor, scheduledExecutor); 
taskExecution.start(ImmutableList.of(new TaskSource(TABLE_SCAN_NODE_ID, splits, true))); assertEquals(taskContext.getPipelineContexts().get(0).getPipelineStats().getDrivers().size(), expectedDriverCount); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java index 57dc25eb02232..d64e3da7fd759 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkService.java @@ -20,5 +20,7 @@ public interface IPrestoSparkService IPrestoSparkTaskExecutorFactory getTaskExecutorFactory(); + IPrestoSparkTaskExecutorFactory getNativeTaskExecutorFactory(); + void close(); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java index 7b67f86fd76e1..20aff04ff7b32 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/IPrestoSparkTaskExecutorFactory.java @@ -27,4 +27,6 @@ IPrestoSparkTaskExecutor create( CollectionAccumulator taskInfoCollector, CollectionAccumulator shuffleStatsCollector, Class outputType); + + public void close(); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java 
b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java index a6e0c7bdfa2c0..387eced2901e2 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskExecutorFactoryProvider.java @@ -19,4 +19,6 @@ public interface PrestoSparkTaskExecutorFactoryProvider extends Serializable { IPrestoSparkTaskExecutorFactory get(); + + IPrestoSparkTaskExecutorFactory getNative(); } diff --git a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java index 6a4979a6b2604..288f9e291a373 100644 --- a/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java +++ b/presto-spark-classloader-interface/src/main/java/com/facebook/presto/spark/classloader_interface/PrestoSparkTaskProcessor.java @@ -79,7 +79,7 @@ public Iterator> process( Map shuffleReadDescriptors, Optional shuffleWriteDescriptor) { - return taskExecutorFactoryProvider.get().create( + return taskExecutorFactoryProvider.getNative().create( TaskContext.get().partitionId(), TaskContext.get().attemptNumber(), serializedTaskDescriptor, diff --git a/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java b/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java index 1802e5704a0f7..71f2fb3d72f0e 100644 --- a/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java +++ b/presto-spark-launcher/src/main/java/com/facebook/presto/spark/launcher/PrestoSparkRunner.java @@ 
-297,6 +297,14 @@ public IPrestoSparkTaskExecutorFactory get() return prestoSparkService.getTaskExecutorFactory(); } + @Override + public IPrestoSparkTaskExecutorFactory getNative() + { + checkState(TaskContext.get() != null, "this method is expected to be called only from the main task thread on the spark executor"); + IPrestoSparkService prestoSparkService = getOrCreatePrestoSparkService(); + return prestoSparkService.getNativeTaskExecutorFactory(); + } + private static IPrestoSparkService service; private static String currentPackagePath; private static Map currentConfigProperties;