diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index f8068a634977b..b54747a25d5a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -26,7 +26,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkUpgradeException
-import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime
@@ -115,14 +115,20 @@ object DataSourceUtils {
       lookupFileMeta: String => String,
       modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      LegacyBehaviorPolicy.CORRECTED
-    } else if (lookupFileMeta(SPARK_INT96_NO_REBASE) != null) {
-      LegacyBehaviorPolicy.CORRECTED
-    } else if (lookupFileMeta(SPARK_VERSION_METADATA_KEY) != null) {
-      LegacyBehaviorPolicy.LEGACY
-    } else {
-      LegacyBehaviorPolicy.withName(modeByConfig)
+      return LegacyBehaviorPolicy.CORRECTED
     }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta(SPARK_LEGACY_INT96) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
   }
 
   def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index b538c2f2493d0..26074719364a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
 import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -123,9 +123,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       }
     } ++ {
       if (int96RebaseMode == LegacyBehaviorPolicy.LEGACY) {
-        None
+        Some(SPARK_LEGACY_INT96 -> "")
       } else {
-        Some(SPARK_INT96_NO_REBASE -> "")
+        None
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 011be6d69c576..022fecf1ae412 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -63,7 +63,7 @@ package object sql {
 
   /**
    * Parquet file metadata key to indicate that the file with INT96 column type was written
-   * without rebasing.
+   * with rebasing.
    */
-  private[sql] val SPARK_INT96_NO_REBASE = "org.apache.spark.int96NoRebase"
+  private[sql] val SPARK_LEGACY_INT96 = "org.apache.spark.legacyINT96"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index dac4e950a7823..34bdef7bdb402 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1163,9 +1163,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("SPARK-33160: write the metadata key 'org.apache.spark.int96NoRebase'") {
-    def saveTs(dir: java.io.File): Unit = {
-      Seq(Timestamp.valueOf("1000-01-01 01:02:03")).toDF()
+  test("SPARK-33160: write the metadata key 'org.apache.spark.legacyINT96'") {
+    def saveTs(dir: java.io.File, ts: String = "1000-01-01 01:02:03"): Unit = {
+      Seq(Timestamp.valueOf(ts)).toDF()
         .repartition(1)
         .write
         .parquet(dir.getAbsolutePath)
@@ -1173,18 +1173,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
       withTempPath { dir =>
         saveTs(dir)
-        assert(getMetaData(dir).get(SPARK_INT96_NO_REBASE).isEmpty)
+        assert(getMetaData(dir)(SPARK_LEGACY_INT96) === "")
       }
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
       withTempPath { dir =>
         saveTs(dir)
-        assert(getMetaData(dir)(SPARK_INT96_NO_REBASE) === "")
+        assert(getMetaData(dir).get(SPARK_LEGACY_INT96).isEmpty)
       }
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
       withTempPath { dir => intercept[SparkException] { saveTs(dir) } }
     }
+    withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
+      withTempPath { dir =>
+        saveTs(dir, "2020-10-22 01:02:03")
+        assert(getMetaData(dir).get(SPARK_LEGACY_INT96).isEmpty)
+      }
+    }
   }
 
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index db0e93787338e..7d5a200606356 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1513,7 +1513,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       Seq(tbl, ext_tbl).foreach { tblName =>
         sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')")
 
-        val expectedSize = 636
+        val expectedSize = 601
         // analyze table
         sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN")
         var tableStats = getTableStats(tblName)
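
Note (an editor's addition, not part of the patch): the sketch below restates the read-side resolution order that the new DataSourceUtils.int96RebaseMode implements, so the precedence can be checked in isolation. Policy and the two key strings are stand-ins for Spark's LegacyBehaviorPolicy and the real SPARK_VERSION_METADATA_KEY / SPARK_LEGACY_INT96 constants, and the test-only "spark.test.forceNoRebase" short-circuit is omitted; lookupFileMeta follows the same contract as in the patch (it returns null when a key is absent from the Parquet footer).

object Int96RebaseModeSketch {
  // Stand-in for Spark's LegacyBehaviorPolicy enumeration.
  object Policy extends Enumeration {
    val EXCEPTION, LEGACY, CORRECTED = Value
  }

  // Hypothetical local copies of the two metadata keys used by the patch.
  val SparkVersionKey = "org.apache.spark.version"
  val LegacyInt96Key = "org.apache.spark.legacyINT96"

  // Mirrors the patched logic: the file's own metadata wins over the
  // session config; the config only decides for versionless files.
  def int96RebaseMode(
      lookupFileMeta: String => String,
      modeByConfig: String): Policy.Value = {
    Option(lookupFileMeta(SparkVersionKey)).map { version =>
      // Pre-3.1.0 writers always rebased; 3.1.0+ writers leave the
      // legacy marker behind when they rebased.
      if (version < "3.1.0" || lookupFileMeta(LegacyInt96Key) != null) {
        Policy.LEGACY
      } else {
        Policy.CORRECTED
      }
    }.getOrElse(Policy.withName(modeByConfig)) // no version: config decides
  }

  def main(args: Array[String]): Unit = {
    def metaOf(m: Map[String, String]): String => String = k => m.getOrElse(k, null)

    // A Spark 3.0 file needs no marker; the version alone forces LEGACY.
    assert(int96RebaseMode(metaOf(Map(SparkVersionKey -> "3.0.1")), "EXCEPTION") == Policy.LEGACY)
    // A Spark 3.1 file written in LEGACY mode carries the new marker.
    val legacyFile = Map(SparkVersionKey -> "3.1.0", LegacyInt96Key -> "")
    assert(int96RebaseMode(metaOf(legacyFile), "EXCEPTION") == Policy.LEGACY)
    // A Spark 3.1 file without the marker reads as CORRECTED, whatever the config says.
    assert(int96RebaseMode(metaOf(Map(SparkVersionKey -> "3.1.0")), "EXCEPTION") == Policy.CORRECTED)
    // No Spark version in the footer: fall back to the session config.
    assert(int96RebaseMode(metaOf(Map.empty), "CORRECTED") == Policy.CORRECTED)
  }
}

The lexicographic comparison version < "3.1.0" is the same trick the patch relies on: ordinary pre-3.1.0 release strings ("2.4.7", "3.0.1") sort below "3.1.0", so files from older writers are rebased even though they never carried a marker key.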
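
On the write side, the patch inverts the marker's meaning: ParquetWriteSupport now emits SPARK_LEGACY_INT96 only for LEGACY writes, so CORRECTED output, and EXCEPTION output whose timestamps never needed a rebase (the new "2020-10-22" test case), carries no INT96 key at all. A minimal sketch of that rule, with the same stand-in names as above:

object LegacyInt96MarkerSketch {
  val LegacyInt96Key = "org.apache.spark.legacyINT96"

  // Only LEGACY mode leaves the (empty-valued) marker in the footer metadata.
  def int96Metadata(int96RebaseMode: String): Map[String, String] =
    if (int96RebaseMode == "LEGACY") Map(LegacyInt96Key -> "") else Map.empty

  def main(args: Array[String]): Unit = {
    // Matches the updated ParquetIOSuite assertions: LEGACY writes the key
    // with an empty value; CORRECTED writes no key.
    assert(int96Metadata("LEGACY").get(LegacyInt96Key).contains(""))
    assert(int96Metadata("CORRECTED").get(LegacyInt96Key).isEmpty)
  }
}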