@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans

+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
Nil
}
}

/**
* Converts the query plan to a string and appends it via the provided function.
*/
def append[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
try {
plan.treeString(append, verbose, addSuffix, maxFields)
} catch {
case e: AnalysisException => append(e.toString)
}
}
}
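
The new helper routes a plan's rendering through any `String => Unit` sink and keeps `AnalysisException` rendering inline instead of throwing. A minimal usage sketch; the placeholder plan below is illustrative, not part of this diff:

```scala
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

// Placeholder: any resolved plan in scope would do here.
def somePlan: LogicalPlan = ???

val concat = new StringConcat()
// If rendering fails analysis, the exception text is appended instead of thrown.
QueryPlan.append(somePlan, concat.append, verbose = true, addSuffix = false)
println(concat.toString)  // one allocation for the final string
```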
@@ -17,13 +17,11 @@

package org.apache.spark.sql.catalyst.trees

-import java.io.Writer
import java.util.UUID

import scala.collection.Map
import scala.reflect.ClassTag

-import org.apache.commons.io.output.StringBuilderWriter
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
-val writer = new StringBuilderWriter()
-try {
-treeString(writer, verbose, addSuffix, maxFields)
-writer.toString
-} finally {
-writer.close()
-}
+val concat = new StringConcat()
+
+treeString(concat.append, verbose, addSuffix, maxFields)
+concat.toString
}

def treeString(
-writer: Writer,
+append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int): Unit = {
-generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
+generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
}
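
Since the sink is now just a function, callers that still want a `java.io.Writer` can pass its `write` method directly: `Writer.write(String)` eta-expands to `String => Unit`. A sketch under that assumption; the plan and file path are placeholders:

```scala
import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def plan: LogicalPlan = ???  // placeholder for a real plan

val out = new BufferedWriter(new FileWriter("/tmp/plan.txt"))  // illustrative path
try {
  // Overload resolution picks Writer.write(String) for the expected String => Unit.
  plan.treeString(out.write, verbose = true, addSuffix = false, maxFields = 25)
} finally {
  out.close()
}
```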

/**
@@ -558,42 +554,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
-writer: Writer,
+append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
-writer.write(if (isLast) "   " else ":  ")
+append(if (isLast) "   " else ":  ")
}
-writer.write(if (lastChildren.last) "+- " else ":- ")
+append(if (lastChildren.last) "+- " else ":- ")
}

val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else {
simpleString(maxFields)
}
-writer.write(prefix)
-writer.write(str)
-writer.write("\n")
+append(prefix)
+append(str)
+append("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
-depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
addSuffix = addSuffix, maxFields = maxFields))
innerChildren.last.generateTreeString(
-depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
addSuffix = addSuffix, maxFields = maxFields)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
-depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
+depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
children.last.generateTreeString(
-depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
+depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
}
}
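
For intuition, `lastChildren` is what turns the depth-first walk into the familiar explain tree: each ancestor that is a last child contributes blank padding, each one that is not contributes a `:` rail. A self-contained toy version of just the prefix logic, with made-up node names:

```scala
def renderLine(depth: Int, lastChildren: Seq[Boolean], node: String): String = {
  val sb = new StringBuilder
  if (depth > 0) {
    lastChildren.init.foreach(isLast => sb.append(if (isLast) "   " else ":  "))
    sb.append(if (lastChildren.last) "+- " else ":- ")
  }
  sb.append(node).toString
}

Seq(
  renderLine(0, Nil, "Project [a]"),
  renderLine(1, Seq(true), "Join Inner"),
  renderLine(2, Seq(true, false), "Scan t1"),
  renderLine(2, Seq(true, true), "Scan t2")
).foreach(println)
// Project [a]
// +- Join Inner
//    :- Scan t1
//    +- Scan t2
```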

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

import java.util.regex.{Pattern, PatternSyntaxException}

+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.unsafe.types.UTF8String

@@ -87,4 +89,34 @@ object StringUtils {
}
funcNames.toSeq
}

/**
* Concatenates a sequence of strings into a single string, with a cheap append
* method and a single memory allocation for the final string.
*/

[Review thread on the class name]
Contributor: I am wondering if StringConcatenation is a better name for this class, as this class is technically a rope (that is a binary tree).
Member (author): StringRope looks nicer ;-)
Contributor: ... and is wrong :P...
Member (author): StringConcat?
class StringConcat {
private val strings = new ArrayBuffer[String]
private var length: Int = 0

/**
* Appends a string and accumulates its length, so that toString can allocate
* a single buffer large enough for all appended strings.
*/
def append(s: String): Unit = {
if (s != null) {
strings.append(s)
length += s.length
}
}

/**
* Allocates a single buffer large enough for all appended strings, writes them
* into it, and returns the concatenated string.
*/
override def toString: String = {
val result = new java.lang.StringBuilder(length)
strings.foreach(result.append)
result.toString
}
}
}
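
The class trades a little bookkeeping for exactly one buffer allocation: `append` only records the string and the running length, and `toString` presizes a `java.lang.StringBuilder` so the final concatenation never grows. A small usage sketch; the plan line is illustrative:

```scala
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

val concat = new StringConcat()
concat.append("== Physical Plan ==\n")
concat.append(null)  // ignored: append is null-safe
concat.append("*(1) Range (0, 10, step=1, splits=8)")  // illustrative plan line
// Only here is a buffer sized to the accumulated length allocated.
val rendered = concat.toString
```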
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
assert(filterPattern(names, " d* ") === Nil)
}

test("string concatenation") {
def concat(seq: String*): String = {
seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
}

assert(new StringConcat().toString == "")
assert(concat("") == "")
assert(concat(null) == "")
assert(concat("a") == "a")
assert(concat("1", "2") == "12")
assert(concat("abc", "\n", "123") == "abc\n123")
}
}
@@ -17,20 +17,21 @@

package org.apache.spark.sql.execution

-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
+import java.io.{BufferedWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

-import org.apache.commons.io.output.StringBuilderWriter
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -108,10 +109,6 @@ class QueryExecution(
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

-protected def stringOrError[A](f: => A): String =
-try f.toString catch { case e: AnalysisException => e.toString }


/**
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
@@ -197,55 +194,53 @@ class QueryExecution(
}

def simpleString: String = withRedaction {
s"""== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = false))}
""".stripMargin.trim
}

private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
try f(writer)
catch {
case e: AnalysisException => writer.write(e.toString)
}
val concat = new StringConcat()
concat.append("== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
concat.append("\n")
concat.toString
}
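
The rebuilt `simpleString` keeps the familiar explain layout. Roughly, assuming a `SparkSession` named `spark` (output illustrative):

```scala
println(spark.range(10).queryExecution.simpleString)
// == Physical Plan ==
// *(1) Range (0, 10, step=1, splits=8)
```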

-private def writePlans(writer: Writer, maxFields: Int): Unit = {
+private def writePlans(append: String => Unit, maxFields: Int): Unit = {
val (verbose, addSuffix) = (true, false)

writer.write("== Parsed Logical Plan ==\n")
writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = stringOrError(truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
writer.write(analyzedOutput)
writer.write("\n")
writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Optimized Logical Plan ==\n")
writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Physical Plan ==\n")
writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
append("== Parsed Logical Plan ==\n")
QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
append("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = try {
truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
} catch {
case e: AnalysisException => e.toString
}
append(analyzedOutput)
append("\n")
QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
append("\n== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
}

override def toString: String = withRedaction {
-val writer = new StringBuilderWriter()
-try {
-writePlans(writer, SQLConf.get.maxToStringFields)
-writer.toString
-} finally {
-writer.close()
-}
+val concat = new StringConcat()
+writePlans(concat.append, SQLConf.get.maxToStringFields)
+concat.toString
}

def stringWithStats: String = withRedaction {
+val concat = new StringConcat()
+val maxFields = SQLConf.get.maxToStringFields

// trigger to compute stats for logical plans
optimizedPlan.stats

// only show optimized logical plan and physical plan
s"""== Optimized Logical Plan ==
|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
|== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = true))}
""".stripMargin.trim
concat.append("== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
concat.append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
concat.append("\n")
concat.toString
}

/**
@@ -282,17 +277,17 @@ class QueryExecution(
/**
* Dumps debug information about query execution into the specified file.
*
-* @param maxFields maximim number of fields converted to string representation.
+* @param maxFields maximum number of fields converted to string representation.
*/
def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))

try {
-writePlans(writer, maxFields)
+writePlans(writer.write, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
-org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
} finally {
writer.close()
}
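
`toFile` shows the payoff of the function-typed sink: the same `writePlans` feeds either an in-memory `StringConcat` or a Hadoop-backed `BufferedWriter`, since `writer.write` eta-expands to `String => Unit`. A hedged usage sketch; the session, query, and path are illustrative:

```scala
// Assumes a SparkSession `spark`; the query and path are illustrative.
val qe = spark.range(100).selectExpr("sum(id)").queryExecution
// Dumps parsed/analyzed/optimized/physical plans plus whole-stage codegen,
// with no field truncation by default (maxFields = Int.MaxValue).
qe.toFile("/tmp/query-debug.txt")
```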
@@ -493,15 +493,15 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
-writer: Writer,
+append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
child.generateTreeString(
depth,
lastChildren,
-writer,
+append,
verbose,
prefix = "",
addSuffix = false,
@@ -777,15 +777,15 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
-writer: Writer,
+append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
child.generateTreeString(
depth,
lastChildren,
-writer,
+append,
verbose,
s"*($codegenStageId) ",
false,
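
Both overrides delegate directly to the child, so codegen wrapper nodes add no tree level of their own; `WholeStageCodegenExec` contributes only the `*(<stage id>)` prefix that marks code-generated operators in explain output. The resulting shape, with illustrative operators:

```scala
// *(1) Project [id]
// +- *(1) Filter (id > 5)
//    +- *(1) Range (0, 10, step=1, splits=8)
```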