# [SPARK-26129][SQL] Instrumentation for per-query planning time #23096
`QueryPlanningTracker.scala` (new file, 109 lines):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import scala.collection.JavaConverters._

import org.apache.spark.util.BoundedPriorityQueue


/**
 * A simple utility for tracking runtime and associated stats in query planning.
 *
 * There are two separate concepts we track:
 *
 * 1. Phases: These are broad scope phases in query planning, as listed below, i.e. analysis,
 *    optimization and physical planning (just planning).
 *
 * 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
 *    track the number of invocations and effective invocations.
 */
object QueryPlanningTracker {

  // Define a list of common phases here.
  val PARSING = "parsing"
  val ANALYSIS = "analysis"
  val OPTIMIZATION = "optimization"
  val PLANNING = "planning"

  class RuleSummary(
      var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) {

    def this() = this(totalTimeNs = 0, numInvocations = 0, numEffectiveInvocations = 0)

    override def toString: String = {
      s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)"
    }
  }
}


class QueryPlanningTracker {

  import QueryPlanningTracker._

  // Mapping from the name of a rule to a rule's summary.
  // Use a Java HashMap for less overhead.
  private val rulesMap = new java.util.HashMap[String, RuleSummary]

  // From a phase to time in ns.
  private val phaseToTimeNs = new java.util.HashMap[String, Long]

  /** Measure the runtime of function f, and add it to the time for the specified phase. */
  def measureTime[T](phase: String)(f: => T): T = {
    val startTime = System.nanoTime()
    val ret = f
    val timeTaken = System.nanoTime() - startTime
    phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken)
    ret
  }

  /**
   * Record a specific invocation of a rule.
   *
   * @param rule name of the rule
   * @param timeNs time taken to run this invocation
   * @param effective whether the invocation has resulted in a plan change
   */
  def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
    var s = rulesMap.get(rule)
    if (s eq null) {
      s = new RuleSummary
      rulesMap.put(rule, s)
    }

    s.totalTimeNs += timeNs
    s.numInvocations += 1
    s.numEffectiveInvocations += (if (effective) 1 else 0)
  }

  // ------------ reporting functions below ------------

  def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap

  def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap

  /** Returns the top k most expensive rules (as measured by time). */
  def topRulesByTime(k: Int): Seq[(String, RuleSummary)] = {
    val orderingByTime: Ordering[(String, RuleSummary)] = Ordering.by(e => e._2.totalTimeNs)
    val q = new BoundedPriorityQueue(k)(orderingByTime)
    rulesMap.asScala.foreach(q.+=)
    q.toSeq.sortBy(r => -r._2.totalTimeNs)
  }

}
```
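
A minimal usage sketch of the tracker API above (the rule name and printed values are illustrative, not real output):

```scala
import org.apache.spark.sql.catalyst.QueryPlanningTracker

val tracker = new QueryPlanningTracker

// Accumulate wall-clock time under a phase name; repeated calls to the
// same phase add up rather than overwrite.
val parsed = tracker.measureTime(QueryPlanningTracker.PARSING) {
  "SELECT 1" // stand-in for actual parsing work
}

// The rule executor calls this once per rule invocation (see RuleExecutor below).
tracker.recordRuleInvocation("ResolveReferences", timeNs = 1000L, effective = true)

println(tracker.phases)            // e.g. Map(parsing -> 152340)
println(tracker.topRulesByTime(1)) // the single most expensive rule so far
```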
`Analyzer.scala`:

```diff
@@ -102,29 +102,34 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }

-  def executeAndCheck(plan: LogicalPlan): LogicalPlan = AnalysisHelper.markInAnalyzer {
-    val analyzed = execute(plan)
-    try {
-      checkAnalysis(analyzed)
-      analyzed
-    } catch {
-      case e: AnalysisException =>
-        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
-        ae.setStackTrace(e.getStackTrace)
-        throw ae
+  def executeAndCheck(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = {
+    AnalysisHelper.markInAnalyzer {
+      val analyzed = execute(plan, tracker)
+      try {
+        checkAnalysis(analyzed)
+        analyzed
+      } catch {
+        case e: AnalysisException =>
+          val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
+          ae.setStackTrace(e.getStackTrace)
+          throw ae
+      }
     }
   }

-  override def execute(plan: LogicalPlan): LogicalPlan = {
+  override def execute(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = {
     AnalysisContext.reset()
     try {
-      executeSameContext(plan)
+      executeSameContext(plan, tracker)
     } finally {
       AnalysisContext.reset()
     }
   }

-  private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
+  private def executeSameContext(
+      plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = {
+    super.execute(plan, tracker)
+  }

   def resolver: Resolver = conf.resolver

@@ -211,7 +216,7 @@ class Analyzer(
       case With(child, relations) =>
         substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
           case (resolved, (name, relation)) =>
-            resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
+            resolved :+ name -> executeSameContext(substituteCTE(relation, resolved), None)
         })
       case other => other
     }

@@ -696,7 +701,7 @@ class Analyzer(
             s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
             "around this.")
         }
-        executeSameContext(child)
+        executeSameContext(child, None)
       }
       view.copy(child = newChild)
     case p @ SubqueryAlias(_, view: View) =>

@@ -1405,7 +1410,7 @@ class Analyzer(
       do {
         // Try to resolve the subquery plan using the regular analyzer.
         previous = current
-        current = executeSameContext(current)
+        current = executeSameContext(current, None)

         // Use the outer references to resolve the subquery plan if it isn't resolved yet.
         val i = plans.iterator

@@ -1527,7 +1532,7 @@ class Analyzer(
             grouping,
             Alias(cond, "havingCondition")() :: Nil,
             child)
-          val resolvedOperator = executeSameContext(aggregatedCondition)
+          val resolvedOperator = executeSameContext(aggregatedCondition, None)
           def resolvedAggregateFilter =
             resolvedOperator
               .asInstanceOf[Aggregate]

@@ -1588,7 +1593,7 @@ class Analyzer(
           unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
         val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
         val resolvedAggregate: Aggregate =
-          executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
+          executeSameContext(aggregatedOrdering, None).asInstanceOf[Aggregate]
         val resolvedAliasedOrdering: Seq[Alias] =
           resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
```
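The diff does not show the call sites that construct a tracker, so the wiring below is an assumption: a sketch of how a caller (e.g. QueryExecution) might combine `measureTime` with the new `executeAndCheck` signature. The names `analyzer` and `parsedPlan` are assumed to exist in scope.

```scala
// Hypothetical call site, not part of this diff.
val tracker = new QueryPlanningTracker
val analyzed = tracker.measureTime(QueryPlanningTracker.ANALYSIS) {
  analyzer.executeAndCheck(parsedPlan, Some(tracker))
}
// Internal recursive calls (CTE substitution, view and subquery resolution)
// pass None, so only the top-level pass is attributed to this tracker.
```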
`RuleExecutor.scala`:

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.rules

 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide

@@ -66,11 +67,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    */
   protected def isPlanIntegral(plan: TreeType): Boolean = true

+  /** A special overload (as opposed to default parameter values) to allow execute in a closure. */
+  def execute(plan: TreeType): TreeType = execute(plan, None)
+
   /**
    * Executes the batches of rules defined by the subclass. The batches are executed serially
    * using the defined execution strategy. Within each batch, rules are also executed serially.
    */
-  def execute(plan: TreeType): TreeType = {
+  def execute(plan: TreeType, tracker: Option[QueryPlanningTracker]): TreeType = {
     var curPlan = plan
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
     val planChangeLogger = new PlanChangeLogger()

@@ -88,15 +92,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             val startTime = System.nanoTime()
             val result = rule(plan)
             val runTime = System.nanoTime() - startTime
+            val effective = !result.fastEquals(plan)

-            if (!result.fastEquals(plan)) {
+            if (effective) {
               queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
               queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
               planChangeLogger.log(rule.ruleName, plan, result)
             }
             queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
             queryExecutionMetrics.incNumExecution(rule.ruleName)

+            tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
```
An inline review thread is attached to the `tracker.foreach` line:

> **Reviewer:** Doesn't this make the query-local and the global metrics inconsistent when tracker is None?
>
> **Author:** yes! (not great -- but I'd probably remove the global tracker at some point)
>
> **Reviewer:** removing the global tracker would be great!
The diff continues past the comment thread:

```diff
+
             // Run the structural integrity checker against the plan after each rule.
             if (!isPlanIntegral(result)) {
               val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
```
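To see why the no-tracker variant is a separate overload rather than a default parameter value, a self-contained sketch with stand-in types (`MiniExecutor` and `Tracker` are hypothetical, not Spark code): a method with a default argument does not eta-expand to a function value unless the argument is supplied, while the single-argument overload does.

```scala
class Tracker

// Hypothetical stand-in for RuleExecutor, to illustrate the overload pattern.
abstract class MiniExecutor[T] {
  // The "special overload": same name, no tracker parameter.
  def execute(plan: T): T = execute(plan, None)
  def execute(plan: T, tracker: Option[Tracker]): T
}

object OverloadDemo extends App {
  val upper = new MiniExecutor[String] {
    def execute(plan: String, tracker: Option[Tracker]): String = plan.toUpperCase
  }
  // The one-argument overload can be used directly where a function value is
  // expected, which a default-parameter signature would not allow.
  val f: String => String = upper.execute
  println(f("select 1")) // SELECT 1
}
```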
`QueryPlanningTrackerSuite.scala` (new file, 78 lines):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import org.apache.spark.SparkFunSuite

class QueryPlanningTrackerSuite extends SparkFunSuite {

  test("phases") {
    val t = new QueryPlanningTracker
    t.measureTime("p1") {
      Thread.sleep(1)
    }

    assert(t.phases("p1") > 0)
    assert(!t.phases.contains("p2"))

    val old = t.phases("p1")

    t.measureTime("p1") {
      Thread.sleep(1)
    }
    assert(t.phases("p1") > old)
  }

  test("rules") {
    val t = new QueryPlanningTracker
    t.recordRuleInvocation("r1", 1, effective = false)
    t.recordRuleInvocation("r2", 2, effective = true)
    t.recordRuleInvocation("r3", 1, effective = false)
    t.recordRuleInvocation("r3", 2, effective = true)

    val rules = t.rules

    assert(rules("r1").totalTimeNs == 1)
    assert(rules("r1").numInvocations == 1)
    assert(rules("r1").numEffectiveInvocations == 0)

    assert(rules("r2").totalTimeNs == 2)
    assert(rules("r2").numInvocations == 1)
    assert(rules("r2").numEffectiveInvocations == 1)

    assert(rules("r3").totalTimeNs == 3)
    assert(rules("r3").numInvocations == 2)
    assert(rules("r3").numEffectiveInvocations == 1)
  }

  test("topRulesByTime") {
    val t = new QueryPlanningTracker
    t.recordRuleInvocation("r2", 2, effective = true)
    t.recordRuleInvocation("r4", 4, effective = true)
    t.recordRuleInvocation("r1", 1, effective = false)
    t.recordRuleInvocation("r3", 3, effective = false)

    val top = t.topRulesByTime(2)
    assert(top.size == 2)
    assert(top(0)._1 == "r4")
    assert(top(1)._1 == "r3")

    // Don't crash when k > total size
    assert(t.topRulesByTime(10).size == 4)
  }
}
```
A review thread on the phase-name string constants in QueryPlanningTracker:

> **Reviewer:** Why not enum?
>
> **Author:** Mostly because Scala enum is not great, and I was thinking about making this a generic thing that's extensible.
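
For context on that exchange, a sketch of the sealed-trait alternative being rejected (illustrative only, not part of the PR):

```scala
// A sealed-trait "enum" would give exhaustiveness checking, but it closes the
// set of phases to this file, which is what the string-keyed design avoids.
sealed trait Phase { def name: String }
case object Parsing extends Phase { val name = "parsing" }
case object Analysis extends Phase { val name = "analysis" }
case object Optimization extends Phase { val name = "optimization" }
case object Planning extends Phase { val name = "planning" }

// With plain string keys, code outside this file can track custom phases
// (e.g. "adaptive-replanning") without modifying a sealed hierarchy.
```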