-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22226][SQL] splitExpression can create too many method calls in the outer class #19480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
c2cc295
d3a5b87
e63264b
bdc2fdb
b9ffc52
831fc40
76b5489
20626b4
5f39500
c4601b4
61cc445
37506dc
95b0ad8
bce3616
c4ab587
0215139
41c0b2b
6a1eeca
be84c4d
a39098d
6e8cd00
36d8e2e
4952880
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream | |
| import java.util.{Map => JavaMap} | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.immutable.ListMap | ||
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.language.existentials | ||
|
|
@@ -77,6 +78,20 @@ case class SubExprEliminationState(isNull: String, value: String) | |
| */ | ||
| case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprEliminationState]) | ||
|
|
||
| /** | ||
| * The main information about a new added function. | ||
| * | ||
| * @param functionName String representing the name of the function | ||
| * @param subclassName Optional value which is empty if the function is added to | ||
| * the outer class, otherwise it contains the name of the | ||
| * inner class in which the function has been added. | ||
| * @param subclassInstance Optional value which is empty if the function is added to | ||
| * the outer class, otherwise it contains the name of the | ||
| * instance of the inner class in the outer class. | ||
| */ | ||
| private[codegen] case class NewFunction(functionName: String, subclassName: Option[String], | ||
| subclassInstance: Option[String]) | ||
|
||
|
|
||
| /** | ||
| * A context for codegen, tracking a list of objects that could be passed into generated Java | ||
| * function. | ||
|
|
@@ -277,13 +292,25 @@ class CodegenContext { | |
| funcName: String, | ||
| funcCode: String, | ||
| inlineToOuterClass: Boolean = false): String = { | ||
| val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass) | ||
| newFunction match { | ||
| case NewFunction(functionName, None, None) => functionName | ||
| case NewFunction(functionName, Some(_), Some(subclassInstance)) => | ||
| subclassInstance + "." + functionName | ||
| } | ||
| } | ||
|
|
||
| private[this] def addNewFunctionInternal( | ||
| funcName: String, | ||
| funcCode: String, | ||
| inlineToOuterClass: Boolean): NewFunction = { | ||
| // The number of named constants that can exist in the class is limited by the Constant Pool | ||
| // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a | ||
| // threshold of 1600k bytes to determine when a function should be inlined to a private, nested | ||
| // threshold of 1000k bytes to determine when a function should be inlined to a private, nested | ||
|
||
| // sub-class. | ||
| val (className, classInstance) = if (inlineToOuterClass) { | ||
| outerClassName -> "" | ||
| } else if (currClassSize > 1600000) { | ||
| } else if (currClassSize > 1000000) { | ||
|
||
| val className = freshName("NestedClass") | ||
| val classInstance = freshName("nestedClassInstance") | ||
|
|
||
|
|
@@ -294,17 +321,23 @@ class CodegenContext { | |
| currClass() | ||
| } | ||
|
|
||
| classSize(className) += funcCode.length | ||
| classFunctions(className) += funcName -> funcCode | ||
| addNewFunctionToClass(funcName, funcCode, className) | ||
|
|
||
| if (className == outerClassName) { | ||
| funcName | ||
| NewFunction(funcName, None, None) | ||
| } else { | ||
|
|
||
| s"$classInstance.$funcName" | ||
| NewFunction(funcName, Some(className), Some(classInstance)) | ||
| } | ||
| } | ||
|
|
||
| private[this] def addNewFunctionToClass( | ||
| funcName: String, | ||
| funcCode: String, | ||
| className: String) = { | ||
| classSize(className) += funcCode.length | ||
| classFunctions(className) += funcName -> funcCode | ||
| } | ||
|
|
||
| /** | ||
| * Declares all function code. If the added functions are too many, split them into nested | ||
| * sub-classes to avoid hitting Java compiler constant pool limitation. | ||
|
|
@@ -798,10 +831,46 @@ class CodegenContext { | |
| | ${makeSplitFunction(body)} | ||
| |} | ||
| """.stripMargin | ||
| addNewFunction(name, code) | ||
| addNewFunctionInternal(name, code, inlineToOuterClass = false) | ||
| } | ||
|
|
||
| foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})")) | ||
| // Here we store all the methods which have been added to the outer class. | ||
| val outerClassFunctions = functions | ||
| .filter(_.subclassName.isEmpty) | ||
| .map(_.functionName) | ||
|
||
|
|
||
| // Here we handle all the methods which have been added to the nested subclasses and | ||
| // not to the outer class. | ||
| // Since they can be many, their direct invocation in the outer class adds many entries | ||
| // to the outer class' constant pool. This can cause the constant pool to exceed the JVM limit. | ||
| // To avoid this problem, we group them and we call only the grouping methods in the | ||
| // outer class. | ||
| val innerClassFunctions = functions | ||
|
||
| .filter(_.subclassName.isDefined) | ||
| .foldLeft(ListMap.empty[(String, String), Seq[String]]) { case (acc, f) => | ||
| val key = (f.subclassName.get, f.subclassInstance.get) | ||
| acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) | ||
|
||
| } | ||
| .flatMap { case ((subclassName, subclassInstance), subclassFunctions) => | ||
|
||
| if (subclassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { | ||
| // Adding a new function to each subclass which contains | ||
| // the invocation of all the ones which have been added to | ||
| // that subclass | ||
| val code = s""" | ||
| |private $returnType $func($argString) { | ||
| | ${makeSplitFunction(foldFunctions(subclassFunctions.map(name => | ||
| s"$name(${arguments.map(_._2).mkString(", ")})")))} | ||
|
||
| |} | ||
| """.stripMargin | ||
| addNewFunctionToClass(func, code, subclassName) | ||
| Seq(s"$subclassInstance.$func") | ||
| } else { | ||
| subclassFunctions.map(f => s"$subclassInstance.$f") | ||
| } | ||
| } | ||
|
||
|
|
||
| foldFunctions((outerClassFunctions ++ innerClassFunctions).map( | ||
| name => s"$name(${arguments.map(_._2).mkString(", ")})")) | ||
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -1010,6 +1079,10 @@ object CodeGenerator extends Logging { | |
| // This is the value of HugeMethodLimit in the OpenJDK JVM settings | ||
| val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 | ||
|
|
||
| // This is the threshold over which the methods in an inner class are grouped in a single | ||
|
||
| // method which is going to be called by the outer class instead of the many small ones | ||
| val MERGE_SPLIT_METHODS_THRESHOLD = 3 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a short comment for this too.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another magic number?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gatorsmile it comes from this discussion. |
||
|
|
||
| /** | ||
| * Compile the Java source code into a Java class, using Janino. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -201,6 +201,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { | |
| } | ||
| } | ||
|
|
||
| test("SPARK-22226: group splitted expressions into one method per nested class") { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Besides the unit test, can you provide an end-to-end case that can trigger this issue too?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a use case where I faced this problem, and I tried this patch on it. Unfortunately, it contains very complex business logic, and I have not been able to reproduce the problem with a simpler case. But if needed, I can try again.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of copying your customer's code, can you make a fake one?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 I can reproduce the issue with the following test case. You can check it: test("SPARK-22226: too much splitted expressions should not exceed constant pool limit") {
withSQLConf(
(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")) {
val colNumber = 1000
val baseDF = spark.range(10).toDF()
val newCols = (1 to colNumber).map { colIndex =>
expr(s"id + $colIndex").as(s"_$colIndex")
}
val input = baseDF.select(newCols: _*)
val aggs = (1 to colNumber).flatMap { colIndex =>
val colName = s"_$colIndex"
Seq(expr(s"stddev($colName)"),
expr(s"stddev_samp($colName)"),
expr(s"stddev_pop($colName)"),
expr(s"variance($colName)"),
expr(s"var_samp($colName)"),
expr(s"var_pop($colName)"),
expr(s"skewness($colName)"),
expr(s"kurtosis($colName)"))
}
input.agg(aggs.head, aggs.tail: _*).collect()
}
}
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you very much for your help @viirya ! In my use cases it seemed to be connected to the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was adding it to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya I have good news and bad news... Thanks to your suggestion I have been able to understand and reproduce the issue. Moreover, I also found another issue which is fixed by this PR and I am adding a UT for that too: in some cases, we might have a And with this PR the problem is fixed. The bad news is that the UT you provided still fails, but with a different error: it is still a Constant Pool limit exceeded exception, but it is in a NestedClass. From my analysis, this is caused by another problem, i.e. that we might reference too many fields of the superclass in the NestedClasses. This might be addressed by tuning the magic number which I brought to 1000k in this PR, but I am pretty sure that it will also be addressed by the ongoing PR for SPARK-18016, since that PR is trying to reduce the number of variables. Thus I consider this out of scope for this PR.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 Do you mean "2: compact primitive declarations into arrays" in SPARK-18016?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 Thanks for trying it. Yeah, those expressions like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya exactly, I meant that. Thank you for your suggestion. You have been very helpful to me. |
||
| val length = 10000 | ||
| val expressions = Seq.fill(length) { | ||
| ToUTCTimestamp( | ||
| Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType), | ||
| Literal.create("PST", StringType)) | ||
| } | ||
| val plan = GenerateMutableProjection.generate(expressions) | ||
| val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)) | ||
| val expected = Seq.fill(length)( | ||
| DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00"))) | ||
|
|
||
| if (actual != expected) { | ||
| fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") | ||
| } | ||
| } | ||
|
|
||
| test("test generated safe and unsafe projection") { | ||
| val schema = new StructType(Array( | ||
| StructField("a", StringType, true), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2103,4 +2103,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { | |
| testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), | ||
| Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) | ||
| } | ||
|
|
||
| test("SPARK-22226: splitExpressions should not generate codes beyond 64KB") { | ||
| val colNumber = 10000 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mgaido91 .
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this would be the maximum number currently, but #19518 will generate a great improvement for that. Actually I don't have an exact answer because this depends on many factors, like which transformations are performed, which are the datatypes, .... So I am not able to give an answer on that, sorry. |
||
| val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) | ||
| val df = sqlContext.createDataFrame(input, StructType( | ||
| (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false)))) | ||
| val newCols = (1 to colNumber).flatMap { colIndex => | ||
| Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), | ||
| expr(s"sqrt(_$colIndex)")) | ||
| } | ||
| df.select(newCols: _*).collect() | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw three ways to represent the same concept: subclass, inner class, and nested class.
How about renaming all of them to inner classes? Could you go over all the code changes in this PR to make them consistent?