@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
Nil
}
}

/**
* Converts the query plan to a string and appends it via the provided function.
* If an AnalysisException is thrown while generating the tree string, its string
* representation is appended instead.
*/
def append[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
try {
plan.treeString(append, verbose, addSuffix, maxFields)
} catch {
case e: AnalysisException => append(e.toString)
}
}
}
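
For orientation, a minimal usage sketch of the new helper (not part of the diff): it composes QueryPlan.append with StringConcat the same way QueryExecution.simpleString does later in this change. The planToString name and the header text are illustrative assumptions.

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

// Hypothetical helper: stream a plan's tree string into a StringConcat and
// materialize it with one final allocation. QueryPlan.append catches
// AnalysisException and appends the exception's string representation instead.
def planToString[T <: QueryPlan[T]](plan: QueryPlan[T]): String = {
  val concat = new StringConcat()
  concat.append("== Plan ==\n")
  QueryPlan.append(plan, concat.append, verbose = false, addSuffix = false)
  concat.toString
}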
@@ -17,13 +17,11 @@

package org.apache.spark.sql.catalyst.trees

import java.io.Writer
import java.util.UUID

import scala.collection.Map
import scala.reflect.ClassTag

import org.apache.commons.io.output.StringBuilderWriter
import org.apache.commons.lang3.ClassUtils
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
val writer = new StringBuilderWriter()
try {
treeString(writer, verbose, addSuffix, maxFields)
writer.toString
} finally {
writer.close()
}
val concat = new StringConcat()

treeString(concat.append, verbose, addSuffix, maxFields)
concat.toString
}

def treeString(
writer: Writer,
append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int): Unit = {
generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
}
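
Because treeString now takes a plain String => Unit sink, callers can stream fragments either into an in-memory StringConcat or straight to a Writer, which is how QueryExecution.toString and QueryExecution.toFile use it later in this diff. A hedged sketch of both call styles; the helper names, the maxFields value, and the output path are assumptions:

import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

// In-memory: fragments are buffered and joined with one final allocation.
def toInMemoryString[T <: TreeNode[T]](node: TreeNode[T]): String = {
  val concat = new StringConcat()
  node.treeString(concat.append, verbose = true, addSuffix = false, maxFields = 25)
  concat.toString
}

// Streaming: each fragment is written to the file directly, so the full plan
// string never has to be held in memory at once.
def dumpToFile[T <: TreeNode[T]](node: TreeNode[T], path: String): Unit = {
  val writer = new BufferedWriter(new FileWriter(path))
  try {
    node.treeString(writer.write, verbose = true, addSuffix = false, maxFields = 25)
  } finally {
    writer.close()
  }
}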

/**
@@ -558,42 +554,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
writer: Writer,
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
writer.write(if (isLast) " " else ": ")
append(if (isLast) " " else ": ")
}
writer.write(if (lastChildren.last) "+- " else ":- ")
append(if (lastChildren.last) "+- " else ":- ")
}

val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else {
simpleString(maxFields)
}
writer.write(prefix)
writer.write(str)
writer.write("\n")
append(prefix)
append(str)
append("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
addSuffix = addSuffix, maxFields = maxFields))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
addSuffix = addSuffix, maxFields = maxFields)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
}
}

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

import java.util.regex.{Pattern, PatternSyntaxException}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.unsafe.types.UTF8String

@@ -87,4 +89,34 @@ object StringUtils {
}
funcNames.toSeq
}

Review discussion on the class name:
Contributor: I am wondering if StringConcatenation is a better name for this class, as this class is technically a rope (that is, a binary tree).
Member Author: StringRope looks nicer ;-)
Contributor: ... and is wrong :P...
Member Author: StringConcat?

/**
 * Concatenation of a sequence of strings into a final string, with a cheap append
 * method and a single memory allocation for the final string.
 */
class StringConcat {
private val strings = new ArrayBuffer[String]
private var length: Int = 0

/**
* Appends a string and accumulates its length to allocate a string buffer for all
* appended strings once in the toString method.
*/
def append(s: String): Unit = {
if (s != null) {
strings.append(s)
length += s.length
}
}

/**
* The method allocates memory for all appended strings, writes them to the memory and
* returns concatenated string.
*/
override def toString: String = {
val result = new StringBuffer(length)
Review discussion on this line:
Contributor: Can we just use java.lang.StringBuilder here for the sake of reducing one useless allocation of the Scala wrapper? Which StringBuffer is this anyway? If you're using the Scala scala.collection.mutable.*, it should have been StringBuilder, right?
Member Author: Replaced by java.lang.StringBuilder

strings.foreach(result.append)
result.toString
}
}
}
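
The review thread above ends with the StringBuffer being replaced by java.lang.StringBuilder. As a self-contained illustration of that post-review variant (an assumption, not the code shown in this diff), the same class with the final concatenation done through a pre-sized, unsynchronized builder:

import scala.collection.mutable.ArrayBuffer

// Sketch of the post-review StringConcat: identical append/length bookkeeping,
// but toString uses java.lang.StringBuilder sized up front.
class StringConcat {
  private val strings = new ArrayBuffer[String]
  private var length: Int = 0

  def append(s: String): Unit = {
    if (s != null) {
      strings += s
      length += s.length
    }
  }

  override def toString: String = {
    val result = new java.lang.StringBuilder(length)  // one pre-sized allocation
    strings.foreach(s => result.append(s))
    result.toString
  }
}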
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
assert(filterPattern(names, " d* ") === Nil)
}

test("string rope") {
def toRope(seq: String*): String = {
seq.foldLeft(new StringConcat())((rope, s) => {rope.append(s); rope}).toString
}

assert(new StringConcat().toString == "")
assert(toRope("") == "")
assert(toRope(null) == "")
assert(toRope("a") == "a")
assert(toRope("1", "2") == "12")
assert(toRope("abc", "\n", "123") == "abc\n123")
}
}
@@ -17,20 +17,21 @@

package org.apache.spark.sql.execution

import java.io.{BufferedWriter, OutputStreamWriter, Writer}
import java.io.{BufferedWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import org.apache.commons.io.output.StringBuilderWriter
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -108,10 +109,6 @@ class QueryExecution(
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: AnalysisException => e.toString }


/**
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
@@ -197,55 +194,53 @@ class QueryExecution(
}

def simpleString: String = withRedaction {
s"""== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = false))}
""".stripMargin.trim
}

private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
try f(writer)
catch {
case e: AnalysisException => writer.write(e.toString)
}
val concat = new StringConcat()
concat.append("== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
concat.append("\n")
concat.toString
}

private def writePlans(writer: Writer, maxFields: Int): Unit = {
private def writePlans(append: String => Unit, maxFields: Int): Unit = {
val (verbose, addSuffix) = (true, false)

writer.write("== Parsed Logical Plan ==\n")
writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = stringOrError(truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
writer.write(analyzedOutput)
writer.write("\n")
writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Optimized Logical Plan ==\n")
writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
writer.write("\n== Physical Plan ==\n")
writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
append("== Parsed Logical Plan ==\n")
QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
append("\n== Analyzed Logical Plan ==\n")
val analyzedOutput = try {
truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
} catch {
case e: AnalysisException => e.toString
}
append(analyzedOutput)
append("\n")
QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
append("\n== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
}

override def toString: String = withRedaction {
val writer = new StringBuilderWriter()
try {
writePlans(writer, SQLConf.get.maxToStringFields)
writer.toString
} finally {
writer.close()
}
val concat = new StringConcat()
writePlans(concat.append, SQLConf.get.maxToStringFields)
concat.toString
}

def stringWithStats: String = withRedaction {
val concat = new StringConcat()
val maxFields = SQLConf.get.maxToStringFields

// trigger to compute stats for logical plans
optimizedPlan.stats

// only show optimized logical plan and physical plan
s"""== Optimized Logical Plan ==
|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
|== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = true))}
""".stripMargin.trim
concat.append("== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
concat.append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
concat.append("\n")
concat.toString
}

/**
@@ -282,17 +277,17 @@ class QueryExecution(
/**
* Dumps debug information about query execution into the specified file.
*
* @param maxFields maximim number of fields converted to string representation.
* @param maxFields maximum number of fields converted to string representation.
*/
def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))

try {
writePlans(writer, maxFields)
writePlans(writer.write, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
} finally {
writer.close()
}
@@ -493,15 +493,15 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
writer: Writer,
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
child.generateTreeString(
depth,
lastChildren,
writer,
append,
verbose,
prefix = "",
addSuffix = false,
@@ -777,15 +777,15 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
writer: Writer,
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
child.generateTreeString(
depth,
lastChildren,
writer,
append,
verbose,
s"*($codegenStageId) ",
false,