sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

@@ -22,6 +22,7 @@ import java.util.UUID
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
@@ -50,7 +51,7 @@ import org.apache.spark.util.Utils
 class QueryExecution(
     val sparkSession: SparkSession,
     val logical: LogicalPlan,
-    val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
+    val tracker: QueryPlanningTracker = new QueryPlanningTracker) extends Logging {
 
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
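
Note: the new `extends Logging` mixin is what makes the `log.info(...)` call added in `toFile` (further down) compile. A minimal sketch of what the trait provides; `PlanDumper` is a hypothetical name, and since `Logging` is a Spark-internal trait this would only compile inside Spark's own `org.apache.spark` packages:

import org.apache.spark.internal.Logging

// Hypothetical class: `extends Logging` supplies a lazily created slf4j
// `log` named after the class, plus helpers such as logInfo/logWarning.
class PlanDumper extends Logging {
  def dump(path: String): Unit = {
    log.info(s"Debug information was written at: $path")
  }
}
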
@@ -133,26 +134,42 @@ class QueryExecution(
     tracker.measurePhase(phase)(block)
   }

-  def simpleString: String = simpleString(false)
-
-  def simpleString(formatted: Boolean): String = withRedaction {
+  def simpleString: String = {
     val concat = new PlanStringConcat()
-    concat.append("== Physical Plan ==\n")
+    simpleString(false, SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def simpleString(
+      formatted: Boolean,
+      maxFields: Int,
+      append: String => Unit): Unit = {
+    append("== Physical Plan ==\n")
     if (formatted) {
       try {
-        ExplainUtils.processPlan(executedPlan, concat.append)
+        ExplainUtils.processPlan(executedPlan, append)
       } catch {
-        case e: AnalysisException => concat.append(e.toString)
-        case e: IllegalArgumentException => concat.append(e.toString)
+        case e: AnalysisException => append(e.toString)
+        case e: IllegalArgumentException => append(e.toString)
       }
     } else {
-      QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+      QueryPlan.append(executedPlan,
+        append, verbose = false, addSuffix = false, maxFields = maxFields)
     }
-    concat.append("\n")
-    concat.toString
+    append("\n")
   }

   def explainString(mode: ExplainMode): String = {
+    val concat = new PlanStringConcat()
+    explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {
     val queryExecution = if (logical.isStreaming) {
       // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
       // output mode does not matter since there is no `Sink`.
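
Note: the refactoring pattern used here for `simpleString` and `explainString` (and for `toString` and `stringWithStats` below) is the same throughout: a public method materializes and redacts a `String`, delegating to a private overload that streams fragments through an `append: String => Unit` callback. A self-contained sketch of that shape, with hypothetical names and the redaction step omitted:

// All names hypothetical. The public method builds the full String once;
// the private overload never concatenates, it just pushes fragments to a
// sink, so the same logic can also stream straight to a file writer.
class PlanReport {
  def render: String = {
    val sb = new StringBuilder
    render(s => sb.append(s))
    sb.toString
  }

  private def render(append: String => Unit): Unit = {
    append("== Physical Plan ==\n")
    append("...plan nodes...\n")
  }
}

This is what lets `toFile` below pass `writer.write` in directly, instead of first building the whole plan string in memory.
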
@@ -165,19 +182,19 @@ class QueryExecution(

     mode match {
       case SimpleMode =>
-        queryExecution.simpleString
+        queryExecution.simpleString(false, maxFields, append)
       case ExtendedMode =>
-        queryExecution.toString
+        queryExecution.toString(maxFields, append)
       case CodegenMode =>
         try {
-          org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
+          org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)
         } catch {
-          case e: AnalysisException => e.toString
+          case e: AnalysisException => append(e.toString)
         }
       case CostMode =>
-        queryExecution.stringWithStats
+        queryExecution.stringWithStats(maxFields, append)
       case FormattedMode =>
-        queryExecution.simpleString(formatted = true)
+        queryExecution.simpleString(formatted = true, maxFields = maxFields, append)
     }
   }

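Note: for reference, each branch of this match corresponds to a mode name accepted by `ExplainMode.fromString`. A caller-side sketch, assuming the Spark 3.x `Dataset.explain(mode: String)` API:

// Assumes a running SparkSession `spark` (e.g. in spark-shell).
val df = spark.range(10).selectExpr("id * 2 AS doubled")
df.explain("simple")     // SimpleMode: physical plan only
df.explain("extended")   // ExtendedMode: parsed, analyzed, optimized and physical plans
df.explain("codegen")    // CodegenMode: generated Java code, where codegen is available
df.explain("cost")       // CostMode: optimized logical plan with statistics
df.explain("formatted")  // FormattedMode: node overview plus a details section
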
@@ -204,27 +221,39 @@ class QueryExecution(

   override def toString: String = withRedaction {
     val concat = new PlanStringConcat()
-    writePlans(concat.append, SQLConf.get.maxToStringFields)
-    concat.toString
+    toString(SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
   }

+  private def toString(maxFields: Int, append: String => Unit): Unit = {
+    writePlans(append, maxFields)
+  }
+
-  def stringWithStats: String = withRedaction {
+  def stringWithStats: String = {
     val concat = new PlanStringConcat()
+    stringWithStats(SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {
-    val maxFields = SQLConf.get.maxToStringFields
-
     // trigger to compute stats for logical plans
     try {
       optimizedPlan.stats
     } catch {
-      case e: AnalysisException => concat.append(e.toString + "\n")
+      case e: AnalysisException => append(e.toString + "\n")
     }
     // only show optimized logical plan and physical plan
-    concat.append("== Optimized Logical Plan ==\n")
-    QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
-    concat.append("\n== Physical Plan ==\n")
-    QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
-    concat.append("\n")
-    concat.toString
+    append("== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)
+    append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)
+    append("\n")
   }

   /**
@@ -261,19 +290,26 @@
     /**
      * Dumps debug information about query execution into the specified file.
      *
    [inline review comment, Member] Could you describe path too.
+     * @param path path of the file the debug info is written to.
      * @param maxFields maximum number of fields converted to string representation.
+     * @param explainMode the explain mode to be used to generate the string
+     *                    representation of the plan.
      */
-    def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
+    def toFile(
+        path: String,
+        maxFields: Int = Int.MaxValue,
+        explainMode: Option[String] = None): Unit = {
       val filePath = new Path(path)
       val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-      val append = (s: String) => {
-        writer.write(s)
-      }
       try {
-        writePlans(append, maxFields)
-        writer.write("\n== Whole Stage Codegen ==\n")
-        org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
+        val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)
+        explainString(mode, maxFields, writer.write)
+        if (mode != CodegenMode) {
+          writer.write("\n== Whole Stage Codegen ==\n")
+          org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
+        }
+        log.info(s"Debug information was written at: $filePath")
       } finally {
         writer.close()
       }
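Note: combined with the new test below, a call-site sketch of the extended `toFile`; the output path is hypothetical:

val df = spark.range(0, 10)

// Default: ExtendedMode text followed by the whole-stage codegen section.
df.queryExecution.debug.toFile("/tmp/plans.txt")

// Any mode name understood by ExplainMode.fromString can be passed.
df.queryExecution.debug.toFile("/tmp/plans.txt", explainMode = Some("formatted"))

// For "codegen" the extra codegen section is skipped, since the body
// already consists of the generated code (mode == CodegenMode above).
df.queryExecution.debug.toFile("/tmp/plans.txt", explainMode = Some("codegen"))
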
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

@@ -53,6 +53,7 @@ class QueryExecutionSuite extends SharedSparkSession {
s"*(1) Range (0, $expected, step=1, splits=2)",
""))
}

test("dumping query execution info to a file") {
withTempDir { dir =>
val path = dir.getCanonicalPath + "/plans.txt"
@@ -93,6 +94,25 @@ class QueryExecutionSuite extends SharedSparkSession {
     assert(exception.getMessage.contains("Illegal character in scheme name"))
   }

+  test("dumping query execution info to a file - explainMode=formatted") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
+      assert(Source.fromFile(path).getLines.toList
+        .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
+        "== Physical Plan ==",
+        s"* Range (1)",
+        "",
+        "",
+        s"(1) Range [codegen id : 1]",
+        "Output [1]: [id#xL]",
+        s"Arguments: Range (0, 10, step=1, splits=Some(2))",
+        "",
+        ""))
+    }
+  }
+
   test("limit number of fields by sql config") {
     def relationPlans: String = {
       val ds = spark.createDataset(Seq(QueryExecutionTestRecord(
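
Note: on the `replaceAll("#\\d+", "#x")` in the assertions above: expression IDs in plan output are fresh per run, so the tests mask them before comparing against the expected lines. A standalone illustration:

// "id#42L" varies across runs; masking yields the stable token "id#xL".
val line = "Output [1]: [id#42L]"
assert(line.replaceAll("#\\d+", "#x") == "Output [1]: [id#xL]")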