
Commit 0603913

AngersZhuuuu authored and dongjoon-hyun committed
[SPARK-33593][SQL] Vector reader got incorrect data with binary partition value
### What changes were proposed in this pull request?

Currently, when the Parquet vectorized reader is enabled, using a binary type as a partition column returns an incorrect value, as the UT below shows:

```scala
test("Parquet vector reader incorrect with binary partition value") {
  Seq(false, true).foreach(tag => {
    withSQLConf("spark.sql.parquet.enableVectorizedReader" -> tag.toString) {
      withTable("t1") {
        sql(
          """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
            | USING PARQUET PARTITIONED BY (part)""".stripMargin)
        sql(s"INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
        if (tag) {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", ""))
        } else {
          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
            Row("a", "Spark SQL", "Spark SQL"))
        }
      }
    }
  })
}
```

### Why are the changes needed?

Fix a data correctness issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UT.

Closes #30824 from AngersZhuuuu/SPARK-33593.

Authored-by: angerszhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b0da2bc · commit 0603913

4 files changed (+114 lines, −3 lines)


sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx
     } else {
       if (t == DataTypes.BooleanType) {
         col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.putByteArray(0, row.getBinary(fieldIdx));
       } else if (t == DataTypes.ByteType) {
         col.putBytes(0, capacity, row.getByte(fieldIdx));
       } else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {
         col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not supported" +
+            " in column vectorized reader.", t.sql()));
       }
     }
   }
```
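For context on what the new `BinaryType` branch does, here is a minimal standalone sketch that drives `ColumnVectorUtils.populate` directly against an in-memory column vector. The batch capacity, the value, and the standalone setup are assumptions made for illustration; only the `populate`/`putByteArray` behaviour comes from the change above.

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, OnHeapColumnVector}
import org.apache.spark.sql.types.BinaryType

// Column vector standing in for a constant binary partition column (assumed batch size of 4).
val vector = new OnHeapColumnVector(4, BinaryType)
// Partition values reach populate() as an InternalRow; field 0 holds the binary value.
val partitionRow = new GenericInternalRow(Array[Any]("Spark SQL".getBytes("UTF-8")))

ColumnVectorUtils.populate(vector, partitionRow, 0)

// Before this commit the BinaryType case fell through without writing anything, so the
// column stayed empty; with the new branch the bytes are written via putByteArray.
assert(new String(vector.getBinary(0), "UTF-8") == "Spark SQL")
vector.close()
```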

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 26 additions & 0 deletions
```diff
@@ -3745,6 +3745,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper
       }
     }
   }
+
+  test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
+    Seq("false", "true").foreach(value => {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t1") {
+          sql(
+            """CREATE TABLE t1(name STRING, id BINARY, part BINARY)
+              |USING PARQUET PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
+        withTable("t2") {
+          sql(
+            """CREATE TABLE t2(name STRING, id BINARY, part BINARY)
+              |USING ORC PARTITIONED BY (part)""".stripMargin)
+          sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
+          checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
+            Row("a", "Spark SQL", "Spark SQL"))
+        }
+      }
+    })
+  }
 }
 
 case class Foo(bar: Option[String])
```
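As a side note, the hex literal `X'537061726B2053514C'` used in both INSERT statements is just the UTF-8 encoding of "Spark SQL", which is why the expected rows cast the binary columns back to that string. A quick plain-Scala check (illustrative only, not part of the patch):

```scala
// Decode the hex literal from the test and confirm it spells "Spark SQL".
val hex = "537061726B2053514C"
val bytes = hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
assert(new String(bytes, "UTF-8") == "Spark SQL")
```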

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala

Lines changed: 76 additions & 1 deletion
```diff
@@ -17,16 +17,29 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.unsafe.types.UTF8String.fromString
 
 class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
   private val dataSchema = StructType.fromDDL("col1 int, col2 int")
   private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
   private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
     }
   }
+
+  test("SPARK-33593: partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          "Spark SQL".getBytes,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
+        val partitionValues = new GenericInternalRow(Array(v))
+        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
+        val taskConf = sqlContext.sessionState.newHadoopConf()
+        val orcFileSchema = TypeDescription.fromString(schema.simpleString)
+        val vectorizedReader = new OrcColumnarBatchReader(4096)
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
+
+        try {
+          vectorizedReader.initialize(fileSplit, taskAttemptContext)
+          vectorizedReader.initBatch(
+            orcFileSchema,
+            schema.toArray,
+            Array(0, -1),
+            Array(-1, 0),
+            partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.getRow(0)
+
+          // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
+          // in order to use get(...) method which is not implemented in `ColumnarBatch`.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]]
+              sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -790,14 +790,15 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
 
       val dataTypes =
-        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+        Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
           FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
 
       val constantValues =
         Seq(
           UTF8String.fromString("a string"),
           true,
           1.toByte,
+          "Spark SQL".getBytes,
           2.toShort,
           3,
           Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           // in order to use get(...) method which is not implemented in `ColumnarBatch`.
           val actual = row.copy().get(1, dt)
           val expected = v
-          assert(actual == expected)
+          if (dt.isInstanceOf[BinaryType]) {
+            assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
+          } else {
+            assert(actual == expected)
+          }
         } finally {
           vectorizedReader.close()
         }
```
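Both test suites compare binary results with `sameElements` rather than `==` because JVM arrays use reference equality, so two arrays holding the same bytes are never `==`. A minimal standalone illustration (not from this patch):

```scala
val expected = "Spark SQL".getBytes("UTF-8")
val actual = "Spark SQL".getBytes("UTF-8")

// `==` on arrays falls back to reference equality, so distinct instances compare unequal.
assert(!(expected == actual))
// `sameElements` compares the contents element by element.
assert(expected sameElements actual)
```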
