QueryPlanningTracker.scala (new file)
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import scala.collection.JavaConverters._

import org.apache.spark.util.BoundedPriorityQueue


/**
* A simple utility for tracking runtime and associated stats in query planning.
*
* There are two separate concepts we track:
*
* 1. Phases: These are broad-scope phases in query planning, as listed below, i.e. analysis,
* optimization and physical planning (just planning).
*
* 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
* track the number of invocations and effective invocations.
*/
object QueryPlanningTracker {

// Define a list of common phases here.
val PARSING = "parsing"
[Review comment on the phase constants] Why not enum?

[Author reply] Mostly because Scala enum is not great, and I was thinking about making this a generic thing that's extensible.

val ANALYSIS = "analysis"
val OPTIMIZATION = "optimization"
val PLANNING = "planning"

class RuleSummary(
var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) {

def this() = this(totalTimeNs = 0, numInvocations = 0, numEffectiveInvocations = 0)

override def toString: String = {
s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)"
}
}

/**
* A thread local variable to implicitly pass the tracker around. This assumes the query planner
* is single-threaded, and avoids passing the same tracker context in every function call.
*/
private val localTracker = new ThreadLocal[QueryPlanningTracker]() {
override def initialValue: QueryPlanningTracker = null
}

/** Returns the current tracker in scope, based on the thread local variable. */
def get: Option[QueryPlanningTracker] = Option(localTracker.get())

/** Sets the current tracker for the execution of function f. We assume f is single-threaded. */
def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = {
val originalTracker = localTracker.get()
localTracker.set(tracker)
try f finally { localTracker.set(originalTracker) }
}
}


class QueryPlanningTracker {

import QueryPlanningTracker._

// Mapping from the name of a rule to a rule's summary.
// Use a Java HashMap for less overhead.
private val rulesMap = new java.util.HashMap[String, RuleSummary]

// Mapping from a phase name to the total time spent in that phase, in ns.
private val phaseToTimeNs = new java.util.HashMap[String, Long]

/** Measure the runtime of function f, and add it to the time for the specified phase. */
def measureTime[T](phase: String)(f: => T): T = {
val startTime = System.nanoTime()
val ret = f
val timeTaken = System.nanoTime() - startTime
phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken)
ret
}

/**
* Record a specific invocation of a rule.
*
* @param rule name of the rule
* @param timeNs time taken to run this invocation
* @param effective whether the invocation has resulted in a plan change
*/
def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
var s = rulesMap.get(rule)
if (s eq null) {
s = new RuleSummary
rulesMap.put(rule, s)
}

s.totalTimeNs += timeNs
s.numInvocations += 1
s.numEffectiveInvocations += (if (effective) 1 else 0)
}

// ------------ reporting functions below ------------

def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap

def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap

/** Returns the top k most expensive rules (as measured by time). */
def topRulesByTime(k: Int): Seq[(String, RuleSummary)] = {
val orderingByTime: Ordering[(String, RuleSummary)] = Ordering.by(e => e._2.totalTimeNs)
val q = new BoundedPriorityQueue(k)(orderingByTime)
rulesMap.asScala.foreach(q.+=)
q.toSeq.sortBy(r => -r._2.totalTimeNs)
}

}
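
The object and class above are meant to be used together: wrap each planning phase in measureTime, make the tracker visible to rule executors through withTracker, and read the aggregates back afterwards. The sketch below is illustrative only and not part of the diff; TrackerUsageSketch and the rule name "ExampleRule" are made-up placeholders:

import org.apache.spark.sql.catalyst.QueryPlanningTracker

// Hypothetical driver, for illustration only.
object TrackerUsageSketch {
  def main(args: Array[String]): Unit = {
    val tracker = new QueryPlanningTracker

    // Time an "analysis"-like phase; the elapsed nanos accumulate under the phase name.
    val result = tracker.measureTime(QueryPlanningTracker.ANALYSIS) {
      // Inside withTracker, rule executors find the tracker via the thread local and
      // call recordRuleInvocation; here we call it directly to keep the sketch small.
      QueryPlanningTracker.withTracker(tracker) {
        tracker.recordRuleInvocation("ExampleRule", timeNs = 1000L, effective = true)
        42
      }
    }

    println(s"result = $result")                      // 42
    println(s"phase times (ns): ${tracker.phases}")   // Map(analysis -> ...)
    println(s"top rule: ${tracker.topRulesByTime(1)}")
  }
}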
Analyzer.scala
@@ -102,16 +102,18 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}

def executeAndCheck(plan: LogicalPlan): LogicalPlan = AnalysisHelper.markInAnalyzer {
val analyzed = execute(plan)
try {
checkAnalysis(analyzed)
analyzed
} catch {
case e: AnalysisException =>
val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
try {
checkAnalysis(analyzed)
analyzed
} catch {
case e: AnalysisException =>
val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
}
}
}

RuleExecutor.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.sideBySide
@@ -66,6 +67,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
*/
protected def isPlanIntegral(plan: TreeType): Boolean = true

/**
* Executes the batches of rules defined by the subclass, and also tracks timing info for each
* rule using the provided tracker.
* @see [[execute]]
*/
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
QueryPlanningTracker.withTracker(tracker) {
execute(plan)
}
}

/**
* Executes the batches of rules defined by the subclass. The batches are executed serially
* using the defined execution strategy. Within each batch, rules are also executed serially.
@@ -74,6 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get

batches.foreach { batch =>
val batchStartPlan = curPlan
@@ -88,15 +101,19 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
val startTime = System.nanoTime()
val result = rule(plan)
val runTime = System.nanoTime() - startTime
val effective = !result.fastEquals(plan)

if (!result.fastEquals(plan)) {
if (effective) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
planChangeLogger.log(rule.ruleName, plan, result)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)

// Record timing information using QueryPlanningTracker
tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
[Review comment] Doesn't this make the query-local and the global metrics inconsistent when tracker is None?

[Author reply] yes! (not great -- but I'd probably remove the global tracker at some point)

[Review comment] removing the global tracker would be great!
// Run the structural integrity checker against the plan after each rule.
if (!isPlanIntegral(result)) {
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
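
To see the new executeAndTrack hook end to end, one can drive any RuleExecutor subclass with a tracker and then inspect the per-rule summaries. This is a hedged sketch, not code from the PR; it assumes spark-catalyst on the classpath, and ExecuteAndTrackSketch, NoopExecutor and NoopRule are names invented for this example:

import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

object ExecuteAndTrackSketch {

  // A trivial executor with a single rule that never changes the plan, so its
  // invocation is recorded as non-effective.
  object NoopExecutor extends RuleExecutor[LogicalPlan] {
    object NoopRule extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan
    }
    override protected def batches: Seq[Batch] = Batch("noop", Once, NoopRule) :: Nil
  }

  def main(args: Array[String]): Unit = {
    val tracker = new QueryPlanningTracker
    tracker.measureTime(QueryPlanningTracker.OPTIMIZATION) {
      NoopExecutor.executeAndTrack(OneRowRelation(), tracker)
    }
    // Expect one invocation of NoopRule, zero effective invocations, plus the phase time.
    tracker.rules.foreach { case (name, summary) => println(s"$name -> $summary") }
    println(tracker.phases)
  }
}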
QueryPlanningTrackerSuite.scala (new file)
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import org.apache.spark.SparkFunSuite

class QueryPlanningTrackerSuite extends SparkFunSuite {

test("phases") {
val t = new QueryPlanningTracker
t.measureTime("p1") {
Thread.sleep(1)
}

assert(t.phases("p1") > 0)
assert(!t.phases.contains("p2"))

val old = t.phases("p1")

t.measureTime("p1") {
Thread.sleep(1)
}
assert(t.phases("p1") > old)
}

test("rules") {
val t = new QueryPlanningTracker
t.recordRuleInvocation("r1", 1, effective = false)
t.recordRuleInvocation("r2", 2, effective = true)
t.recordRuleInvocation("r3", 1, effective = false)
t.recordRuleInvocation("r3", 2, effective = true)

val rules = t.rules

assert(rules("r1").totalTimeNs == 1)
assert(rules("r1").numInvocations == 1)
assert(rules("r1").numEffectiveInvocations == 0)

assert(rules("r2").totalTimeNs == 2)
assert(rules("r2").numInvocations == 1)
assert(rules("r2").numEffectiveInvocations == 1)

assert(rules("r3").totalTimeNs == 3)
assert(rules("r3").numInvocations == 2)
assert(rules("r3").numEffectiveInvocations == 1)
}

test("topRulesByTime") {
val t = new QueryPlanningTracker
t.recordRuleInvocation("r2", 2, effective = true)
t.recordRuleInvocation("r4", 4, effective = true)
t.recordRuleInvocation("r1", 1, effective = false)
t.recordRuleInvocation("r3", 3, effective = false)

val top = t.topRulesByTime(2)
assert(top.size == 2)
assert(top(0)._1 == "r4")
assert(top(1)._1 == "r3")

// Don't crash when k > total size
assert(t.topRulesByTime(10).size == 4)
}
}
AnalysisTest.scala
@@ -21,6 +21,7 @@ import java.net.URI
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,7 +55,7 @@ trait AnalysisTest extends PlanTest {
expectedPlan: LogicalPlan,
caseSensitive: Boolean = true): Unit = {
val analyzer = getAnalyzer(caseSensitive)
val actualPlan = analyzer.executeAndCheck(inputPlan)
val actualPlan = analyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
comparePlans(actualPlan, expectedPlan)
}

ResolveGroupingAnalyticsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import java.util.TimeZone

import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -109,7 +110,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest {
Seq(UnresolvedAlias(Multiply(unresolved_a, Literal(2))),
unresolved_b, UnresolvedAlias(count(unresolved_c))))

val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2)
val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2, new QueryPlanningTracker)
val gExpressions = resultPlan.asInstanceOf[Aggregate].groupingExpressions
assert(gExpressions.size == 3)
val firstGroupingExprAttrName =
ResolvedUuidExpressionsSuite.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
@@ -34,6 +35,7 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest {
private lazy val uuid3 = Uuid().as('_uuid3)
private lazy val uuid1Ref = uuid1.toAttribute

private val tracker = new QueryPlanningTracker
private val analyzer = getAnalyzer(caseSensitive = true)

private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
@@ -47,7 +49,7 @@

test("analyzed plan sets random seed for Uuid expression") {
val plan = r.select(a, uuid1)
val resolvedPlan = analyzer.executeAndCheck(plan)
val resolvedPlan = analyzer.executeAndCheck(plan, tracker)
getUuidExpressions(resolvedPlan).foreach { u =>
assert(u.resolved)
assert(u.randomSeed.isDefined)
@@ -56,14 +58,14 @@

test("Uuid expressions should have different random seeds") {
val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
val resolvedPlan = analyzer.executeAndCheck(plan)
val resolvedPlan = analyzer.executeAndCheck(plan, tracker)
assert(getUuidExpressions(resolvedPlan).map(_.randomSeed.get).distinct.length == 3)
}

test("Different analyzed plans should have different random seeds in Uuids") {
val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
val resolvedPlan1 = analyzer.executeAndCheck(plan)
val resolvedPlan2 = analyzer.executeAndCheck(plan)
val resolvedPlan1 = analyzer.executeAndCheck(plan, tracker)
val resolvedPlan2 = analyzer.executeAndCheck(plan, tracker)
val uuids1 = getUuidExpressions(resolvedPlan1)
val uuids2 = getUuidExpressions(resolvedPlan2)
assert(uuids1.distinct.length == 3)
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -32,6 +32,7 @@ import org.apache.spark.api.java.function._
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.encoders._
@@ -76,6 +77,14 @@ private[sql] object Dataset {
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}

/** A variant of ofRows that allows passing in a tracker so we can track query parsing time. */
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
: DataFrame = {
val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
}

/**
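
The new ofRows overload exists so a caller can time parsing with the same tracker that later records analysis. Below is a hedged sketch of how a call site might thread the tracker through; it is not the PR's actual call site, SqlWithTracking is a hypothetical helper, and because ofRows and sessionState are package-private the code would have to live inside the org.apache.spark.sql package:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.QueryPlanningTracker

// Hypothetical helper, for illustration only.
object SqlWithTracking {
  def sql(spark: SparkSession, sqlText: String): DataFrame = {
    val tracker = new QueryPlanningTracker
    // Account the parse under the "parsing" phase so it shows up in tracker.phases.
    val plan = tracker.measureTime(QueryPlanningTracker.PARSING) {
      spark.sessionState.sqlParser.parsePlan(sqlText)
    }
    // Hand the same tracker to the query execution so analysis time is tracked too.
    Dataset.ofRows(spark, plan, tracker)
  }
}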