Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.spark.sql.catalyst.trees

import java.io.Writer
import java.util.UUID

import scala.collection.Map
import scala.reflect.ClassTag

import org.apache.commons.io.output.StringBuilderWriter
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
Expand Down Expand Up @@ -469,7 +471,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// Convenience overload: renders this tree by calling treeString(verbose = true).
def treeString: String = treeString(verbose = true)

// Renders this tree into an in-memory StringBuilderWriter and returns the accumulated text.
// NOTE(review): the generateTreeString(..., new StringBuilder, ...) line below looks like the
// removed side of the diff this text was extracted from -- confirm before relying on it.
def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString
val writer = new StringBuilderWriter()
try {
// Delegate to the Writer-based overload, then read back what it wrote.
treeString(writer, verbose, addSuffix)
writer.toString
} finally {
// Always close the writer, even if rendering throws.
writer.close()
}
}

def treeString(
writer: Writer,
verbose: Boolean,
addSuffix: Boolean): Unit = {
generateTreeString(0, Nil, writer, verbose, "", addSuffix)
Copy link
Member

@wangyum wangyum Nov 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding another function that only saves the nodeName? We could use it here: #22879

Copy link
Member

@HyukjinKwon HyukjinKwon Nov 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If #22879 is merged first, we should add that function here. If this one is merged first, that PR should add the function there instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to avoid overcomplicating the PR again, frankly speaking.

}

/**
Expand Down Expand Up @@ -521,7 +536,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// Default: a node has no inner children. generateTreeString prints whatever this returns
// two levels deeper than the node itself, before the regular children.
protected def innerChildren: Seq[TreeNode[_]] = Seq.empty

/**
* Appends the string representation of this node and its children to the given StringBuilder.
* Appends the string representation of this node and its children to the given Writer.
*
* The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
* depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and
Expand All @@ -532,44 +547,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
builder: StringBuilder,
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
addSuffix: Boolean = false): Unit = {

// NOTE(review): the `builder` lines in this body appear to be the removed side of the diff and
// the `writer` lines the added side -- confirm against the real file.

// For non-root nodes, emit one guide segment per ancestor flag (all but the last entry of
// lastChildren), then the connector for this node chosen by the last flag.
if (depth > 0) {
lastChildren.init.foreach { isLast =>
builder.append(if (isLast) " " else ": ")
writer.write(if (isLast) " " else ": ")
}
builder.append(if (lastChildren.last) "+- " else ":- ")
writer.write(if (lastChildren.last) "+- " else ":- ")
}

// Pick the textual form of this node: verbose (optionally with suffix) or simple.
val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
// One output line per node: prefix, then the node text, then a newline.
builder.append(prefix)
builder.append(str)
builder.append("\n")
writer.write(prefix)
writer.write(str)
writer.write("\n")

// Inner children are printed first, at depth + 2. The extra `children.isEmpty` flag is the
// guide for the intermediate level; the final flag marks only the last inner child as last.
// `prefix` is not forwarded here, so inner children use its default value.
if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose,
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
addSuffix = addSuffix))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose,
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
addSuffix = addSuffix)
}

// Regular children are printed at depth + 1 with the same prefix; only the final child is
// flagged as last.
if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix))
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix)
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
}

builder
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.spark.sql.execution

import java.io.{BufferedWriter, OutputStreamWriter, Writer}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import org.apache.commons.io.output.StringBuilderWriter
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -189,23 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
""".stripMargin.trim
}

/**
 * Runs `f` against `writer`. If `f` throws an [[AnalysisException]], the exception's string
 * form is written to the same writer instead of being propagated; any other exception escapes.
 */
private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
  try {
    f(writer)
  } catch {
    case analysisError: AnalysisException =>
      writer.write(analysisError.toString)
  }
}

/**
 * Writes the parsed, analyzed, optimized and physical plans to `writer`, each under its own
 * `== ... ==` header. Every tree is rendered through `writeOrError`, so an AnalysisException
 * raised while producing a section is written in place of that section.
 */
private def writePlans(writer: Writer): Unit = {
  val verbose = true
  val addSuffix = false

  writer.write("== Parsed Logical Plan ==\n")
  writeOrError(writer)(out => logical.treeString(out, verbose, addSuffix))

  writer.write("\n== Analyzed Logical Plan ==\n")
  // The analyzed section starts with a "name: type" summary of the output attributes.
  val outputSummary = stringOrError(
    Utils.truncatedString(
      analyzed.output.map(attr => s"${attr.name}: ${attr.dataType.simpleString}"),
      ", "))
  writer.write(outputSummary)
  writer.write("\n")
  // Each plan is dereferenced inside the function literal so that it is evaluated under
  // writeOrError's try block.
  writeOrError(writer)(out => analyzed.treeString(out, verbose, addSuffix))

  writer.write("\n== Optimized Logical Plan ==\n")
  writeOrError(writer)(out => optimizedPlan.treeString(out, verbose, addSuffix))

  writer.write("\n== Physical Plan ==\n")
  writeOrError(writer)(out => executedPlan.treeString(out, verbose, addSuffix))
}

// Renders the parsed, analyzed, optimized and physical plans as one string, inside
// withRedaction.
// NOTE(review): the `output` / `analyzedPlan` / s"""...""" part below looks like the removed side
// of the diff this text was extracted from, and the StringBuilderWriter part the added side --
// confirm against the real file.
override def toString: String = withRedaction {
def output = Utils.truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
).filter(_.nonEmpty).mkString("\n")

s"""== Parsed Logical Plan ==
|${stringOrError(logical.treeString(verbose = true))}
|== Analyzed Logical Plan ==
|$analyzedPlan
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan.treeString(verbose = true))}
|== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = true))}
""".stripMargin.trim
// Collect the output of writePlans in an in-memory writer and return it as the string.
val writer = new StringBuilderWriter()
try {
writePlans(writer)
writer.toString
} finally {
// Always close the writer, even if writePlans throws.
writer.close()
}
}

def stringWithStats: String = withRedaction {
Expand Down Expand Up @@ -250,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// Forwards to the package-level debug helper for the executed plan. The helper is referenced by
// its fully qualified name.
def codegenToSeq(): Seq[(String, String)] =
  org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)

/**
 * Dumps debug information about query execution into the specified file.
 *
 * The file contains the output of `writePlans` followed by a `== Whole Stage Codegen ==`
 * section produced by `debug.writeCodegen` for the executed plan. The text is always encoded
 * as UTF-8, independent of the JVM's default charset.
 *
 * @param path location of the dump file; it is resolved through the Hadoop `FileSystem` that
 *             matches the path, using a fresh Hadoop configuration from the session state.
 */
def toFile(path: String): Unit = {
  val filePath = new Path(path)
  val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
  // Pin the encoding: without an explicit charset OutputStreamWriter falls back to the platform
  // default, which makes the dump differ between machines for non-ASCII plan text.
  val writer = new BufferedWriter(
    new OutputStreamWriter(fs.create(filePath), StandardCharsets.UTF_8))

  try {
    writePlans(writer)
    writer.write("\n== Whole Stage Codegen ==\n")
    org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
  } finally {
    // Closing the BufferedWriter flushes it and closes the underlying stream.
    writer.close()
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Locale
import java.util.function.Supplier

Expand Down Expand Up @@ -450,11 +451,11 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
// Writes nothing for this node itself: the call is forwarded to the child at the same depth
// and with the same lastChildren, with an empty prefix and addSuffix = false.
// NOTE(review): the `builder` / StringBuilder lines appear to be the removed side of the diff and
// the `writer` / Unit lines the added side -- confirm against the real file.
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
builder: StringBuilder,
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
child.generateTreeString(depth, lastChildren, builder, verbose, "")
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
}

override def needCopyResult: Boolean = false
Expand Down Expand Up @@ -726,11 +727,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
// Writes nothing for this node itself: the call is forwarded to the child at the same depth,
// passing "*(<codegenStageId>) " as the prefix and addSuffix = false.
// NOTE(review): the `builder` / StringBuilder lines appear to be the removed side of the diff and
// the `writer` / Unit lines the added side -- confirm against the real file.
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
builder: StringBuilder,
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ")
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
}

override def needStopCheck: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.commons.io.output.StringBuilderWriter

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
Expand All @@ -30,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

Expand Down Expand Up @@ -70,15 +72,25 @@ package object debug {
* @return single String containing all WholeStageCodegen subtrees and corresponding codegen
*/
def codegenString(plan: SparkPlan): String = {
  // Collect everything writeCodegen emits in an in-memory writer and hand it back as a String.
  val buffer = new StringBuilderWriter()
  try {
    writeCodegen(buffer, plan)
    buffer.toString
  } finally {
    buffer.close()
  }
}

// Writes a report of the WholeStageCodegen subtrees of `plan` to `writer`: a count line, then
// for each (subtree, code) pair a numbered header, the subtree text and its generated code.
// NOTE(review): the `output` accumulator lines appear to be the removed side of the diff and the
// writer.write lines the added side -- confirm against the real file.
def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
val codegenSeq = codegenStringSeq(plan)
var output = s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n"
writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
// Subtrees are numbered from 1 in the headers.
for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
output += s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n"
output += subtree
output += "\nGenerated code:\n"
output += s"${code}\n"
writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
writer.write(subtree)
writer.write("\nGenerated code:\n")
writer.write(s"${code}\n")
}
output
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,70 @@
*/
package org.apache.spark.sql.execution

import scala.io.Source

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.test.SharedSQLContext

class QueryExecutionSuite extends SharedSQLContext {
/**
 * Asserts that the plan sections of a dump file match the plans of `spark.range(0, expected)`.
 *
 * Only the lines before the `== Whole Stage Codegen ==` header are compared; the generated
 * code that follows is ignored.
 *
 * @param path     location of the dump file on the local file system
 * @param expected upper bound of the range whose plans are expected in the file
 */
def checkDumpedPlans(path: String, expected: Int): Unit = {
  val source = Source.fromFile(path)
  // Read the lines eagerly and close the source in all cases, so the file handle is not leaked.
  val dumpedPlans = try {
    source.getLines.toList.takeWhile(_ != "== Whole Stage Codegen ==")
  } finally {
    source.close()
  }

  assert(dumpedPlans == List(
    "== Parsed Logical Plan ==",
    s"Range (0, $expected, step=1, splits=Some(2))",
    "",
    "== Analyzed Logical Plan ==",
    "id: bigint",
    s"Range (0, $expected, step=1, splits=Some(2))",
    "",
    "== Optimized Logical Plan ==",
    s"Range (0, $expected, step=1, splits=Some(2))",
    "",
    "== Physical Plan ==",
    s"*(1) Range (0, $expected, step=1, splits=2)",
    ""))
}
// Dumps the plans of a 10-row range into a new file and checks the plan sections.
test("dumping query execution info to a file") {
  withTempDir { tempDir =>
    val dumpPath = s"${tempDir.getCanonicalPath}/plans.txt"
    spark.range(0, 10).queryExecution.debug.toFile(dumpPath)

    checkDumpedPlans(dumpPath, expected = 10)
  }
}

// Dumps twice to the same path; the file must then hold only the plans of the second query.
test("dumping query execution info to an existing file") {
  withTempDir { tempDir =>
    val dumpPath = s"${tempDir.getCanonicalPath}/plans.txt"

    val firstQuery = spark.range(0, 10)
    firstQuery.queryExecution.debug.toFile(dumpPath)

    val secondQuery = spark.range(0, 1)
    secondQuery.queryExecution.debug.toFile(dumpPath)

    checkDumpedPlans(dumpPath, expected = 1)
  }
}

// The target file lives in a sub-folder that does not exist before the dump is written.
test("dumping query execution info to non-existing folder") {
  withTempDir { tempDir =>
    val dumpPath = s"${tempDir.getCanonicalPath}/newfolder/plans.txt"
    spark.range(0, 100).queryExecution.debug.toFile(dumpPath)

    checkDumpedPlans(dumpPath, expected = 100)
  }
}

// A path with an illegal scheme must be rejected with an IllegalArgumentException.
test("dumping query execution info by invalid path") {
  val invalidPath = "1234567890://plans.txt"
  val thrown = intercept[IllegalArgumentException] {
    val query = spark.range(0, 100)
    query.queryExecution.debug.toFile(invalidPath)
  }

  assert(thrown.getMessage.contains("Illegal character in scheme name"))
}

test("toString() exception/error handling") {
spark.experimental.extraStrategies = Seq(
new SparkStrategy {
Expand Down