@@ -26,7 +26,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkUpgradeException
-import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime
@@ -115,14 +115,20 @@ object DataSourceUtils {
       lookupFileMeta: String => String,
       modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      LegacyBehaviorPolicy.CORRECTED
-    } else if (lookupFileMeta(SPARK_INT96_NO_REBASE) != null) {
-      LegacyBehaviorPolicy.CORRECTED
-    } else if (lookupFileMeta(SPARK_VERSION_METADATA_KEY) != null) {
-      LegacyBehaviorPolicy.LEGACY
-    } else {
-      LegacyBehaviorPolicy.withName(modeByConfig)
+      return LegacyBehaviorPolicy.CORRECTED
     }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta(SPARK_LEGACY_INT96) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
   }
 
   def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
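The comments in this hunk compress the decision into two checks: a file written by Spark before 3.1.0 always gets the LEGACY policy, a 3.1.0+ file gets LEGACY only if it carries the new marker key, and a file with no Spark version falls back to the session config. A minimal standalone Scala sketch of that resolution order; it mirrors the hunk rather than calling Spark's helper, and the Policy enum, object name, and metadata maps are illustrative only:

// Standalone sketch: mirrors the resolution order of int96RebaseMode above.
// The Policy enum, object name, and metadata maps are illustrative, not Spark API.
object Int96RebaseResolutionSketch {
  object Policy extends Enumeration { val EXCEPTION, LEGACY, CORRECTED = Value }

  def resolve(fileMeta: Map[String, String], modeByConfig: String): Policy.Value =
    fileMeta.get("org.apache.spark.version").map { version =>
      // Pre-3.1.0 writers always used the hybrid calendar; 3.1.0+ writers instead
      // mark LEGACY writes with the new key.
      if (version < "3.1.0" || fileMeta.contains("org.apache.spark.legacyINT96")) Policy.LEGACY
      else Policy.CORRECTED
    }.getOrElse(Policy.withName(modeByConfig))

  def main(args: Array[String]): Unit = {
    // Spark 2.4 file: rebased regardless of the session config.
    println(resolve(Map("org.apache.spark.version" -> "2.4.7"), "EXCEPTION"))   // LEGACY
    // Spark 3.1 file written in CORRECTED mode: no marker key.
    println(resolve(Map("org.apache.spark.version" -> "3.1.0"), "EXCEPTION"))   // CORRECTED
    // Spark 3.1 file written in LEGACY mode: marker key present.
    println(resolve(Map(
      "org.apache.spark.version" -> "3.1.0",
      "org.apache.spark.legacyINT96" -> ""), "EXCEPTION"))                      // LEGACY
    // File without a Spark version (written by some other system): use the config.
    println(resolve(Map.empty, "CORRECTED"))                                    // CORRECTED
  }
}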
@@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
 import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -123,9 +123,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       }
     } ++ {
       if (int96RebaseMode == LegacyBehaviorPolicy.LEGACY) {
-        None
+        Some(SPARK_LEGACY_INT96 -> "")
       } else {
-        Some(SPARK_INT96_NO_REBASE -> "")
+        None
       }
     }
 
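int96RebaseMode in the writer is driven by the write-side rebase config, so the marker key only appears when that config resolves to LEGACY. A rough spark-shell style sketch of producing such a file; the config key name spark.sql.legacy.parquet.int96RebaseModeInWrite and the output path are assumptions based on this PR, not verified against a running build:

// Assumed config key and path; with LEGACY in-write mode and a pre-Gregorian timestamp,
// the footer written by ParquetWriteSupport should carry "org.apache.spark.legacyINT96".
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
Seq(java.sql.Timestamp.valueOf("1000-01-01 01:02:03")).toDF("ts")
  .write.mode("overwrite").parquet("/tmp/legacy_int96_demo")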
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -63,7 +63,7 @@ package object sql {
 
   /**
    * Parquet file metadata key to indicate that the file with INT96 column type was written
-   * without rebasing.
+   * with rebasing.
    */
-  private[sql] val SPARK_INT96_NO_REBASE = "org.apache.spark.int96NoRebase"
+  private[sql] val SPARK_LEGACY_INT96 = "org.apache.spark.legacyINT96"
 }
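The key is stored as plain footer key-value metadata, so its presence can also be checked outside Spark with parquet-mr's footer API. A small sketch, assuming a hypothetical part-file path; the reader calls are standard parquet-mr, not part of this PR:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical part-file path; point it at a file written with the LEGACY in-write mode.
val partFile = new Path("/tmp/legacy_int96_demo/part-00000.snappy.parquet")
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(partFile, new Configuration()))
try {
  val keyValueMeta = reader.getFooter.getFileMetaData.getKeyValueMetaData
  // Present (with an empty value) only for files written in the LEGACY INT96 rebase mode.
  println(s"legacy INT96 marker: ${keyValueMeta.containsKey("org.apache.spark.legacyINT96")}")
} finally {
  reader.close()
}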
@@ -1163,28 +1163,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("SPARK-33160: write the metadata key 'org.apache.spark.int96NoRebase'") {
-    def saveTs(dir: java.io.File): Unit = {
-      Seq(Timestamp.valueOf("1000-01-01 01:02:03")).toDF()
+  test("SPARK-33160: write the metadata key 'org.apache.spark.legacyINT96'") {
+    def saveTs(dir: java.io.File, ts: String = "1000-01-01 01:02:03"): Unit = {
+      Seq(Timestamp.valueOf(ts)).toDF()
         .repartition(1)
         .write
         .parquet(dir.getAbsolutePath)
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
       withTempPath { dir =>
         saveTs(dir)
-        assert(getMetaData(dir).get(SPARK_INT96_NO_REBASE).isEmpty)
+        assert(getMetaData(dir)(SPARK_LEGACY_INT96) === "")
       }
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
       withTempPath { dir =>
         saveTs(dir)
-        assert(getMetaData(dir)(SPARK_INT96_NO_REBASE) === "")
+        assert(getMetaData(dir).get(SPARK_LEGACY_INT96).isEmpty)
       }
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
       withTempPath { dir => intercept[SparkException] { saveTs(dir) } }
     }
+    withSQLConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
+      withTempPath { dir =>
+        saveTs(dir, "2020-10-22 01:02:03")
+        assert(getMetaData(dir).get(SPARK_LEGACY_INT96).isEmpty)
+      }
+    }
   }
 }
 
@@ -1513,7 +1513,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
       Seq(tbl, ext_tbl).foreach { tblName =>
         sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')")
 
-        val expectedSize = 636
+        val expectedSize = 601
         // analyze table
         sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN")
         var tableStats = getTableStats(tblName)