From 4784edd3e8938257143788bd5022c39023cf231b Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 17 Dec 2020 23:12:56 +0800 Subject: [PATCH 1/8] [SPARK-33593][SQL] Parquet vector reader incorrect with binary partition value --- .../execution/vectorized/ColumnVectorUtils.java | 2 ++ .../org/apache/spark/sql/SQLQuerySuite.scala | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index bce6aa28c42a..16c5770f2872 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field } else { if (t == DataTypes.BooleanType) { col.putBooleans(0, capacity, row.getBoolean(fieldIdx)); + } else if (t == DataTypes.BinaryType) { + col.putByteArray(0, row.getBinary(fieldIdx)); } else if (t == DataTypes.ByteType) { col.putBytes(0, capacity, row.getByte(fieldIdx)); } else if (t == DataTypes.ShortType) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 112b1a7210cb..c2f7803210f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3745,6 +3745,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-33593: Parquet vector reader incorrect with binary partition value") { + Seq(true).foreach(tag => { + withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) { + withTable("t1") { + sql( + """CREATE TABLE t1(name STRING, id BINARY, part BINARY) + | USING PARQUET PARTITIONED BY (part)""".stripMargin) + sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"), + Row("a", "Spark SQL", "Spark SQL")) + } + } + }) + } } case class Foo(bar: Option[String]) From 8001fa2a4b5f7b4278226b4e02b1decd9e2d0701 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 09:49:26 +0800 Subject: [PATCH 2/8] Update ColumnVectorUtils.java --- .../spark/sql/execution/vectorized/ColumnVectorUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 16c5770f2872..2f5277ceb5ef 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -96,6 +96,9 @@ public static void populate(WritableColumnVector col, InternalRow row, int field col.putInts(0, capacity, row.getInt(fieldIdx)); } else if (t instanceof TimestampType) { col.putLongs(0, capacity, row.getLong(fieldIdx)); + } else { + throw new RuntimeException(String.format("DataType %s is not supported" + + " when use column vectorized reader.", t.sql())); } } } From eeaf38ffdd3d8fe278909f4492144898d39edf02 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 10:08:18 +0800 Subject: [PATCH 3/8] add UT --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 
++++++------ .../datasources/parquet/ParquetIOSuite.scala | 9 +++++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c2f7803210f9..40150ce30c79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3747,14 +3747,14 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-33593: Parquet vector reader incorrect with binary partition value") { - Seq(true).foreach(tag => { - withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) { - withTable("t1") { + Seq("false", "true").foreach(value => { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) { + withTable("t") { sql( - """CREATE TABLE t1(name STRING, id BINARY, part BINARY) + """CREATE TABLE t(name STRING, id BINARY, part BINARY) | USING PARQUET PARTITIONED BY (part)""".stripMargin) - sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") - checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"), + sql(s"INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"), Row("a", "Spark SQL", "Spark SQL")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index d13b3e58a30f..c69f2e6911ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -790,7 +790,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath) val dataTypes = - Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType, + Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType) val constantValues = @@ -798,6 +798,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession UTF8String.fromString("a string"), true, 1.toByte, + "Spark SQL".getBytes, 2.toShort, 3, Long.MaxValue, @@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // in order to use get(...) method which is not implemented in `ColumnarBatch`. 
val actual = row.copy().get(1, dt) val expected = v - assert(actual == expected) + if (dt.isInstanceOf[BinaryType]) { + assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]]) + } else { + assert(actual == expected) + } } finally { vectorizedReader.close() } From e237541b5df616502035af3d20050f6eee076dd4 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 10:49:19 +0800 Subject: [PATCH 4/8] follow comment --- .../spark/sql/execution/vectorized/ColumnVectorUtils.java | 2 +- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 2f5277ceb5ef..25aabcd08628 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -98,7 +98,7 @@ public static void populate(WritableColumnVector col, InternalRow row, int field col.putLongs(0, capacity, row.getLong(fieldIdx)); } else { throw new RuntimeException(String.format("DataType %s is not supported" + - " when use column vectorized reader.", t.sql())); + " in column vectorized reader.", t.sql())); } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 40150ce30c79..4832a50cdbf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3752,8 +3752,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark withTable("t") { sql( """CREATE TABLE t(name STRING, id BINARY, part BINARY) - | USING PARQUET PARTITIONED BY (part)""".stripMargin) - sql(s"INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + |USING PARQUET PARTITIONED BY (part)""".stripMargin) + sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"), Row("a", "Spark SQL", "Spark SQL")) } From 74b59ffef7e2637fc328539eac06a52470b1920c Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 11:15:39 +0800 Subject: [PATCH 5/8] Update SQLQuerySuite.scala --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4832a50cdbf3..9f4ee7d08cc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3746,7 +3746,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } - test("SPARK-33593: Parquet vector reader incorrect with binary partition value") { + test("SPARK-33593: Vector reader got incorrect data with binary partition value") { Seq("false", "true").foreach(value => { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) { withTable("t") { @@ -3758,6 +3758,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row("a", "Spark SQL", "Spark SQL")) } } + + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) { + 
withTable("t") { + sql( + """CREATE TABLE t(name STRING, id BINARY, part BINARY) + |USING ORC PARTITIONED BY (part)""".stripMargin) + sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"), + Row("a", "Spark SQL", "Spark SQL")) + } + } }) } } From ed0928c96ba3b1e70c1c436d5f2582a313581d45 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 12:44:04 +0800 Subject: [PATCH 6/8] Update OrcColumnarBatchReaderSuite.scala --- .../orc/OrcColumnarBatchReaderSuite.scala | 77 ++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 719bf91e1786..0d30be72c17e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -17,16 +17,29 @@ package org.apache.spark.sql.execution.datasources.orc +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.orc.TypeDescription import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.unsafe.types.UTF8String.fromString class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + private val dataSchema = StructType.fromDDL("col1 int, col2 int") private val partitionSchema = StructType.fromDDL("p1 string, p2 string") private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2")) @@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0)) } } + + test("SPARK-33593: OrcColumnarBatchReader - partition column types") { + withTempPath { dir => + Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath) + + val dataTypes = + Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType) + + val constantValues = + Seq( + UTF8String.fromString("a string"), + true, + 1.toByte, + "Spark SQL".getBytes, + 2.toShort, + 3, + Long.MaxValue, + 0.25.toFloat, + 0.75D, + Decimal("1234.23456"), + DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), + DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))) + + dataTypes.zip(constantValues).foreach { case (dt, v) => + val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil) + val partitionValues = new 
GenericInternalRow(Array(v)) + val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0)) + val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty) + val taskConf = sqlContext.sessionState.newHadoopConf() + val orcFileSchema = TypeDescription.fromString(schema.simpleString) + val vectorizedReader = new OrcColumnarBatchReader(4096) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) + + try { + vectorizedReader.initialize(fileSplit, taskAttemptContext) + vectorizedReader.initBatch( + orcFileSchema, + schema.toArray, + Array(0, -1), + Array(-1, 0), + partitionValues) + vectorizedReader.nextKeyValue() + val row = vectorizedReader.getCurrentValue.getRow(0) + + // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` + // in order to use get(...) method which is not implemented in `ColumnarBatch`. + val actual = row.copy().get(1, dt) + val expected = v + if (dt.isInstanceOf[BinaryType]) { + assert(actual.asInstanceOf[Array[Byte]] + sameElements expected.asInstanceOf[Array[Byte]]) + } else { + assert(actual == expected) + } + } finally { + vectorizedReader.close() + } + } + } + } } From 3b1b89622da95e2470cc9d488e9fc147c8be2151 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 12:49:03 +0800 Subject: [PATCH 7/8] Update OrcColumnarBatchReaderSuite.scala --- .../execution/datasources/orc/OrcColumnarBatchReaderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala index 0d30be72c17e..bfcef4633990 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala @@ -91,7 +91,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession { } } - test("SPARK-33593: OrcColumnarBatchReader - partition column types") { + test("SPARK-33593: partition column types") { withTempPath { dir => Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath) From 5d55b388eaba22551bc8611772f91542e2f2c8ff Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 18 Dec 2020 14:14:47 +0800 Subject: [PATCH 8/8] Update SQLQuerySuite.scala --- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9f4ee7d08cc2..b7cec5524556 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3749,23 +3749,23 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark test("SPARK-33593: Vector reader got incorrect data with binary partition value") { Seq("false", "true").foreach(value => { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) { - withTable("t") { + withTable("t1") { sql( - """CREATE TABLE t(name STRING, id BINARY, part BINARY) + """CREATE TABLE t1(name STRING, id BINARY, part BINARY) |USING PARQUET PARTITIONED BY (part)""".stripMargin) - sql("INSERT INTO t PARTITION(part = 'Spark SQL') 
VALUES('a', X'537061726B2053514C')") - checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"), + sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"), Row("a", "Spark SQL", "Spark SQL")) } } withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) { - withTable("t") { + withTable("t2") { sql( - """CREATE TABLE t(name STRING, id BINARY, part BINARY) + """CREATE TABLE t2(name STRING, id BINARY, part BINARY) |USING ORC PARTITIONED BY (part)""".stripMargin) - sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") - checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"), + sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')") + checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"), Row("a", "Spark SQL", "Spark SQL")) } }
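
Reviewer note: a minimal sketch of the underlying bug, for context. Before PATCH 1,
the if/else chain in `ColumnVectorUtils.populate` had no branch for
`DataTypes.BinaryType`, and (until PATCH 2 added the trailing `else` that throws)
an unhandled partition type fell through silently, leaving the column vector
unpopulated and yielding incorrect data rather than an error. The spark-shell
reproduction below is assembled from the test added in PATCH 1; the table name and
values mirror that test and are otherwise illustrative:

    // Run with spark.sql.parquet.enableVectorizedReader=true (the default).
    spark.sql(
      """CREATE TABLE t(name STRING, id BINARY, part BINARY)
        |USING PARQUET PARTITIONED BY (part)""".stripMargin)
    // X'537061726B2053514C' is the hex encoding of the bytes of "Spark SQL".
    spark.sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
    // Expected: Row("a", "Spark SQL", "Spark SQL"). Before this series, the
    // vectorized reader returned wrong data for the binary partition column.
    spark.sql("SELECT name, cast(id as string), cast(part as string) FROM t").show()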