Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
* Catches compile error during code generation.
*/
object CodegenError {
def unapply(throwable: Throwable): Option[Exception] = throwable match {
case e: InternalCompilerException => Some(e)
case e: CompileException => Some(e)
case _ => None
}
}

/**
* A factory which can be used to create objects that have both codegen and interpreted
* implementations. This tries to create codegen object first, if any compile error happens,
* it fallbacks to interpreted version.
*/
abstract class CodegenObjectFactory[IN, OUT] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any better name? How about CodeGeneratorWithInterpretedFallback and make it extends CodeGenerator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it needs CodeGenerator? I think this CodeGeneratorWithInterpretedFallback just delegate to various CodeGenerator (e.g., GenerateUnsafeProjection) to produce codegen object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenerateUnsafeProjection also extends CodeGenerator.

Is there any class we want it to extend CodegenObjectFactory but not CodeGenerator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, no doubt GenerateUnsafeProjection extends CodeGenerator. It's the class doing generators of byte codes.

Previously UnsafeProjection has its own interface UnsafeProjectionCreator, does not extends CodeGenerator. So currently I let it follow previous UnsafeProjection's API.

We can change it to implement CodeGenerator and also change the places using UnsafeProjection, if you think it is necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok let's keep the previous way to minimize code changes. How about just rename it to CodeGeneratorWithInterpretedFallback?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Reamed.


// Creates wanted object. First trying codegen implementation. If any compile error happens,
// fallbacks to interpreted version.
def createObject(in: IN): OUT = {
val fallbackMode = SQLConf.get.getConf(SQLConf.CODEGEN_OBJECT_FALLBACK)
Copy link
Member Author

@viirya viirya May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only access SQLConf on the driver. Now this causes test failure. We can't know this config value inside createObject which can be called on executors.

To solve this, UnsafeProjection can't be an object as now, but a case class with a fallback mode parameter. So we can create this factory on driver. @cloud-fan @hvanhovell WDYT?

// Only in tests, we can use `SQLConf.CODEGEN_OBJECT_FALLBACK` to choose codegen/interpreted
// only path.
if (Utils.isTesting && fallbackMode != "fallback") {
fallbackMode match {
case "codegen-only" => createCodeGeneratedObject(in)
case "interpreted-only" => createInterpretedObject(in)
}
} else {
try {
createCodeGeneratedObject(in)
} catch {
case CodegenError(_) => createInterpretedObject(in)
}
}
}

protected def createCodeGeneratedObject(in: IN): OUT
protected def createInterpretedObject(in: IN): OUT
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe
/**
* Helper functions for creating an [[InterpretedUnsafeProjection]].
*/
object InterpretedUnsafeProjection extends UnsafeProjectionCreator {

object InterpretedUnsafeProjection {
/**
* Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
*/
override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
protected[sql] def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are inside object, if we want to expose it, just mark it as public

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you change it?

Copy link
Member Author

@viirya viirya May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to commit...

// We need to make sure that we do not reuse stateful expressions.
val cleanedExpressions = exprs.map(_.transform {
case s: Stateful => s.freshCopy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,31 @@ abstract class UnsafeProjection extends Projection {
override def apply(row: InternalRow): UnsafeRow
}

trait UnsafeProjectionCreator {
/**
* The factory object for `UnsafeProjection`.
*/
object UnsafeProjection extends CodegenObjectFactory[Seq[Expression], UnsafeProjection] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to make it a class is a good idea, we can set default valule like

class UnsafeProjection(mode: CodegenObjectFactoryMode = CodegenObjectFactoryMode.currentMode)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all subclasses of CodegenObjectFactory need mode, isn't we ask for it value at createObject better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean something like

abstract class CodegenObjectFactory[IN, OUT] {
  def mode: CodegenObjectFactoryMode

  def createObject(in: IN): OUT = {
    if (mode == ...) ...
    ...
  }
}

Copy link
Member Author

@viirya viirya May 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem of passing mode into class UnsafeProjection is, if we create the UnsafeProjection object and then change the SQL config, it can't be reflected in UnsafeProjection object's behavior.

If we don't let the config affect this behavior on production, then it should be fine. Then we don't actually need this parameter at most of time, because we only use it in tests. If so, this parameter seems useless. We just need to access CodegenObjectFactoryMode.currentMode when createObject is called.


override protected def createCodeGeneratedObject(in: Seq[Expression]): UnsafeProjection = {
GenerateUnsafeProjection.generate(in)
}

override protected def createInterpretedObject(in: Seq[Expression]): UnsafeProjection = {
InterpretedUnsafeProjection.createProjection(in)
}

protected def toBoundExprs(
exprs: Seq[Expression],
inputSchema: Seq[Attribute]): Seq[Expression] = {
exprs.map(BindReferences.bindReference(_, inputSchema))
}

protected def toUnsafeExprs(exprs: Seq[Expression]): Seq[Expression] = {
exprs.map(_ transform {
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
})
}

/**
* Returns an UnsafeProjection for given StructType.
*
Expand All @@ -129,10 +153,7 @@ trait UnsafeProjectionCreator {
* Returns an UnsafeProjection for given sequence of bound Expressions.
*/
def create(exprs: Seq[Expression]): UnsafeProjection = {
val unsafeExprs = exprs.map(_ transform {
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
})
createProjection(unsafeExprs)
createObject(toUnsafeExprs(exprs))
}

def create(expr: Expression): UnsafeProjection = create(Seq(expr))
Expand All @@ -142,34 +163,24 @@ trait UnsafeProjectionCreator {
* `inputSchema`.
*/
def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): UnsafeProjection = {
create(exprs.map(BindReferences.bindReference(_, inputSchema)))
}

/**
* Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
*/
protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
}

object UnsafeProjection extends UnsafeProjectionCreator {

override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
GenerateUnsafeProjection.generate(exprs)
create(toBoundExprs(exprs, inputSchema))
}

/**
* Same as other create()'s but allowing enabling/disabling subexpression elimination.
* TODO: refactor the plumbing and clean this up.
* The param `subexpressionEliminationEnabled` doesn't guarantee to work. For example,
* when fallbacking to interpreted execution, it is not supported.
*/
def create(
exprs: Seq[Expression],
inputSchema: Seq[Attribute],
subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
val e = exprs.map(BindReferences.bindReference(_, inputSchema))
.map(_ transform {
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
})
GenerateUnsafeProjection.generate(e, subexpressionEliminationEnabled)
val unsafeExprs = toUnsafeExprs(toBoundExprs(exprs, inputSchema))
try {
GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
} catch {
case CodegenError(_) => InterpretedUnsafeProjection.createProjection(unsafeExprs)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,17 @@ object SQLConf {
.intConf
.createWithDefault(100)

val CODEGEN_OBJECT_FALLBACK = buildConf("spark.sql.test.codegenObject.fallback")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CODEGEN_FACTORY_MODE and spark.sql.codegen.factoryMode?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, shall we use CODEGEN_FACTORY_MODE or CODEGEN_FALLBACK_MODE?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CODEGEN_FACTORY_MODE seems more accurate to me. This doesn't control all codegen fallback.

.doc("Determines the behavior of any factories extending `CodegenObjectFactory`" +
" during tests. `fallback` means trying codegen first and then fallbacking to" +
"interpreted if any compile error happens. Disabling fallback if `codegen-only`." +
"`interpreted-only` skips codegen and goes interpreted path always. Note that" +
"this config works only for tests. In production it always runs with `fallback` mode")
.internal()
.stringConf
.checkValues(Set("fallback", "codegen-only", "interpreted-only"))
.createWithDefault("fallback")

val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
.internal()
.doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType}

class CodegenObjectFactorySuite extends SparkFunSuite with PlanTestBase {

// Given a factory object and corresponding input, checking if `SQLConf.CODEGEN_OBJECT_FALLBACK`
// can switch between codegen/interpreted implementation.
private def testCodegenFactory[IN, OUT](factory: CodegenObjectFactory[IN, OUT],
input: IN, checkerForCodegen: OUT => Unit, checkerForInterpreted: OUT => Unit) = {

val modes = Seq("codegen-only", "interpreted-only")
.zip(Seq(checkerForCodegen, checkerForInterpreted))

for ((fallbackMode, checker) <- modes) {
withSQLConf(SQLConf.CODEGEN_OBJECT_FALLBACK.key -> fallbackMode) {
val obj = factory.createObject(input)
checker(obj)
}
}
}

test("test UnsafeProjection factory") {
val input = Seq(LongType, IntegerType)
.zipWithIndex.map(x => BoundReference(x._2, x._1, true))

def checkerForCodegen(projection: UnsafeProjection): Unit = {
assert(projection.getClass.getName.contains("GeneratedClass$SpecificUnsafeProjection"))
}

def checkerForInterpreted(projection: UnsafeProjection): Unit = {
assert(projection.isInstanceOf[InterpretedUnsafeProjection])
}

testCodegenFactory(UnsafeProjection, input, checkerForCodegen, checkerForInterpreted)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -40,7 +41,7 @@ import org.apache.spark.util.Utils
/**
* A few helper functions for expression evaluation testing. Mixin this trait to use them.
*/
trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBase {
self: SparkFunSuite =>

protected def create_row(values: Any*): InternalRow = {
Expand Down Expand Up @@ -196,39 +197,35 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
expression: Expression,
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
checkEvaluationWithUnsafeProjection(expression, expected, inputRow, UnsafeProjection)
checkEvaluationWithUnsafeProjection(expression, expected, inputRow, InterpretedUnsafeProjection)
}

protected def checkEvaluationWithUnsafeProjection(
expression: Expression,
expected: Any,
inputRow: InternalRow,
factory: UnsafeProjectionCreator): Unit = {
val unsafeRow = evaluateWithUnsafeProjection(expression, inputRow, factory)
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"

if (expected == null) {
if (!unsafeRow.isNullAt(0)) {
val expectedRow = InternalRow(expected, expected)
fail("Incorrect evaluation in unsafe mode: " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
}
} else {
val lit = InternalRow(expected, expected)
val expectedRow =
factory.create(Array(expression.dataType, expression.dataType)).apply(lit)
if (unsafeRow != expectedRow) {
fail("Incorrect evaluation in unsafe mode: " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
for (fallbackMode <- Seq("codegen-only", "interpreted-only")) {
withSQLConf(SQLConf.CODEGEN_OBJECT_FALLBACK.key -> fallbackMode) {
val factory = UnsafeProjection
val unsafeRow = evaluateWithUnsafeProjection(expression, inputRow, factory)
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"

if (expected == null) {
if (!unsafeRow.isNullAt(0)) {
val expectedRow = InternalRow(expected, expected)
fail("Incorrect evaluation in unsafe mode: " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
}
} else {
val lit = InternalRow(expected, expected)
val expectedRow =
factory.create(Array(expression.dataType, expression.dataType)).apply(lit)
if (unsafeRow != expectedRow) {
fail("Incorrect evaluation in unsafe mode: " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
}
}
}
}
}

protected def evaluateWithUnsafeProjection(
expression: Expression,
inputRow: InternalRow = EmptyRow,
factory: UnsafeProjectionCreator = UnsafeProjection): InternalRow = {
factory: UnsafeProjection.type = UnsafeProjection): InternalRow = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this really need to take the parameter?

// SPARK-16489 Explicitly doing code generation twice so code gen will fail if
// some expression is reusing variable names across different instances.
// This behavior is tested in ExpressionEvalHelperSuite.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,15 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val structExpected = new GenericArrayData(
Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
checkEvaluationWithUnsafeProjection(
structEncoder.serializer.head,
structExpected,
structInputRow,
UnsafeProjection) // TODO(hvanhovell) revert this when SPARK-23587 is fixed
structEncoder.serializer.head, structExpected, structInputRow)

// test UnsafeArray-backed data
val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
val arrayExpected = new GenericArrayData(
Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
checkEvaluationWithUnsafeProjection(
arrayEncoder.serializer.head,
arrayExpected,
arrayInputRow,
UnsafeProjection) // TODO(hvanhovell) revert this when SPARK-23587 is fixed
arrayEncoder.serializer.head, arrayExpected, arrayInputRow)

// test UnsafeMap-backed data
val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
Expand All @@ -109,10 +103,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
new GenericArrayData(Array(3, 4)),
new GenericArrayData(Array(300, 400)))))
checkEvaluationWithUnsafeProjection(
mapEncoder.serializer.head,
mapExpected,
mapInputRow,
UnsafeProjection) // TODO(hvanhovell) revert this when SPARK-23587 is fixed
mapEncoder.serializer.head, mapExpected, mapInputRow)
}

test("SPARK-23582: StaticInvoke should support interpreted execution") {
Expand Down Expand Up @@ -287,8 +278,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluationWithUnsafeProjection(
expr,
expected,
inputRow,
UnsafeProjection) // TODO(hvanhovell) revert this when SPARK-23587 is fixed
inputRow)
}
checkEvaluationWithOptimization(expr, expected, inputRow)
}
Expand Down
Loading