Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ object CodeFormatter {
}
new CodeAndComment(code.result().trim(), map)
}

def stripExtraNewLinesAndComments(input: String): String = {
val commentReg =
("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/
"""([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment
val codeWithoutComment = commentReg.replaceAllIn(input, "")
codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines
}
}

private class CodeFormatter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,20 +373,6 @@ class CodegenContext {
*/
private val placeHolderToComments = new mutable.HashMap[String, String]

/**
* It will count the lines of every Java function generated by whole-stage codegen,
* if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction,
* it will return true.
*/
def isTooLongGeneratedFunction: Boolean = {
classFunctions.values.exists { _.values.exists {
code =>
val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
}
}
}

/**
* Returns a term name that is unique within this instance of a `CodegenContext`.
*/
Expand Down Expand Up @@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
}

object CodeGenerator extends Logging {

// This is the value of HugeMethodLimit in the OpenJDK JVM settings
val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000

/**
* Compile the Java source code into a Java class, using Janino.
*
* @return a pair of a generated class and the max bytecode size of generated functions.
*/
def compile(code: CodeAndComment): GeneratedClass = try {
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
cache.get(code)
} catch {
// Cache.get() may wrap the original exception. See the following URL
Expand All @@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*/
private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
val evaluator = new ClassBodyEvaluator()

// A special classloader used to wrap the actual parent classloader of
Expand Down Expand Up @@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
s"\n${CodeFormatter.format(code)}"
})

try {
val maxCodeSize = try {
evaluator.cook("generated.java", code.body)
recordCompilationStats(evaluator)
updateAndGetCompilationStats(evaluator)
} catch {
case e: JaninoRuntimeException =>
val msg = s"failed to compile: $e"
Expand All @@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
throw new CompileException(msg, e.getLocation)
}
evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]

(evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
}

/**
* Records the generated class and method bytecode sizes by inspecting janino private fields.
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
* Also, this method updates the metrics information.
*/
private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
// First retrieve the generated classes.
val classes = {
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
Expand All @@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
val codeAttrField = codeAttr.getDeclaredField("code")
codeAttrField.setAccessible(true)
classes.foreach { case (_, classBytes) =>
val codeSizes = classes.flatMap { case (_, classBytes) =>
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
try {
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
cf.methodInfos.asScala.foreach { method =>
method.getAttributes().foreach { a =>
if (a.getClass.getName == codeAttr.getName) {
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
}
val stats = cf.methodInfos.asScala.flatMap { method =>
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
byteCodeSize
}
}
Some(stats)
} catch {
case NonFatal(e) =>
logWarning("Error calculating stats of compiled class.", e)
None
}
}
}.flatten

codeSizes.max
}

/**
Expand All @@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
private val cache = CacheBuilder.newBuilder()
.maximumSize(100)
.build(
new CacheLoader[CodeAndComment, GeneratedClass]() {
override def load(code: CodeAndComment): GeneratedClass = {
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
val (clazz, _) = CodeGenerator.compile(code)
val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -575,15 +576,15 @@ object SQLConf {
"disable logging or -1 to apply no limit.")
.createWithDefault(1000)

val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
.internal()
.doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
"When the generated function exceeds this threshold, " +
.doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
"codegen. When the compiled function exceeds this threshold, " +
"the whole-stage codegen is deactivated for this subtree of the current query plan. " +
"The default value 4000 is the max length of byte code JIT supported " +
"for a single function(8000) divided by 2.")
s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
"this is a limit in the OpenJDK JVM implementation.")
.intConf
.createWithDefault(4000)
.createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
Expand Down Expand Up @@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {

def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)

def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite {
assert(reducedCode.body === "/*project_c4*/")
}

test("removing extra new lines and comments") {
val code =
"""
|/*
| * multi
| * line
| * comments
| */
|
|public function() {
|/*comment*/
| /*comment_with_space*/
|code_body
|//comment
|code_body
| //comment_with_space
|
|code_body
|}
""".stripMargin

val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
assert(reducedCode ===
"""
|public function() {
|code_body
|code_body
|code_body
|}
""".stripMargin)
}

testCase("basic example") {
"""
|class A {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co

override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
if (ctx.isTooLongGeneratedFunction) {
logWarning("Found too long generated codes and JIT optimization might not work, " +
"Whole-stage codegen disabled for this plan, " +
"You can change the config spark.sql.codegen.MaxFunctionLength " +
"to adjust the function length limit:\n "
+ s"$treeString")
return child.execute()
}
// try to compile and fallback if it failed
try {
val (_, maxCodeSize) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
// We should already saw the error message
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
return child.execute()
}

// Check if compiled code has a too large function
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
logWarning(s"Found too long generated codes and JIT optimization might not work: " +
s"the bytecode size was $maxCodeSize, this value went over the limit " +
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan. To avoid this, you can raise the limit " +
s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
return child.execute()
}

val references = ctx.references.toArray

val durationMs = longMetric("pipelineTime")
Expand All @@ -405,7 +408,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
rdds.head.mapPartitionsWithIndex { (index, iter) =>
val clazz = CodeGenerator.compile(cleanedSource)
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(iter))
new Iterator[InternalRow] {
Expand All @@ -424,7 +427,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
// a small hack to obtain the correct partition index
}.mapPartitionsWithIndex { (index, zippedIter) =>
val (leftIter, rightIter) = zippedIter.next()
val clazz = CodeGenerator.compile(cleanedSource)
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(leftIter, rightIter))
new Iterator[InternalRow] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
}
}
Loading