
Commit e0bbb1b

DaveDeCaprio authored and dvallejo committed
[SPARK-26103][SQL] Limit the length of debug strings for query plans
The PR puts a limit on the size of the debug string generated for a tree node, which helps fix out-of-memory errors when large plans have huge debug strings. In addition to SPARK-26103, this should also address SPARK-23904 and SPARK-25380. An alternative solution was proposed in apache#23076, but that solution doesn't address all the cases that can produce a large query. This limit applies only to calls to treeString that don't pass a Writer, which makes it play nicely with apache#22429, apache#23018 and apache#23039. Full plans can still be written to files, but truncated plans are used when strings are held in memory, such as for the UI.

- A new configuration parameter, spark.sql.maxPlanStringLength, controls the maximum length of plan strings.
- When a plan is truncated, "..." is printed to indicate that it is not a full plan.
- A warning is logged the first time a truncated plan is displayed, explaining what happened and how to adjust the limit.

Unit tests were created for the new size-limited StringConcat, along with a TreeNode test that checks a long plan is correctly truncated.

Closes apache#23169 from DaveDeCaprio/text-plan-size.

Lead-authored-by: Dave DeCaprio <[email protected]>
Co-authored-by: David DeCaprio <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
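As a quick illustration of the behavior described above (a hedged sketch, not part of the commit; the local session, the demo query, and the "1k" cap are illustrative assumptions):

import org.apache.spark.sql.SparkSession

// Hypothetical demo session; master, app name, and the 1 KB cap are assumptions.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("plan-truncation-demo")
  .config("spark.sql.maxPlanStringLength", "1k") // cap in-memory plan strings at 1 KB
  .getOrCreate()

// Build a plan wide enough that its debug string overflows the cap.
val wide = (1 to 50).map(i => spark.range(1).toDF(s"c$i")).reduce(_ crossJoin _)

// The in-memory plan string ends with a "... N more characters" marker.
println(wide.queryExecution.toString)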
1 parent 6512158 commit e0bbb1b

File tree

6 files changed, +145 -24 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -477,7 +477,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()

     treeString(concat.append, verbose, addSuffix, maxFields)
     concat.toString

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala

Lines changed: 49 additions & 9 deletions
@@ -17,14 +17,18 @@

 package org.apache.spark.sql.catalyst.util

+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.{Pattern, PatternSyntaxException}

 import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String

-object StringUtils {
+object StringUtils extends Logging {

   /**
    * Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -90,20 +94,29 @@ object StringUtils {

   /**
    * Concatenation of sequence of strings to final string with cheap append method
-   * and one memory allocation for the final string.
+   * and one memory allocation for the final string. Can also bound the final size of
+   * the string.
    */
-  class StringConcat {
-    private val strings = new ArrayBuffer[String]
-    private var length: Int = 0
+  class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    protected val strings = new ArrayBuffer[String]
+    protected var length: Int = 0
+
+    def atLimit: Boolean = length >= maxLength

     /**
      * Appends a string and accumulates its length to allocate a string buffer for all
-     * appended strings once in the toString method.
+     * appended strings once in the toString method. Appends past the max limit are
+     * dropped, but the total length keeps accumulating so truncation can be reported.
      */
     def append(s: String): Unit = {
       if (s != null) {
-        strings.append(s)
-        length += s.length
+        val sLen = s.length
+        if (!atLimit) {
+          val available = maxLength - length
+          val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+          strings.append(stringToAppend)
+        }
+        length += sLen
       }
     }
@@ -112,9 +125,36 @@ object StringUtils {
      * returns concatenated string.
      */
     override def toString: String = {
-      val result = new java.lang.StringBuilder(length)
+      val finalLength = if (atLimit) maxLength else length
+      val result = new java.lang.StringBuilder(finalLength)
       strings.foreach(result.append)
       result.toString
     }
   }
+
+  /**
+   * A string concatenator for plan strings. Uses length from a configured value, and
+   * prints a warning the first time a plan is truncated.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+    override def toString: String = {
+      if (atLimit) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. The " +
+          s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
+          s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = if (maxLength == 0) {
+          s"Truncated plan of $length characters"
+        } else {
+          s"... ${length - maxLength} more characters"
+        }
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      } else {
+        super.toString
+      }
+    }
+  }
 }
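To make the truncation semantics concrete, a minimal sketch of the bounded StringConcat (mirroring the StringUtilsSuite tests later in this diff; the literals are illustrative): appends past maxLength are cut off, while the running length keeps counting so the overflow suffix can report how much was dropped.

import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

val sc = new StringConcat(maxLength = 10)
sc.append("0123456")  // 7 chars stored; still under the limit
sc.append("789abc")   // only "789" fits; "abc" is dropped
sc.append("ignored")  // already at the limit; nothing more is stored
assert(sc.atLimit)                  // length (20) >= maxLength (10)
assert(sc.toString == "0123456789") // exactly maxLength characters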

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 31 additions & 0 deletions
@@ -1585,6 +1585,32 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
+    .doc("Maximum number of fields of sequence-like entries that can be converted to strings " +
+      "in debug output. Any elements beyond the limit will be dropped and replaced by a" +
+      """ "... N more fields" placeholder.""")
+    .intConf
+    .createWithDefault(25)
+
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+    .doc("Maximum number of characters to output for a plan string. If the plan is " +
+      "longer, further output will be truncated. The default setting always generates a full " +
+      "plan. Set this to a lower value such as 8k if plan strings are taking up too much " +
+      "memory or are causing OutOfMemory errors in the driver or UI processes.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
+      "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
+      "(nonnegative and shorter than the maximum size).")
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
+  val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
+    buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
+      .internal()
+      .doc("If it is set to true, SET command will fail when the key is registered as " +
+        "a SparkConf entry.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED =
     buildConf("spark.sql.legacy.mssqlserver.numericMapping.enabled")
       .internal()
@@ -2003,6 +2029,11 @@ class SQLConf extends Serializable with Logging {

   def legacyMsSqlServerNumericMappingEnabled: Boolean =
     getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
+
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
+  def setCommandRejectsSparkCoreConfs: Boolean =
+    getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)

   /** ********************** SQLConf functionality methods ************ */

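Because the new conf is read through SQLConf.get at plan-printing time rather than being a static conf, it should also be adjustable on a live session. A small sketch, assuming an active SparkSession named spark:

// Cap plan strings at 8 KB for this session (bytesConf accepts byte strings like "8k").
spark.conf.set("spark.sql.maxPlanStringLength", "8k")

// Remove the override to fall back to the default,
// ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (effectively unlimited).
spark.conf.unset("spark.sql.maxPlanStringLength")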
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala

Lines changed: 28 additions & 1 deletion
@@ -35,7 +35,10 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, SubqueryAlias, Union}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
@@ -81,7 +84,7 @@ case class SelfReferenceUDF(
   def apply(key: String): Boolean = config.contains(key)
 }

-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   test("top node changed") {
     val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
     assert(after === Literal(2))
@@ -617,4 +620,28 @@ class TreeNodeSuite extends SparkFunSuite {
     val expected = Coalesce(Stream(Literal(1), Literal(3)))
     assert(result === expected)
   }
+
+  test("treeString limits plan length") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+      val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      logWarning("Plan string: " + planString)
+      assert(planString.endsWith(" more characters"))
+      assert(planString.length <= SQLConf.get.maxPlanStringLength)
+    }
+  }
+
+  test("treeString limit at zero") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+      val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      assert(planString.startsWith("Truncated plan of"))
+    }
+  }
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala

Lines changed: 27 additions & 6 deletions
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {

   test("string concatenation") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
     }

     assert(new StringConcat().toString == "")
-    assert(concat("") == "")
-    assert(concat(null) == "")
-    assert(concat("a") == "a")
-    assert(concat("1", "2") == "12")
-    assert(concat("abc", "\n", "123") == "abc\n123")
+    assert(concat("") === "")
+    assert(concat(null) === "")
+    assert(concat("a") === "a")
+    assert(concat("1", "2") === "12")
+    assert(concat("abc", "\n", "123") === "abc\n123")
+  }
+
+  test("string concatenation with limit") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+    }
+    assert(concat("under") === "under")
+    assert(concat("under", "over", "extra") === "underov")
+    assert(concat("underover") === "underov")
+    assert(concat("under", "ov") === "underov")
+  }
+
+  test("string concatenation return value") {
+    def checkLimit(s: String): Boolean = {
+      val sc = new StringConcat(7)
+      sc.append(s)
+      sc.atLimit
+    }
+    assert(!checkLimit("under"))
+    assert(checkLimit("1234567"))
+    assert(checkLimit("1234567890"))
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 8 additions & 6 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -193,7 +193,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }

   def simpleString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     concat.append("== Physical Plan ==\n")
     QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
     concat.append("\n")
@@ -221,13 +221,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }

   override def toString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     writePlans(concat.append, SQLConf.get.maxToStringFields)
     concat.toString
   }

   def stringWithStats: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     val maxFields = SQLConf.get.maxToStringFields

     // trigger to compute stats for logical plans
@@ -282,9 +282,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
     val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
+    val append = (s: String) => {
+      writer.write(s)
+    }
     try {
-      writePlans(writer.write, maxFields)
+      writePlans(append, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
       org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
     } finally {
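This hunk sits in the plan-to-file path, which passes a Writer and therefore bypasses the new limit. A hedged sketch of the resulting split, assuming the debug.toFile helper from apache#22429/apache#23018 is available on this branch and spark is an active session:

val df = spark.range(1000).selectExpr("id", "id * 2 AS doubled")

// Written through a Writer: the full, untruncated plan lands on disk.
df.queryExecution.debug.toFile("/tmp/full-plan.txt")

// Held in memory (e.g. for the UI): subject to spark.sql.maxPlanStringLength.
println(df.queryExecution.toString)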
