From 851d7235cb2d60af95da3cde6420fea6e63c52d3 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 1 Oct 2018 18:22:34 +0200 Subject: [PATCH 1/6] [SPARK-25582][SQL] Zero-out all bytes when writing decimal --- .../catalyst/expressions/codegen/UnsafeRowWriter.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java index 71c49d8ed017..fa5649d47d38 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java @@ -185,13 +185,13 @@ public void write(int ordinal, Decimal input, int precision, int scale) { // grow the global buffer before writing data. holder.grow(16); + // zero-out the bytes + Platform.putLong(getBuffer(), cursor(), 0L); + Platform.putLong(getBuffer(), cursor() + 8, 0L); + // Make sure Decimal object has the same scale as DecimalType. // Note that we may pass in null Decimal object to set null for it. if (input == null || !input.changePrecision(precision, scale)) { - // zero-out the bytes - Platform.putLong(getBuffer(), cursor(), 0L); - Platform.putLong(getBuffer(), cursor() + 8, 0L); - BitSetMethods.set(getBuffer(), startingOffset, ordinal); // keep the offset for future update setOffsetAndSize(ordinal, 0); @@ -200,8 +200,6 @@ public void write(int ordinal, Decimal input, int precision, int scale) { final int numBytes = bytes.length; assert numBytes <= 16; - zeroOutPaddingBytes(numBytes); - // Write the bytes to the variable length portion. Platform.copyMemory( bytes, Platform.BYTE_ARRAY_OFFSET, getBuffer(), cursor(), numBytes); From 6b84b41915ae184912922bb820a147060ac13afc Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 2 Oct 2018 10:46:08 +0200 Subject: [PATCH 2/6] add UT + address comment --- .../expressions/codegen/UnsafeRowWriter.java | 2 +- .../codegen/UnsafeWriterSuite.scala | 43 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java index fa5649d47d38..3960d6d52047 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java @@ -185,7 +185,7 @@ public void write(int ordinal, Decimal input, int precision, int scale) { // grow the global buffer before writing data. 
holder.grow(16); - // zero-out the bytes + // always zero-out the 16-byte buffer Platform.putLong(getBuffer(), cursor(), 0L); Platform.putLong(getBuffer(), cursor() + 8, 0L); diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala new file mode 100644 index 000000000000..e79e42da95c5 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types.Decimal + +class UnsafeWriterSuite extends SparkFunSuite { + + test("SPARK-25538: zero-out all bits for decimals") { + // This decimal holds 8 bytes + val decimal1 = Decimal(0.431) + decimal1.changePrecision(38, 18) + // This decimal holds 11 bytes + val decimal2 = Decimal(123456789.1232456789) + decimal2.changePrecision(38, 18) + val unsafeRowWriter = new UnsafeRowWriter(1) + unsafeRowWriter.resetRowWriter() + unsafeRowWriter.write(0, decimal2, decimal2.precision, decimal2.scale) + unsafeRowWriter.reset() + unsafeRowWriter.write(0, decimal1, decimal1.precision, decimal1.scale) + val res = unsafeRowWriter.getRow + assert(res.getDecimal(0, decimal1.precision, decimal1.scale) == decimal1) + // Check that the bytes which are not used by decimal1 (but are allocated) are zero-ed out + assert(res.getBytes()(25) == 0x00) + } + +} From 64f4ed0e286d1a01192400203e167d046cb800f5 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 2 Oct 2018 14:49:59 +0200 Subject: [PATCH 3/6] address comment --- .../sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala index e79e42da95c5..6af8151497dc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala @@ -36,7 +36,10 @@ class UnsafeWriterSuite extends SparkFunSuite { unsafeRowWriter.write(0, decimal1, decimal1.precision, decimal1.scale) val res = unsafeRowWriter.getRow assert(res.getDecimal(0, decimal1.precision, decimal1.scale) == decimal1) - // Check that the bytes which are not used by decimal1 (but are allocated) are zero-ed out + // Check that the bytes which are not used by decimal1 (but are allocated) are 
zero-ed out. + // The first 16 bytes are used for the offset and size, then 8 bytes contain the value of + // decimal1. So from byte 25 to byte 32 there is the leftover of decimal2 which should have + // been zero-ed. assert(res.getBytes()(25) == 0x00) } From 72b7c5c16b33368bb332b168a99e99e13d4636cf Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 2 Oct 2018 15:54:53 +0200 Subject: [PATCH 4/6] improve ut --- .../codegen/UnsafeWriterSuite.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala index 6af8151497dc..e40b68103aa5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala @@ -29,18 +29,20 @@ class UnsafeWriterSuite extends SparkFunSuite { // This decimal holds 11 bytes val decimal2 = Decimal(123456789.1232456789) decimal2.changePrecision(38, 18) - val unsafeRowWriter = new UnsafeRowWriter(1) - unsafeRowWriter.resetRowWriter() - unsafeRowWriter.write(0, decimal2, decimal2.precision, decimal2.scale) - unsafeRowWriter.reset() - unsafeRowWriter.write(0, decimal1, decimal1.precision, decimal1.scale) - val res = unsafeRowWriter.getRow - assert(res.getDecimal(0, decimal1.precision, decimal1.scale) == decimal1) - // Check that the bytes which are not used by decimal1 (but are allocated) are zero-ed out. - // The first 16 bytes are used for the offset and size, then 8 bytes contain the value of - // decimal1. So from byte 25 to byte 32 there is the leftover of decimal2 which should have - // been zero-ed. 
-    assert(res.getBytes()(25) == 0x00)
+    // On an UnsafeRowWriter we write decimal2 first and then decimal1
+    val unsafeRowWriter1 = new UnsafeRowWriter(1)
+    unsafeRowWriter1.resetRowWriter()
+    unsafeRowWriter1.write(0, decimal2, decimal2.precision, decimal2.scale)
+    unsafeRowWriter1.reset()
+    unsafeRowWriter1.write(0, decimal1, decimal1.precision, decimal1.scale)
+    val res1 = unsafeRowWriter1.getRow
+    // On a second UnsafeRowWriter we write directly decimal2
+    val unsafeRowWriter2 = new UnsafeRowWriter(1)
+    unsafeRowWriter2.resetRowWriter()
+    unsafeRowWriter2.write(0, decimal1, decimal1.precision, decimal1.scale)
+    val res2 = unsafeRowWriter2.getRow
+    // The two rows should be equal
+    assert(res1 == res2)
   }
 
 }

From d7d17d8362912978851a55726ed2154435bebb2c Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 3 Oct 2018 10:03:06 +0200
Subject: [PATCH 5/6] address comment

---
 .../{UnsafeWriterSuite.scala => UnsafeRowWriterSuite.scala} | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/{UnsafeWriterSuite.scala => UnsafeRowWriterSuite.scala} (97%)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
similarity index 97%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
index e40b68103aa5..18add9529fbe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.types.Decimal
 
-class UnsafeWriterSuite extends SparkFunSuite {
+class UnsafeRowWriterSuite extends SparkFunSuite {
 
   test("SPARK-25538: zero-out all bits for decimals") {
     // This decimal holds 8 bytes

From be38c4c47524605c4661333900d7b6f83ae2ce5a Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 3 Oct 2018 11:40:22 +0200
Subject: [PATCH 6/6] address comments

---
 .../expressions/codegen/UnsafeRowWriterSuite.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
index 18add9529fbe..fb651b76fc16 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriterSuite.scala
@@ -22,13 +22,18 @@ import org.apache.spark.sql.types.Decimal
 
 class UnsafeRowWriterSuite extends SparkFunSuite {
 
+  def checkDecimalSizeInBytes(decimal: Decimal, numBytes: Int): Unit = {
+    assert(decimal.toJavaBigDecimal.unscaledValue().toByteArray.length == numBytes)
+  }
+
   test("SPARK-25538: zero-out all bits for decimals") {
-    // This decimal holds 8 bytes
     val decimal1 = Decimal(0.431)
     decimal1.changePrecision(38, 18)
-    // This decimal holds 11 bytes
+    checkDecimalSizeInBytes(decimal1, 8)
+
     val decimal2 = Decimal(123456789.1232456789)
decimal2.changePrecision(38, 18) + checkDecimalSizeInBytes(decimal2, 11) // On an UnsafeRowWriter we write decimal2 first and then decimal1 val unsafeRowWriter1 = new UnsafeRowWriter(1) unsafeRowWriter1.resetRowWriter() @@ -36,7 +41,7 @@ class UnsafeRowWriterSuite extends SparkFunSuite { unsafeRowWriter1.reset() unsafeRowWriter1.write(0, decimal1, decimal1.precision, decimal1.scale) val res1 = unsafeRowWriter1.getRow - // On a second UnsafeRowWriter we write directly decimal2 + // On a second UnsafeRowWriter we write directly decimal1 val unsafeRowWriter2 = new UnsafeRowWriter(1) unsafeRowWriter2.resetRowWriter() unsafeRowWriter2.write(0, decimal1, decimal1.precision, decimal1.scale)
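
A minimal standalone sketch of the buffer-reuse scenario the series guards against, using only the UnsafeRowWriter and Decimal calls already exercised in the test above. The byte offsets follow the layout reasoning in the patch 3 comments and the earlier getBytes()(25) check; the snippet is illustrative only and is not part of the patches.

  import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
  import org.apache.spark.sql.types.Decimal

  // Assumed row layout for a single decimal column (per the comments above):
  //   bytes  0..7   null-tracking bit set
  //   bytes  8..15  fixed-length slot for column 0 (offset and size)
  //   bytes 16..31  16-byte variable-length slot reserved for the decimal value
  val big = Decimal(123456789.1232456789)   // unscaled value needs 11 bytes
  big.changePrecision(38, 18)
  val small = Decimal(0.431)                // unscaled value needs 8 bytes
  small.changePrecision(38, 18)

  val writer = new UnsafeRowWriter(1)
  writer.resetRowWriter()
  writer.write(0, big, big.precision, big.scale)        // dirties bytes 16..26 of the row
  writer.reset()                                        // reuses the same buffer
  writer.write(0, small, small.precision, small.scale)  // overwrites only bytes 16..23

  // The padding after small's 8 bytes must not keep big's low-order bytes around.
  val row = writer.getRow
  assert(row.getBytes().slice(24, 32).forall(_ == 0))

With the pre-patch write path, stale bytes from `big` could survive the second write; patch 1 prevents that by zeroing both 8-byte words of the decimal slot on every call, which is also what the final test checks via row equality.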