Changes from 6 commits
@@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
} else {
if (t == DataTypes.BooleanType) {
col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
} else if (t == DataTypes.BinaryType) {
col.putByteArray(0, row.getBinary(fieldIdx));
Comment on lines +57 to +58

Member:
Hm? If there is no case for BinaryType, it seems there is also no final else block, so it was just left unpopulated previously?

Member:
Should we add an else block and throw an exception?

Contributor Author @AngersZhuuuu (Dec 18, 2020):
> Should we add an else block and throw an exception?

Updated and added a UT.

Contributor Author @AngersZhuuuu:
> Hm? If there is no case for BinaryType, it seems there is also no final else block, so it was just left unpopulated previously?

It looks like it does, so that value is left empty.
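A minimal sketch of the behaviour discussed in this thread, assuming the populate method shown above lives in ColumnVectorUtils and driving it through OnHeapColumnVector (the class name ColumnVectorUtils and the capacity of 4 are assumptions for illustration, not part of this hunk):

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, OnHeapColumnVector}
import org.apache.spark.sql.types.BinaryType

// A single-field internal row holding the binary partition value.
val partitionRow = new GenericInternalRow(Array[Any]("Spark SQL".getBytes))
val col = new OnHeapColumnVector(4, BinaryType)

// Before this patch, no branch matched BinaryType and there was no trailing else,
// so populate() wrote nothing and col.getBinary(0) came back as an empty byte
// array (the "value is empty" behaviour described above).

// With the new branch, the constant partition value is written into the vector:
ColumnVectorUtils.populate(col, partitionRow, 0)
assert(col.getBinary(0).sameElements("Spark SQL".getBytes))

With the final else added later in this diff, any remaining unsupported type now fails fast with a RuntimeException instead of being silently left unpopulated.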

} else if (t == DataTypes.ByteType) {
col.putBytes(0, capacity, row.getByte(fieldIdx));
} else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
col.putInts(0, capacity, row.getInt(fieldIdx));
} else if (t instanceof TimestampType) {
col.putLongs(0, capacity, row.getLong(fieldIdx));
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
" in column vectorized reader.", t.sql()));
}
}
}
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3745,6 +3745,32 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
Seq("false", "true").foreach(value => {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
withTable("t") {
sql(
"""CREATE TABLE t(name STRING, id BINARY, part BINARY)
|USING PARQUET PARTITIONED BY (part)""".stripMargin)
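// X'537061726B2053514C' below is the ASCII/UTF-8 hex encoding of "Spark SQL".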
sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"),
Row("a", "Spark SQL", "Spark SQL"))
}
}

withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
withTable("t") {
sql(
"""CREATE TABLE t(name STRING, id BINARY, part BINARY)
|USING ORC PARTITIONED BY (part)""".stripMargin)
sql("INSERT INTO t PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t"),
Row("a", "Spark SQL", "Spark SQL"))
}
}
})
}
}

case class Foo(bar: Option[String])
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -17,16 +17,29 @@

package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.TypeDescription

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.UTF8String.fromString

class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {

import testImplicits._

private val dataSchema = StructType.fromDDL("col1 int, col2 int")
private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
@@ -77,4 +90,66 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
}
}

test("SPARK-33593: OrcColumnarBatchReader - partition column types") {
withTempPath { dir =>
Seq(1).toDF().repartition(1).write.orc(dir.getCanonicalPath)

val dataTypes =
Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

val constantValues =
Seq(
UTF8String.fromString("a string"),
true,
1.toByte,
"Spark SQL".getBytes,
2.toShort,
3,
Long.MaxValue,
0.25.toFloat,
0.75D,
Decimal("1234.23456"),
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))

dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
val partitionValues = new GenericInternalRow(Array(v))
val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
val taskConf = sqlContext.sessionState.newHadoopConf()
val orcFileSchema = TypeDescription.fromString(schema.simpleString)
val vectorizedReader = new OrcColumnarBatchReader(4096)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)

try {
vectorizedReader.initialize(fileSplit, taskAttemptContext)
vectorizedReader.initBatch(
orcFileSchema,
schema.toArray,
Array(0, -1),
Array(-1, 0),
partitionValues)
vectorizedReader.nextKeyValue()
val row = vectorizedReader.getCurrentValue.getRow(0)

// Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
// in order to use get(...) method which is not implemented in `ColumnarBatch`.
val actual = row.copy().get(1, dt)
val expected = v
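// Binary values come back as Array[Byte], whose == is reference equality, so compare element-wise.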
if (dt.isInstanceOf[BinaryType]) {
assert(actual.asInstanceOf[Array[Byte]]
sameElements expected.asInstanceOf[Array[Byte]])
} else {
assert(actual == expected)
}
} finally {
vectorizedReader.close()
}
}
}
}
}
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -790,14 +790,15 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)

val dataTypes =
Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
Seq(StringType, BooleanType, ByteType, BinaryType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)

val constantValues =
Seq(
UTF8String.fromString("a string"),
true,
1.toByte,
"Spark SQL".getBytes,
2.toShort,
3,
Long.MaxValue,
@@ -825,7 +826,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
// in order to use get(...) method which is not implemented in `ColumnarBatch`.
val actual = row.copy().get(1, dt)
val expected = v
assert(actual == expected)
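// As in the ORC suite above, Array[Byte] == is reference equality, so binary values need an element-wise check.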
if (dt.isInstanceOf[BinaryType]) {
assert(actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
} else {
assert(actual == expected)
}
} finally {
vectorizedReader.close()
}