diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet
deleted file mode 100644
index 7d5cc12eefe04..0000000000000
Binary files a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet and /dev/null differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet
new file mode 100644
index 0000000000000..edd61c9b9fec8
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet
new file mode 100644
index 0000000000000..01f4887f5e994
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet
new file mode 100644
index 0000000000000..c7e8d3926f63a
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet
new file mode 100644
index 0000000000000..939e2b8088eb0
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet
new file mode 100644
index 0000000000000..88a94ac482052
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet
new file mode 100644
index 0000000000000..68bfa33aac13f
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet
deleted file mode 100644
index 13254bd93a5e6..0000000000000
Binary files a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet and /dev/null differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet
deleted file mode 100644
index 7d2b46e9bea41..0000000000000
Binary files a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet and /dev/null differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet
new file mode 100644
index 0000000000000..62e6048354dc1
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet
new file mode 100644
index 0000000000000..d7fdaa3e67212
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet
deleted file mode 100644
index e9825455c2015..0000000000000
Binary files a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet and /dev/null differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet
new file mode 100644
index 0000000000000..a7cef9e60f134
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet
new file mode 100644
index 0000000000000..4c213f4540a73
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f075d04165697..79c32976f02ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.util.Locale
@@ -45,7 +46,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeExcept
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -875,81 +876,152 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  // It generates input files for the test below:
+  // "SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps"
+  ignore("SPARK-31806: generate test files for checking compatibility with Spark 2.4") {
+    val resourceDir = "sql/core/src/test/resources/test-data"
+    val version = "2_4_5"
+    val N = 8
+    def save(
+        in: Seq[(String, String)],
+        t: String,
+        dstFile: String,
+        options: Map[String, String] = Map.empty): Unit = {
+      withTempDir { dir =>
+        in.toDF("dict", "plain")
+          .select($"dict".cast(t), $"plain".cast(t))
+          .repartition(1)
+          .write
+          .mode("overwrite")
+          .options(options)
+          .parquet(dir.getCanonicalPath)
+        Files.copy(
+          dir.listFiles().filter(_.getName.endsWith(".snappy.parquet")).head.toPath,
+          Paths.get(resourceDir, dstFile),
+          StandardCopyOption.REPLACE_EXISTING)
+      }
+    }
+    DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) {
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) {
+        save(
+          (1 to N).map(i => ("1001-01-01", s"1001-01-0$i")),
+          "date",
+          s"before_1582_date_v$version.snappy.parquet")
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") {
+          save(
+            (1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-0$i 01:02:03.123")),
+            "timestamp",
+            s"before_1582_timestamp_millis_v$version.snappy.parquet")
+        }
+        val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-0$i 01:02:03.123456"))
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
+          save(usTs, "timestamp", s"before_1582_timestamp_micros_v$version.snappy.parquet")
+        }
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+          // Compared to other logical types, Parquet-MR chooses dictionary encoding for the
+          // INT96 logical type because it consumes less memory for small column cardinality.
+          // Huge parquet files don't make sense to place in the resource folder. That's why
+          // we explicitly set `parquet.enable.dictionary` and generate two files, with and
+          // without dictionary encoding.
+          save(
+            usTs,
+            "timestamp",
+            s"before_1582_timestamp_int96_plain_v$version.snappy.parquet",
+            Map("parquet.enable.dictionary" -> "false"))
+          save(
+            usTs,
+            "timestamp",
+            s"before_1582_timestamp_int96_dict_v$version.snappy.parquet",
+            Map("parquet.enable.dictionary" -> "true"))
+        }
+      }
+    }
+  }
+
   test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
+    val N = 8
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
-    def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+    def checkReadMixedFiles[T](
+        fileName: String,
+        catalystType: String,
+        rowFunc: Int => (String, String),
+        toJavaType: String => T,
+        checkDefaultLegacyRead: String => Unit,
+        tsOutputType: String = "TIMESTAMP_MICROS"): Unit = {
       withTempPaths(2) { paths =>
         paths.foreach(_.delete())
         val path2_4 = getResourceParquetFilePath("test-data/" + fileName)
         val path3_0 = paths(0).getCanonicalPath
         val path3_0_rebase = paths(1).getCanonicalPath
-        if (dt == "date") {
-          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-
+        val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain")
+          .select($"dict".cast(catalystType), $"plain".cast(catalystType))
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) {
+          checkDefaultLegacyRead(path2_4)
           // By default we should fail to write ancient datetime values.
-          var e = intercept[SparkException](df.write.parquet(path3_0))
+          val e = intercept[SparkException](df.write.parquet(path3_0))
           assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
-          // By default we should fail to read ancient datetime values.
-          e = intercept[SparkException](spark.read.parquet(path2_4).collect())
-          assert(e.getCause.isInstanceOf[SparkUpgradeException])
-
           withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
             df.write.mode("overwrite").parquet(path3_0)
           }
           withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
             df.write.parquet(path3_0_rebase)
           }
-
-          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
-          // config to guide the rebase behavior.
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
-            checkAnswer(
-              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
-          }
-        } else {
-          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
-          withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
-            // By default we should fail to write ancient datetime values.
-            var e = intercept[SparkException](df.write.parquet(path3_0))
-            assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
-            // By default we should fail to read ancient datetime values.
-            e = intercept[SparkException](spark.read.parquet(path2_4).collect())
-            assert(e.getCause.isInstanceOf[SparkUpgradeException])
-
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
-              df.write.mode("overwrite").parquet(path3_0)
-            }
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
-              df.write.parquet(path3_0_rebase)
-            }
-          }
-          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
-          // config to guide the rebase behavior.
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
-            checkAnswer(
-              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
-          }
+        }
+        // For Parquet files written by Spark 3.0, we know the writer info and don't need the
+        // config to guide the rebase behavior.
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+          checkAnswer(
+            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+            (0 until N).flatMap { i =>
+              val (dictS, plainS) = rowFunc(i)
+              Seq.tabulate(3) { _ =>
+                Row(toJavaType(dictS), toJavaType(plainS))
+              }
+            })
         }
       }
     }
-
-    withAllParquetReaders {
-      checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
-      checkReadMixedFiles(
-        "before_1582_timestamp_micros_v2_4.snappy.parquet",
-        "TIMESTAMP_MICROS",
-        "1001-01-01 01:02:03.123456")
-      checkReadMixedFiles(
-        "before_1582_timestamp_millis_v2_4.snappy.parquet",
-        "TIMESTAMP_MILLIS",
-        "1001-01-01 01:02:03.123")
-
-      // INT96 is a legacy timestamp format and we always rebase the seconds for it.
-      checkAnswer(readResourceParquetFile(
-        "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+    def failInRead(path: String): Unit = {
+      val e = intercept[SparkException](spark.read.parquet(path).collect())
+      assert(e.getCause.isInstanceOf[SparkUpgradeException])
+    }
+    def successInRead(path: String): Unit = spark.read.parquet(path).collect()
+    Seq(
+      // By default we should fail to read ancient datetime values when parquet files don't
+      // contain the Spark version.
+ "2_4_5" -> failInRead _, + "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) => + withAllParquetReaders { + checkReadMixedFiles( + s"before_1582_date_v$version.snappy.parquet", + "date", + (i: Int) => ("1001-01-01", s"1001-01-0${i + 1}"), + java.sql.Date.valueOf, + checkDefaultRead) + checkReadMixedFiles( + s"before_1582_timestamp_micros_v$version.snappy.parquet", + "timestamp", + (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456"), + java.sql.Timestamp.valueOf, + checkDefaultRead) + checkReadMixedFiles( + s"before_1582_timestamp_millis_v$version.snappy.parquet", + "timestamp", + (i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123"), + java.sql.Timestamp.valueOf, + checkDefaultRead, + tsOutputType = "TIMESTAMP_MILLIS") + // INT96 is a legacy timestamp format and we always rebase the seconds for it. + Seq("plain", "dict").foreach { enc => + checkAnswer(readResourceParquetFile( + s"test-data/before_1582_timestamp_int96_${enc}_v$version.snappy.parquet"), + Seq.tabulate(N) { i => + Row( + java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"), + java.sql.Timestamp.valueOf(s"1001-01-0${i + 1} 01:02:03.123456")) + }) + } + } } }