Binary files not shown.
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
import java.time._
import java.util.Locale
@@ -45,7 +46,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeExcept
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -875,81 +876,152 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

// It generates input files for the test below:
// "SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps"
ignore("SPARK-31806: generate test files for checking compatibility with Spark 2.4") {
val resourceDir = "sql/core/src/test/resources/test-data"
val version = "2_4_5"
val N = 8
def save(
in: Seq[(String, String)],
t: String,
dstFile: String,
options: Map[String, String] = Map.empty): Unit = {
withTempDir { dir =>
in.toDF("dict", "plain")
.select($"dict".cast(t), $"plain".cast(t))
.repartition(1)
.write
.mode("overwrite")
.options(options)
.parquet(dir.getCanonicalPath)
Files.copy(
dir.listFiles().filter(_.getName.endsWith(".snappy.parquet")).head.toPath,
Paths.get(resourceDir, dstFile),
StandardCopyOption.REPLACE_EXISTING)
}
}
DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) {
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) {
save(
(1 to N).map(i => ("1001-01-01", s"1001-01-0$i")),
"date",
s"before_1582_date_v$version.snappy.parquet")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") {
save(
(1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-0$i 01:02:03.123")),
"timestamp",
s"before_1582_timestamp_millis_v$version.snappy.parquet")
}
val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-0$i 01:02:03.123456"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
save(usTs, "timestamp", s"before_1582_timestamp_micros_v$version.snappy.parquet")
}
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
// Compared to other logical types, Parquet-MR chooses dictionary encoding for the
// INT96 logical type because it consumes less memory for small column cardinality.
// Huge parquet files don't make sense to place in the resource folder. That's why
// we explicitly set `parquet.enable.dictionary` and generate two files, with and
// without dictionary encoding.
save(
usTs,
"timestamp",
s"before_1582_timestamp_int96_plain_v$version.snappy.parquet",
@cloud-fan (Contributor) commented on May 25, 2020:

Why do we have 2 files, plain and dictionary-encoded, for INT96? Other types just have one file and 2 columns.

If it's caused by some Parquet limitation, let's write a comment to explain it.

Author (Member) replied:

Because INT96 always uses dictionary encoding, independent of the number of values and their uniqueness, I have to explicitly turn off dictionary encoding while saving to the parquet files, see the test above.

Other types don't have this "problem": for one column the Parquet library uses dictionary encoding because all values are the same, and for the other one it applies plain encoding because the values in the date/timestamp column are all unique.

Author (Member) replied:

I added a comment.
Map("parquet.enable.dictionary" -> "false"))
save(
usTs,
"timestamp",
s"before_1582_timestamp_int96_dict_v$version.snappy.parquet",
Map("parquet.enable.dictionary" -> "true"))
}
}
}
}
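Following up on the review discussion above about why there are separate plain and dictionary-encoded INT96 files: here is a hedged sketch (not part of the patch) of how one could double-check the generated resource files with parquet-mr's ParquetFileReader, printing the per-column encodings and the Spark version stamped into the footer (the presence of that version key is also what distinguishes the 2.4.6 files from the 2.4.5 ones in the test further down). The file path and the describeParquet helper name are illustrative.

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical helper: prints the writer metadata and per-column encodings of a Parquet file.
def describeParquet(file: String): Unit = {
  val input = HadoopInputFile.fromPath(new Path(file), new Configuration())
  val reader = ParquetFileReader.open(input)
  try {
    val footer = reader.getFooter
    // Spark stamps its version into the footer's key-value metadata; files written by
    // older releases (e.g. 2.4.5) lack this key, which is why the test below expects
    // reads of those files to fail by default.
    println("writer: " + footer.getFileMetaData.getKeyValueMetaData.get("org.apache.spark.version"))
    // For the two INT96 files, one should report only plain encodings for the timestamp
    // columns and the other should include a dictionary encoding among them.
    footer.getBlocks.asScala.flatMap(_.getColumns.asScala).foreach { col =>
      println(s"${col.getPath}: ${col.getEncodings.asScala.mkString(", ")}")
    }
  } finally {
    reader.close()
  }
}

// Example usage against one of the generated resource files:
// describeParquet("sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet")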

test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
val N = 8
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
def checkReadMixedFiles[T](
fileName: String,
catalystType: String,
rowFunc: Int => (String, String),
toJavaType: String => T,
checkDefaultLegacyRead: String => Unit,
tsOutputType: String = "TIMESTAMP_MICROS"): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceParquetFilePath("test-data/" + fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))

val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain")
.select($"dict".cast(catalystType), $"plain".cast(catalystType))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
checkDefaultLegacyRead(path2_4)
// By default we should fail to write ancient datetime values.
var e = intercept[SparkException](df.write.parquet(path3_0))
val e = intercept[SparkException](df.write.parquet(path3_0))
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
// By default we should fail to read ancient datetime values.
e = intercept[SparkException](spark.read.parquet(path2_4).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])

withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.mode("overwrite").parquet(path3_0)
}
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
df.write.parquet(path3_0_rebase)
}

// For Parquet files written by Spark 3.0, we know the writer info and don't need the
// config to guide the rebase behavior.
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
}
} else {
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
// By default we should fail to write ancient datetime values.
var e = intercept[SparkException](df.write.parquet(path3_0))
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
// By default we should fail to read ancient datetime values.
e = intercept[SparkException](spark.read.parquet(path2_4).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])

withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.mode("overwrite").parquet(path3_0)
}
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
df.write.parquet(path3_0_rebase)
}
}
// For Parquet files written by Spark 3.0, we know the writer info and don't need the
// config to guide the rebase behavior.
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
}
}
// For Parquet files written by Spark 3.0, we know the writer info and don't need the
// config to guide the rebase behavior.
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
(0 until N).flatMap { i =>
val (dictS, plainS) = rowFunc(i)
Seq.tabulate(3) { _ =>
Row(toJavaType(dictS), toJavaType(plainS))
}
})
}
}
}

withAllParquetReaders {
checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_timestamp_micros_v2_4.snappy.parquet",
"TIMESTAMP_MICROS",
"1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_timestamp_millis_v2_4.snappy.parquet",
"TIMESTAMP_MILLIS",
"1001-01-01 01:02:03.123")

// INT96 is a legacy timestamp format and we always rebase the seconds for it.
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
def failInRead(path: String): Unit = {
val e = intercept[SparkException](spark.read.parquet(path).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])
}
def successInRead(path: String): Unit = spark.read.parquet(path).collect()
Seq(
// By default we should fail to read ancient datetime values when parquet files don't
// contain Spark version.
"2_4_5" -> failInRead _,
"2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) =>
withAllParquetReaders {
checkReadMixedFiles(
s"before_1582_date_v$version.snappy.parquet",
"date",
(i: Int) => ("1001-01-01", s"1001-01-0${i + 1}"),
java.sql.Date.valueOf,
checkDefaultRead)
checkReadMixedFiles(
s"before_1582_timestamp_micros_v$version.snappy.parquet",
"timestamp",
(i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456"),
java.sql.Timestamp.valueOf,
checkDefaultRead)
checkReadMixedFiles(
s"before_1582_timestamp_millis_v$version.snappy.parquet",
"timestamp",
(i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123"),
java.sql.Timestamp.valueOf,
checkDefaultRead,
tsOutputType = "TIMESTAMP_MILLIS")
// INT96 is a legacy timestamp format and we always rebase the seconds for it.
Seq("plain", "dict").foreach { enc =>
checkAnswer(readResourceParquetFile(
s"test-data/before_1582_timestamp_int96_${enc}_v$version.snappy.parquet"),
Seq.tabulate(N) { i =>
Row(
java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"),
java.sql.Timestamp.valueOf(s"1001-01-0${i + 1} 01:02:03.123456"))
})
}
}
}
}
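For context on why any rebase is needed at all (the test only checks the resulting behavior): Spark 2.4 resolved dates and timestamps in the hybrid Julian/Gregorian calendar, while Spark 3.0 uses the proleptic Gregorian calendar, so the same local date before 1582 maps to different epoch days. A small sketch, assuming only plain java.time and java.util APIs, that makes the shift visible:

import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Epoch day of 1001-01-01 in the proleptic Gregorian calendar (what Spark 3.0 uses).
val prolepticDays = LocalDate.of(1001, 1, 1).toEpochDay

// Epoch day of the same local date in the hybrid Julian/Gregorian calendar
// (what Spark 2.4 assumed; Julian rules apply before 1582).
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val hybridDays = Math.floorDiv(cal.getTimeInMillis, 24L * 60 * 60 * 1000)

// The two values differ by several days around the year 1001; this offset is what the
// LEGACY rebase mode compensates for when reading or writing old files.
println(prolepticDays - hybridDays)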

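And a minimal user-level sketch (again only a sketch, reusing the suite's withSQLConf helper and implicits, with placeholder /tmp paths) of the configs the test exercises: by default, writing such ancient values fails with SparkUpgradeException; LEGACY rebases them to the hybrid calendar so Spark 2.4 can read them back; CORRECTED writes them as-is. On the read side, the config only matters for files that do not carry Spark 3.0 writer info in the footer.

// Assumes spark, spark.implicits._ and the suite's withSQLConf are in scope.
import org.apache.spark.sql.internal.SQLConf

val ancient = Seq("1001-01-01").toDF("s").select($"s".cast("date").as("d"))

// LEGACY: rebase from proleptic Gregorian to the hybrid calendar on write (2.4-compatible files).
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
  ancient.write.mode("overwrite").parquet("/tmp/ancient_legacy")
}
// CORRECTED: write the proleptic Gregorian values as-is, without rebasing.
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
  ancient.write.mode("overwrite").parquet("/tmp/ancient_corrected")
}

// Both files were written by Spark 3.0 and carry the writer info in the footer, so they
// read back as 1001-01-01 regardless of the read-side mode; the config below only guides
// the behavior for old files without that info (like the 2.4.5 test resources).
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> "LEGACY") {
  spark.read.parquet("/tmp/ancient_legacy", "/tmp/ancient_corrected").show(false)
}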