Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,12 @@

package org.apache.spark.sql.catalyst.expressions

import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException
import scala.util.control.NonFatal

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
* Catches compile error during code generation.
*/
object CodegenError {
def unapply(throwable: Throwable): Option[Exception] = throwable match {
case e: InternalCompilerException => Some(e)
case e: CompileException => Some(e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have a test case for checking the fallback before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't in the original PR. @maropu -san added one in this PR which is great, plugs the hole.

case _ => None
}
}

/**
* Defines values for `SQLConf` config of fallback mode. Use for test only.
*/
Expand All @@ -47,7 +35,7 @@ object CodegenObjectFactoryMode extends Enumeration {
* error happens, it can fallback to interpreted implementation. In tests, we can use a SQL config
* `SQLConf.CODEGEN_FACTORY_MODE` to control fallback behavior.
*/
abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {

def createObject(in: IN): OUT = {
// We are allowed to choose codegen-only or no-codegen modes if under tests.
Expand All @@ -63,7 +51,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
try {
createCodeGeneratedObject(in)
} catch {
case CodegenError(_) => createInterpretedObject(in)
case NonFatal(_) =>
// We should have already seen the error message in `CodeGenerator`
logWarning("Expr codegen error and falling back to interpreter mode")
createInterpretedObject(in)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.types.{DataType, StructType}
Expand Down Expand Up @@ -180,7 +182,10 @@ object UnsafeProjection
try {
GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
} catch {
case CodegenError(_) => InterpretedUnsafeProjection.createProjection(unsafeExprs)
case NonFatal(_) =>
// We should have already seen the error message in `CodeGenerator`
logWarning("Expr codegen error and falling back to interpreter mode")
InterpretedUnsafeProjection.createProjection(unsafeExprs)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,33 @@

package org.apache.spark.sql.catalyst.expressions

import java.util.concurrent.ExecutionException

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType}
import org.apache.spark.sql.types.IntegerType

class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase {

test("UnsafeProjection with codegen factory mode") {
val input = Seq(LongType, IntegerType)
.zipWithIndex.map(x => BoundReference(x._2, x._1, true))
object FailedCodegenProjection
extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {

override protected def createCodeGeneratedObject(in: Seq[Expression]): UnsafeProjection = {
val invalidCode = new CodeAndComment("invalid code", Map.empty)
// We assume this compilation throws an exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use this comment as part of an exception (say IllegalStateException or similar) that should be thrown rather than returning null. I think that would make the comment part of the code itself and can be checked in tests (by catching the exception).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The suggested change is only for making this test suite cleaner, right? In that case I'd +1 with the suggestion of being able to clearly check we're catching the exception we know we're throwing.
Would you like to submit a PR for it?

rather than returning null
The intent is never to actually reach the null-return, but always cause an exception to be thrown at CodeGenerator.compile() and abruptly return to the caller with the exception. To make the compiler happy you'll have to have some definite-returning statement to end the function, so a useless null-return would probably have to be there anyway (since the compiler can't tell you'll always be throwing an exception unless you do a throw inline)

CodeGenerator.compile(invalidCode)
null
}

override protected def createInterpretedObject(in: Seq[Expression]): UnsafeProjection = {
InterpretedUnsafeProjection.createProjection(in)
}
}

test("UnsafeProjection with codegen factory mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
val obj = UnsafeProjection.createObject(input)
Expand All @@ -40,4 +56,24 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
assert(obj.isInstanceOf[InterpretedUnsafeProjection])
}
}

test("fallback to the interpreter mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val fallback = CodegenObjectFactoryMode.FALLBACK.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> fallback) {
val obj = FailedCodegenProjection.createObject(input)
assert(obj.isInstanceOf[InterpretedUnsafeProjection])
}
}

test("codegen failures in the CODEGEN_ONLY mode") {
val errMsg = intercept[ExecutionException] {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
FailedCodegenProjection.createObject(input)
}
}.getMessage
assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Locale
import java.util.function.Supplier

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -582,7 +583,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
val (_, maxCodeSize) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
// We should already saw the error message
logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
return child.execute()
Expand Down