diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 1df812d1aa809..89915d254883d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
@@ -50,7 +51,7 @@ import org.apache.spark.util.Utils
 class QueryExecution(
     val sparkSession: SparkSession,
     val logical: LogicalPlan,
-    val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
+    val tracker: QueryPlanningTracker = new QueryPlanningTracker) extends Logging {
 
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
@@ -133,26 +134,42 @@ class QueryExecution(
     tracker.measurePhase(phase)(block)
   }
 
-  def simpleString: String = simpleString(false)
-
-  def simpleString(formatted: Boolean): String = withRedaction {
+  def simpleString: String = {
     val concat = new PlanStringConcat()
-    concat.append("== Physical Plan ==\n")
+    simpleString(false, SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def simpleString(
+      formatted: Boolean,
+      maxFields: Int,
+      append: String => Unit): Unit = {
+    append("== Physical Plan ==\n")
     if (formatted) {
       try {
-        ExplainUtils.processPlan(executedPlan, concat.append)
+        ExplainUtils.processPlan(executedPlan, append)
       } catch {
-        case e: AnalysisException => concat.append(e.toString)
-        case e: IllegalArgumentException => concat.append(e.toString)
+        case e: AnalysisException => append(e.toString)
+        case e: IllegalArgumentException => append(e.toString)
       }
     } else {
-      QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+      QueryPlan.append(executedPlan,
+        append, verbose = false, addSuffix = false, maxFields = maxFields)
     }
-    concat.append("\n")
-    concat.toString
+    append("\n")
   }
 
   def explainString(mode: ExplainMode): String = {
+    val concat = new PlanStringConcat()
+    explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {
     val queryExecution = if (logical.isStreaming) {
       // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
       // output mode does not matter since there is no `Sink`.
@@ -165,19 +182,19 @@ class QueryExecution(
 
     mode match {
       case SimpleMode =>
-        queryExecution.simpleString
+        queryExecution.simpleString(false, maxFields, append)
       case ExtendedMode =>
-        queryExecution.toString
+        queryExecution.toString(maxFields, append)
       case CodegenMode =>
         try {
-          org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
+          org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)
        } catch {
-          case e: AnalysisException => e.toString
+          case e: AnalysisException => append(e.toString)
        }
      case CostMode =>
-        queryExecution.stringWithStats
+        queryExecution.stringWithStats(maxFields, append)
      case FormattedMode =>
-        queryExecution.simpleString(formatted = true)
+        queryExecution.simpleString(formatted = true, maxFields = maxFields, append)
    }
  }
 
@@ -204,27 +221,39 @@ class QueryExecution(
 
   override def toString: String = withRedaction {
     val concat = new PlanStringConcat()
-    writePlans(concat.append, SQLConf.get.maxToStringFields)
-    concat.toString
+    toString(SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def toString(maxFields: Int, append: String => Unit): Unit = {
+    writePlans(append, maxFields)
   }
 
-  def stringWithStats: String = withRedaction {
+  def stringWithStats: String = {
     val concat = new PlanStringConcat()
-    val maxFields = SQLConf.get.maxToStringFields
+    stringWithStats(SQLConf.get.maxToStringFields, concat.append)
+    withRedaction {
+      concat.toString
+    }
+  }
+
+  private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {
+    val maxFields = SQLConf.get.maxToStringFields
     // trigger to compute stats for logical plans
     try {
       optimizedPlan.stats
     } catch {
-      case e: AnalysisException => concat.append(e.toString + "\n")
+      case e: AnalysisException => append(e.toString + "\n")
     }
     // only show optimized logical plan and physical plan
-    concat.append("== Optimized Logical Plan ==\n")
-    QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
-    concat.append("\n== Physical Plan ==\n")
-    QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
-    concat.append("\n")
-    concat.toString
+    append("== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)
+    append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)
+    append("\n")
   }
 
@@ -261,19 +290,26 @@ class QueryExecution(
     /**
      * Dumps debug information about query execution into the specified file.
      *
+     * @param path path of the file the debug info is written to.
      * @param maxFields maximum number of fields converted to string representation.
+     * @param explainMode the explain mode to be used to generate the string
+     *                    representation of the plan.
      */
-    def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
+    def toFile(
+        path: String,
+        maxFields: Int = Int.MaxValue,
+        explainMode: Option[String] = None): Unit = {
       val filePath = new Path(path)
       val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-      val append = (s: String) => {
-        writer.write(s)
-      }
       try {
-        writePlans(append, maxFields)
-        writer.write("\n== Whole Stage Codegen ==\n")
-        org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
+        val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)
+        explainString(mode, maxFields, writer.write)
+        if (mode != CodegenMode) {
+          writer.write("\n== Whole Stage Codegen ==\n")
+          org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
+        }
+        log.info(s"Debug information was written at: $filePath")
       } finally {
         writer.close()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index eca39f3f81726..5c35cedba9bab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -53,6 +53,7 @@ class QueryExecutionSuite extends SharedSparkSession {
       s"*(1) Range (0, $expected, step=1, splits=2)",
       ""))
   }
+
   test("dumping query execution info to a file") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath + "/plans.txt"
@@ -93,6 +94,25 @@ class QueryExecutionSuite extends SharedSparkSession {
     assert(exception.getMessage.contains("Illegal character in scheme name"))
   }
 
+  test("dumping query execution info to a file - explainMode=formatted") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
+      assert(Source.fromFile(path).getLines.toList
+        .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
+        "== Physical Plan ==",
+        s"* Range (1)",
+        "",
+        "",
+        s"(1) Range [codegen id : 1]",
+        "Output [1]: [id#xL]",
+        s"Arguments: Range (0, 10, step=1, splits=Some(2))",
+        "",
+        ""))
+    }
+  }
+
   test("limit number of fields by sql config") {
     def relationPlans: String = {
       val ds = spark.createDataset(Seq(QueryExecutionTestRecord(
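
For reference, a minimal usage sketch of the new parameter once this patch is applied, mirroring the test above; the output path here is illustrative and not part of the patch:

    // Dump the plan of a simple query in FORMATTED explain mode.
    // ExplainMode.fromString accepts "simple", "extended", "codegen", "cost"
    // and "formatted"; leaving explainMode unset falls back to ExtendedMode,
    // which preserves the previous toFile output.
    val df = spark.range(0, 10)
    df.queryExecution.debug.toFile("/tmp/plans.txt", explainMode = Option("formatted"))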