-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to 64KB bytecode size limit #17087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to 64KB bytecode size limit #17087
Changes from 12 commits
45c02b5
6a94c2f
1bb2211
96c1033
e870e3d
cb291cd
e653d50
8548b0e
51a71a6
ea67c8a
c2e6b8c
8b6ba75
1f19c80
3868bf5
a5fd465
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,10 @@ import scala.language.existentials | |
| import scala.util.control.NonFatal | ||
|
|
||
| import com.google.common.cache.{CacheBuilder, CacheLoader} | ||
| import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler} | ||
| import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} | ||
| import org.apache.commons.lang3.exception.ExceptionUtils | ||
| import org.codehaus.commons.compiler.CompileException | ||
| import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler} | ||
| import org.codehaus.janino.util.ClassFile | ||
|
|
||
| import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException} | ||
|
|
@@ -899,8 +902,20 @@ object CodeGenerator extends Logging { | |
| /** | ||
| * Compile the Java source code into a Java class, using Janino. | ||
| */ | ||
| def compile(code: CodeAndComment): GeneratedClass = { | ||
| def compile(code: CodeAndComment): GeneratedClass = try { | ||
| cache.get(code) | ||
| } catch { | ||
| // Cache.get() may wrap the original exception. See the following URL | ||
| // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ | ||
| // Cache.html#get(K,%20java.util.concurrent.Callable) | ||
| case e : UncheckedExecutionException => | ||
| val excChains = ExceptionUtils.getThrowables(e) | ||
| val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2) | ||
| throw exc | ||
| case e : ExecutionError => | ||
| val excChains = ExceptionUtils.getThrowables(e) | ||
| val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2) | ||
| throw exc | ||
|
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -951,10 +966,14 @@ object CodeGenerator extends Logging { | |
| evaluator.cook("generated.java", code.body) | ||
| recordCompilationStats(evaluator) | ||
| } catch { | ||
| case e: Exception => | ||
| case e: JaninoRuntimeException => | ||
|
||
| val msg = s"failed to compile: $e\n$formatted" | ||
| logError(msg, e) | ||
| throw new Exception(msg, e) | ||
| throw new JaninoRuntimeException(msg, e) | ||
| case e: CompileException => | ||
| val msg = s"failed to compile: $e\n$formatted" | ||
| logError(msg, e) | ||
| throw new CompileException(msg, e.asInstanceOf[CompileException].getLocation) | ||
|
||
| } | ||
| evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass] | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,20 +20,22 @@ package org.apache.spark.sql.catalyst.expressions | |
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.analysis.TypeCheckResult | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate} | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.catalyst.util.TypeUtils | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
|
|
||
| object InterpretedPredicate { | ||
| def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) = | ||
| def create(expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = | ||
| create(BindReferences.bindReference(expression, inputSchema)) | ||
|
|
||
| def create(expression: Expression): (InternalRow => Boolean) = { | ||
| (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean] | ||
| } | ||
| def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression) | ||
| } | ||
|
|
||
| class InterpretedPredicate(expression: Expression) extends BasePredicate { | ||
|
||
| def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] | ||
|
||
| } | ||
|
|
||
| /** | ||
| * An [[Expression]] that returns a boolean value. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da | |
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.concurrent.ExecutionContext | ||
|
|
||
| import org.codehaus.commons.compiler.CompileException | ||
| import org.codehaus.janino.JaninoRuntimeException | ||
|
|
||
| import org.apache.spark.{broadcast, SparkEnv} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.io.CompressionCodec | ||
|
|
@@ -353,9 +356,28 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ | |
| GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination) | ||
| } | ||
|
|
||
| private def genInterpretedPredicate( | ||
| expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = { | ||
| val str = expression.toString | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think Expression.toString will truncate an expression that is too big. Right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, |
||
| val logMessage = if (str.length > 256) { | ||
| str.substring(0, 256 - 3) + "..." | ||
| } else { | ||
| str | ||
| } | ||
| logWarning(s"Codegen disabled for this expression:\n $logMessage") | ||
| InterpretedPredicate.create(expression, inputSchema) | ||
| } | ||
|
|
||
| protected def newPredicate( | ||
| expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = { | ||
| GeneratePredicate.generate(expression, inputSchema) | ||
| try { | ||
| GeneratePredicate.generate(expression, inputSchema) | ||
|
||
| } catch { | ||
| case e: JaninoRuntimeException if sqlContext == null || sqlContext.conf.wholeStageFallback => | ||
|
||
| genInterpretedPredicate(expression, inputSchema) | ||
| case e: CompileException if sqlContext == null || sqlContext.conf.wholeStageFallback => | ||
| genInterpretedPredicate(expression, inputSchema) | ||
| } | ||
| } | ||
|
|
||
| protected def newOrdering( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use the following simple codes:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, done. thanks.