@@ -781,10 +781,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

test("Filter applied on merged Parquet schema with new column should work") {
import testImplicits._
Seq("true", "false").foreach { vectorized =>
withAllParquetReaders {
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val path1 = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
@@ -1219,33 +1218,31 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}

test("SPARK-17213: Broken Parquet filter push-down for string columns") {
Seq(true, false).foreach { vectorizedEnabled =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedEnabled.toString) {
withTempPath { dir =>
import testImplicits._
withAllParquetReaders {
withTempPath { dir =>
import testImplicits._

val path = dir.getCanonicalPath
// scalastyle:off nonascii
Seq("a", "é").toDF("name").write.parquet(path)
// scalastyle:on nonascii
val path = dir.getCanonicalPath
// scalastyle:off nonascii
Seq("a", "é").toDF("name").write.parquet(path)
// scalastyle:on nonascii

assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)

// scalastyle:off nonascii
assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
// scalastyle:on nonascii
}
// scalastyle:off nonascii
assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
// scalastyle:on nonascii
}
}
}

test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
import testImplicits._

Seq(true, false).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
withAllParquetReaders {
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString,
SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
withTempPath { path =>
@@ -1255,7 +1252,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
withSQLConf(
// Makes sure disabling 'spark.sql.parquet.recordFilter' still enables
// row group level filtering.
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
@@ -647,47 +647,39 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}

test("read dictionary encoded decimals written as INT32") {
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-i32.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
}
withAllParquetReaders {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-i32.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
}
}

test("read dictionary encoded decimals written as INT64") {
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-i64.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
}
withAllParquetReaders {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-i64.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
}
}

test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
}
withAllParquetReaders {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
}
}

test("read dictionary and plain encoded timestamp_millis written as INT64") {
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(
// timestamp column in this file is encoded using combination of plain
// and dictionary encodings.
readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
(1 to 3).map(i => Row(new java.sql.Timestamp(10))))
}
withAllParquetReaders {
checkAnswer(
// timestamp column in this file is encoded using combination of plain
// and dictionary encodings.
readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
(1 to 3).map(i => Row(new java.sql.Timestamp(10))))
}
}

@@ -943,23 +935,21 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_timestamp_micros_v2_4.snappy.parquet",
"TIMESTAMP_MICROS",
"1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_timestamp_millis_v2_4.snappy.parquet",
"TIMESTAMP_MILLIS",
"1001-01-01 01:02:03.123")

// INT96 is a legacy timestamp format and we always rebase the seconds for it.
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
}
withAllParquetReaders {
checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_timestamp_micros_v2_4.snappy.parquet",
"TIMESTAMP_MICROS",
"1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_timestamp_millis_v2_4.snappy.parquet",
"TIMESTAMP_MILLIS",
"1001-01-01 01:02:03.123")

// INT96 is a legacy timestamp format and we always rebase the seconds for it.
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
}
}

.parquet(path)
}

Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
}

// Force to not rebase to prove the written datetime values are rebased
// and we will get wrong result if we don't rebase while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
}

// Force to not rebase to prove the written datetime values are rebased
// and we will get wrong result if we don't rebase while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
}
}
}
}
.parquet(path)
}

Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
}

// Force to not rebase to prove the written datetime values are rebased and we will get
// wrong result if we don't rebase while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
}

// Force to not rebase to prove the written datetime values are rebased and we will get
// wrong result if we don't rebase while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
}
}
}
}
@@ -124,12 +124,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq"))

Seq(false, true).foreach { int96TimestampConversion =>
Seq(false, true).foreach { vectorized =>
withAllParquetReaders {
withSQLConf(
(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
SQLConf.ParquetOutputTimestampType.INT96.toString),
(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()),
(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString())
) {
val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect()
assert(readBack.size === 6)
Expand All @@ -149,7 +148,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray
val actual = readBack.map(_.getTimestamp(0).toString).sorted
withClue(
s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") {
s"int96TimestampConversion = $int96TimestampConversion; " +
s"vectorized = ${SQLConf.get.parquetVectorizedReaderEnabled}") {
assert(fullExpectations === actual)

// Now test that the behavior is still correct even with a filter which could get
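In the ParquetInteroperabilitySuite hunk above, the explicit vectorized loop variable goes away, so the withClue message instead asks SQLConf.get.parquetVectorizedReaderEnabled which reader is currently active. The snippet below is a minimal standalone sketch of that pattern in plain Scala, not Spark code: the object, key, and helper names are invented for illustration, and only the idea of installing a setting around a block and reading it back inside the block is taken from the diff.

object ActiveSettingSketch {
  // tiny stand-in for SQLConf: per-thread string settings
  private val conf = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  // stand-in for withSQLConf: install one setting, run the block, then restore
  def withConf(key: String, value: String)(body: => Unit): Unit = {
    val saved = conf.get()
    conf.set(saved + (key -> value))
    try body finally conf.set(saved) // restore even if an assertion throws
  }

  // stand-in for withAllParquetReaders: run the same block under both settings
  def withAllReaders(body: => Unit): Unit = {
    withConf("vectorizedReader", "false")(body)
    withConf("vectorizedReader", "true")(body)
  }

  def main(args: Array[String]): Unit = {
    withAllReaders {
      // the block recovers the active setting from the ambient config,
      // the way the withClue message above reads it from SQLConf.get
      val vectorized = conf.get()("vectorizedReader").toBoolean
      println(s"asserting with vectorized = $vectorized")
    }
  }
}

The try/finally restore keeps nesting with other config wrappers safe, which matches how the hunk layers withAllParquetReaders around withSQLConf for the INT96 settings.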
@@ -168,11 +168,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
withTempPath { file =>
val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
df.write.parquet(file.getCanonicalPath)
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
val df2 = spark.read.parquet(file.getCanonicalPath)
checkAnswer(df2, df.collect().toSeq)
}
withAllParquetReaders {
val df2 = spark.read.parquet(file.getCanonicalPath)
checkAnswer(df2, df.collect().toSeq)
}
}
}
@@ -791,15 +789,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}

test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups") {
(true :: false :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
withTempPath { path =>
// Repeated values for dictionary encoding.
Seq(Some("A"), Some("A"), None).toDF.repartition(1)
.write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
}
withAllParquetReaders {
withTempPath { path =>
// Repeated values for dictionary encoding.
Seq(Some("A"), Some("A"), None).toDF.repartition(1)
.write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
}
}
}
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
write(df2.write.mode(SaveMode.Append))
}
Seq("true", "false").foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
checkAnswer(readback, df1.unionAll(df2))
}
withAllParquetReaders {
checkAnswer(readback, df1.unionAll(df2))
}
}

@@ -162,4 +162,11 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
protected def getResourceParquetFilePath(name: String): String = {
Thread.currentThread().getContextClassLoader.getResource(name).toString
}

def withAllParquetReaders(code: => Unit): Unit = {
Contributor: can we simplify it?

def withAllParquetReaders(code: => Unit): Unit = {
  // test the row-based reader
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code)
  // test the vectorized reader
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
}

Member (author): done

Member: I would say code -> func, but no big deal.

// test the row-based reader
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code)
// test the vectorized reader
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
}
}
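
The helper added here takes its block as a by-name parameter (code: => Unit), which is what lets one test body be evaluated twice, once per reader, with no loop at the call site. Below is a minimal standalone sketch of just that by-name mechanism in plain Scala; apart from the parameter shape, the names are illustrative and not Spark APIs.

// `code` is passed by name: it is evaluated each time it is referenced,
// so the same block runs once per "reader" pass.
object ByNameSketch {
  def runUnderBothReaders(code: => Unit): Unit = {
    // stand-ins for withSQLConf(PARQUET_VECTORIZED_READER_ENABLED -> "false")(code)
    // and the "true" variant in the real helper
    println("row-based pass"); code
    println("vectorized pass"); code
  }

  def main(args: Array[String]): Unit = {
    var runs = 0
    runUnderBothReaders { runs += 1 }
    assert(runs == 2) // the block body executed under both passes
  }
}

Renaming the parameter from code to func, as the last review comment suggests, would not change this behavior; only the by-name arrow in the signature matters.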