From 414e48f652d65deccae683e935a1c90ab689f466 Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Sat, 29 Jun 2019 22:24:44 -0700 Subject: [PATCH 1/6] add test --- .../encoders/ExpressionEncoderSuite.scala | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 86e43d71e460..59b80c946d4e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.ClosureCleaner @@ -379,6 +380,80 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes assert(e.getMessage.contains("tuple with more than 22 elements are not supported")) } + // Scala / Java big decimals ---------------------------------------------------------- + + encodeDecodeTest(BigDecimal(("9" * 20) + "." + "9" * 18), + "scala decimal within precision/scale limit") + encodeDecodeTest(new java.math.BigDecimal(("9" * 20) + "." + "9" * 18), + "java decimal within precision/scale limit") + + encodeDecodeTest(BigDecimal(("9" * 20) + "." + "9" * 18).unary_-, + "negative scala decimal within precision/scale limit") + encodeDecodeTest(new java.math.BigDecimal(("9" * 20) + "." + "9" * 18).negate, + "negative java decimal within precision/scale limit") + + testOverflowingBigNumeric(BigDecimal("1" * 21), "scala big decimal") + testOverflowingBigNumeric(new java.math.BigDecimal("1" * 21), "java big decimal") + + testOverflowingBigNumeric(-BigDecimal("1" * 21), "negative scala big decimal") + testOverflowingBigNumeric(new java.math.BigDecimal("1" * 21).negate, "negative java big decimal") + + testOverflowingBigNumeric(BigDecimal(("1" * 21) + ".123"), + "scala big decimal with fractional part") + testOverflowingBigNumeric(new java.math.BigDecimal(("1" * 21) + ".123"), + "java big decimal with fractional part") + + testOverflowingBigNumeric(BigDecimal(("1" * 21) + "." + "9999" * 100), + "scala big decimal with long fractional part") + testOverflowingBigNumeric(new java.math.BigDecimal(("1" * 21) + "." + "9999" * 100), + "java big decimal with long fractional part") + + // Scala / Java big integers ---------------------------------------------------------- + + encodeDecodeTest(BigInt("9" * 38), "scala big integer within precision limit") + encodeDecodeTest(new BigInteger("9" * 38), "java big integer within precision limit") + + encodeDecodeTest(-BigInt("9" * 38), + "negative scala big integer within precision limit") + encodeDecodeTest(new BigInteger("9" * 38).negate(), + "negative java big integer within precision limit") + + testOverflowingBigNumeric(BigInt("1" * 39), "scala big int") + testOverflowingBigNumeric(new BigInteger("1" * 39), "java big integer") + + testOverflowingBigNumeric(-BigInt("1" * 39), "negative scala big int") + testOverflowingBigNumeric(new BigInteger("1" * 39).negate, "negative java big integer") + + testOverflowingBigNumeric(BigInt("9" * 100), "scala very large big int") + testOverflowingBigNumeric(new BigInteger("9" * 100), "java very big int") + + private def testOverflowingBigNumeric[T: TypeTag](bigDecimal: T, testName: String): Unit = { + for { + allowNullOnOverflow <- Seq(true, false) + } { + testAndVerifyNotLeakingReflectionObjects( + s"overflowing $testName, allowNullOnOverflow=$allowNullOnOverflow") { + withSQLConf( + SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> allowNullOnOverflow.toString + ) { + // Need to construct Encoder here rather than implicitly resolving it + // so that SQLConf changes are respected. + val encoder = ExpressionEncoder[T]() + if (allowNullOnOverflow) { + val convertedBack = encoder.resolveAndBind().fromRow(encoder.toRow(bigDecimal)) + assert(convertedBack === null) + } else { + val e = intercept[RuntimeException] { + encoder.toRow(bigDecimal) + } + assert(e.getMessage.contains("Error while encoding")) + assert(e.getCause.getClass === classOf[ArithmeticException]) + } + } + } + } + } + private def encodeDecodeTest[T : ExpressionEncoder]( input: T, testName: String): Unit = { From b3740ba153920d0dbee9b9d769c0e3b79029befa Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Sat, 29 Jun 2019 22:24:57 -0700 Subject: [PATCH 2/6] overflow check at expression encoder --- .../spark/sql/catalyst/SerializerBuildHelper.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala index e035c4be9724..75c278e78114 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala @@ -17,14 +17,17 @@ package org.apache.spark.sql.catalyst -import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, IsNull, UnsafeArrayData} +import org.apache.spark.sql.catalyst.expressions.{CheckOverflow, CreateNamedStruct, Expression, IsNull, UnsafeArrayData} import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String object SerializerBuildHelper { + private def nullOnOverflow: Boolean = SQLConf.get.decimalOperationsNullOnOverflow + def createSerializerForBoolean(inputObject: Expression): Expression = { Invoke(inputObject, "booleanValue", BooleanType) } @@ -99,12 +102,12 @@ object SerializerBuildHelper { } def createSerializerForJavaBigDecimal(inputObject: Expression): Expression = { - StaticInvoke( + CheckOverflow(StaticInvoke( Decimal.getClass, DecimalType.SYSTEM_DEFAULT, "apply", inputObject :: Nil, - returnNullable = false) + returnNullable = false), DecimalType.SYSTEM_DEFAULT, nullOnOverflow) } def createSerializerForScalaBigDecimal(inputObject: Expression): Expression = { @@ -112,12 +115,12 @@ object SerializerBuildHelper { } def createSerializerForJavaBigInteger(inputObject: Expression): Expression = { - StaticInvoke( + CheckOverflow(StaticInvoke( Decimal.getClass, DecimalType.BigIntDecimal, "apply", inputObject :: Nil, - returnNullable = false) + returnNullable = false), DecimalType.BigIntDecimal, nullOnOverflow) } def createSerializerForScalaBigInt(inputObject: Expression): Expression = { From f922ff30b823cf74e87754b3c3cc88a4f50b7ced Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Sun, 30 Jun 2019 09:32:18 -0700 Subject: [PATCH 3/6] test overflow flag on row encoder --- .../catalyst/encoders/RowEncoderSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 4b3d0612a9b8..12138f787d50 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -162,6 +162,32 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { assert(row.toSeq(schema).head == decimal) } + test("RowEncoder should respect nullOnOverflow for decimals") { + val schema = new StructType().add("decimal", DecimalType.SYSTEM_DEFAULT) + testDecimalOverflow(schema, Row(BigDecimal("9" * 100))) + testDecimalOverflow(schema, Row(new java.math.BigDecimal("9" * 100))) + } + + private def testDecimalOverflow(schema: StructType, row: Row): Unit = { + withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> false.toString) { + val encoder = RowEncoder(schema).resolveAndBind() + intercept[Exception] { + encoder.toRow(row) + } match { + case e: ArithmeticException => + assert(e.getMessage.contains("cannot be represented as Decimal")) + case e: RuntimeException => + assert(e.getCause.isInstanceOf[ArithmeticException]) + assert(e.getCause.getMessage.contains("cannot be represented as Decimal")) + } + } + + withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> true.toString) { + val encoder = RowEncoder(schema).resolveAndBind() + assert(encoder.fromRow(encoder.toRow(row)).get(0) == null) + } + } + test("RowEncoder should preserve schema nullability") { val schema = new StructType().add("int", IntegerType, nullable = false) val encoder = RowEncoder(schema).resolveAndBind() From 0ed62b3747b065ea66c0bc329bc288e399b64ac2 Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Sun, 30 Jun 2019 09:32:32 -0700 Subject: [PATCH 4/6] improve test style --- .../sql/catalyst/encoders/ExpressionEncoderSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 59b80c946d4e..ef80fceed4ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -427,10 +427,8 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes testOverflowingBigNumeric(BigInt("9" * 100), "scala very large big int") testOverflowingBigNumeric(new BigInteger("9" * 100), "java very big int") - private def testOverflowingBigNumeric[T: TypeTag](bigDecimal: T, testName: String): Unit = { - for { - allowNullOnOverflow <- Seq(true, false) - } { + private def testOverflowingBigNumeric[T: TypeTag](bigNumeric: T, testName: String): Unit = { + Seq(true, false).foreach { allowNullOnOverflow => testAndVerifyNotLeakingReflectionObjects( s"overflowing $testName, allowNullOnOverflow=$allowNullOnOverflow") { withSQLConf( @@ -440,11 +438,11 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes // so that SQLConf changes are respected. val encoder = ExpressionEncoder[T]() if (allowNullOnOverflow) { - val convertedBack = encoder.resolveAndBind().fromRow(encoder.toRow(bigDecimal)) + val convertedBack = encoder.resolveAndBind().fromRow(encoder.toRow(bigNumeric)) assert(convertedBack === null) } else { val e = intercept[RuntimeException] { - encoder.toRow(bigDecimal) + encoder.toRow(bigNumeric) } assert(e.getMessage.contains("Error while encoding")) assert(e.getCause.getClass === classOf[ArithmeticException]) From 2513ea779767b4afbaa979a25ec402a7e2d9aa4c Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Sun, 30 Jun 2019 23:21:21 -0700 Subject: [PATCH 5/6] address style --- .../spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index ef80fceed4ee..f4feeca1d05a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -387,7 +387,7 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes encodeDecodeTest(new java.math.BigDecimal(("9" * 20) + "." + "9" * 18), "java decimal within precision/scale limit") - encodeDecodeTest(BigDecimal(("9" * 20) + "." + "9" * 18).unary_-, + encodeDecodeTest(-BigDecimal(("9" * 20) + "." + "9" * 18), "negative scala decimal within precision/scale limit") encodeDecodeTest(new java.math.BigDecimal(("9" * 20) + "." + "9" * 18).negate, "negative java decimal within precision/scale limit") From 87073daaa19b606f80208117247aa3d407fa7bd2 Mon Sep 17 00:00:00 2001 From: Mick Jermsurawong Date: Mon, 1 Jul 2019 09:46:43 -0700 Subject: [PATCH 6/6] improve test style and follow naming convention --- .../spark/sql/catalyst/encoders/RowEncoderSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 12138f787d50..5d21e4a2a83c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -162,14 +162,14 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { assert(row.toSeq(schema).head == decimal) } - test("RowEncoder should respect nullOnOverflow for decimals") { + test("SPARK-23179: RowEncoder should respect nullOnOverflow for decimals") { val schema = new StructType().add("decimal", DecimalType.SYSTEM_DEFAULT) testDecimalOverflow(schema, Row(BigDecimal("9" * 100))) testDecimalOverflow(schema, Row(new java.math.BigDecimal("9" * 100))) } private def testDecimalOverflow(schema: StructType, row: Row): Unit = { - withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> false.toString) { + withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> "false") { val encoder = RowEncoder(schema).resolveAndBind() intercept[Exception] { encoder.toRow(row) @@ -182,7 +182,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest { } } - withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> true.toString) { + withSQLConf(SQLConf.DECIMAL_OPERATIONS_NULL_ON_OVERFLOW.key -> "true") { val encoder = RowEncoder(schema).resolveAndBind() assert(encoder.fromRow(encoder.toRow(row)).get(0) == null) }