diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 53122d6c44602..1f6cbcde5ac1c 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -439,6 +439,30 @@ def parquet(self, *paths, **options): modifiedAfter (batch only) : an optional timestamp to only include files with modification times occurring after the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + datetimeRebaseMode : str, optional + the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``, + ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar. + + * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps + that are ambiguous between the two calendars. + * ``CORRECTED``: loading of dates/timestamps without rebasing. + * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian + to Proleptic Gregorian calendar. + + If None is set, the value of the SQL config + ``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default. + int96RebaseMode : str, optional + the rebasing mode for ``INT96`` timestamps from the Julian to + Proleptic Gregorian calendar. + + * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps + that are ambiguous between the two calendars. + * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing. + * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian + to Proleptic Gregorian calendar. + + If None is set, the value of the SQL config + ``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default. Examples -------- @@ -451,9 +475,12 @@ def parquet(self, *paths, **options): modifiedBefore = options.get('modifiedBefore', None) modifiedAfter = options.get('modifiedAfter', None) recursiveFileLookup = options.get('recursiveFileLookup', None) + datetimeRebaseMode = options.get('datetimeRebaseMode', None) + int96RebaseMode = options.get('int96RebaseMode', None) self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore, - modifiedAfter=modifiedAfter) + modifiedAfter=modifiedAfter, datetimeRebaseMode=datetimeRebaseMode, + int96RebaseMode=int96RebaseMode) return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths))) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 51941a6269074..33297f2779dc0 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -667,7 +667,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N else: raise TypeError("path can be only a single string") - def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None): + def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, + datetimeRebaseMode=None, int96RebaseMode=None): """ Loads a Parquet file stream, returning the result as a :class:`DataFrame`. @@ -688,6 +689,30 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook recursively scan a directory for files. Using this option disables `partition discovery `_. # noqa + datetimeRebaseMode : str, optional + the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``, + ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar. 
+ + * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps + that are ambiguous between the two calendars. + * ``CORRECTED``: loading of dates/timestamps without rebasing. + * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian + to Proleptic Gregorian calendar. + + If None is set, the value of the SQL config + ``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default. + int96RebaseMode : str, optional + the rebasing mode for ``INT96`` timestamps from the Julian to + Proleptic Gregorian calendar. + + * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps + that are ambiguous between the two calendars. + * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing. + * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian + to Proleptic Gregorian calendar. + + If None is set, the value of the SQL config + ``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default. Examples -------- @@ -698,7 +723,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook True """ self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter, - recursiveFileLookup=recursiveFileLookup) + recursiveFileLookup=recursiveFileLookup, + datetimeRebaseMode=datetimeRebaseMode, int96RebaseMode=int96RebaseMode) if isinstance(path, str): return self._df(self._jreader.parquet(path)) else: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b94c42a2c9544..170bac381c874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -825,6 +825,29 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) *
   * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
   * disables partition discovery</li>
+   * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
+   * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
+   * Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION`: Spark fails in reads of ancient dates/timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED`: loading of dates/timestamps without rebasing</li>
+   *     <li>`LEGACY`: perform rebasing of ancient dates/timestamps from the Julian to
+   *     Proleptic Gregorian calendar</li>
+   *   </ul>
+   * </li>
+   * <li>`int96RebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
+   * from the Julian to Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION`: Spark fails in reads of ancient `INT96` timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED`: loading of `INT96` timestamps without rebasing</li>
+   *     <li>`LEGACY`: perform rebasing of ancient `INT96` timestamps from the Julian to
+   *     Proleptic Gregorian calendar</li>
+   *   </ul>
+   * </li>
   * </ul>
   *
   * @since 1.4.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 1901f5575470e..64a1ac8675104 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -251,6 +251,9 @@ class ParquetFileFormat
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -301,10 +304,10 @@ class ParquetFileFormat
       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+        int96RebaseModeInRead)

       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 9cfc30725f03a..e1edbb41b8ec0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -69,6 +69,19 @@ class ParquetOptions(
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
     .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
+
+  /**
+   * The rebasing mode for the DATE, TIMESTAMP_MICROS and TIMESTAMP_MILLIS values in reads.
+   */
+  def datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+  /**
+   * The rebasing mode for INT96 timestamp values in reads.
+   */
+  def int96RebaseModeInRead: String = parameters
+    .get(INT96_REBASE_MODE)
+    .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
 }
@@ -89,4 +102,16 @@ object ParquetOptions {
   def getParquetCompressionCodecName(name: String): String = {
     shortParquetCompressionCodecNames(name).name()
   }
+
+  // The option controls rebasing of the DATE and TIMESTAMP values between
+  // Julian and Proleptic Gregorian calendars. It affects the behaviour of the Parquet
+  // datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
+  // and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
+  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
+
+  // The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
+  // Gregorian calendars. It affects the behaviour of the Parquet datasource similarly to
+  // the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
+  // The valid option values are: `EXCEPTION`, `LEGACY` or `CORRECTED`.
+ val INT96_REBASE_MODE = "int96RebaseMode" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index e4d5e9b2d9f6d..20d0de45ba352 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -52,6 +52,7 @@ import org.apache.spark.util.SerializableConfiguration * @param readDataSchema Required schema of Parquet files. * @param partitionSchema Schema of partitions. * @param filters Filters to be pushed down in the batch scan. + * @param parquetOptions The options of Parquet datasource that are set for the read. */ case class ParquetPartitionReaderFactory( sqlConf: SQLConf, @@ -59,7 +60,8 @@ case class ParquetPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - filters: Array[Filter]) extends FilePartitionReaderFactory with Logging { + filters: Array[Filter], + parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields) private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled @@ -74,6 +76,8 @@ case class ParquetPartitionReaderFactory( private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead + private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -174,10 +178,10 @@ case class ParquetPartitionReaderFactory( } val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + datetimeRebaseModeInRead) val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) + int96RebaseModeInRead) val reader = buildReaderFunc( split, file.partitionValues, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala index c9c1e28a36960..5feaeee6c616b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.parquet +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetInputFormat @@ -24,7 +26,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.PartitionReaderFactory import 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex -import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.execution.datasources.v2.FileScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -76,8 +78,15 @@ case class ParquetScan( val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, pushedFilters) + val sqlConf = sparkSession.sessionState.conf + ParquetPartitionReaderFactory( + sqlConf, + broadcastedConf, + dataSchema, + readDataSchema, + readPartitionSchema, + pushedFilters, + new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf)) } override def equals(obj: Any): Boolean = obj match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index d82fa9e88592f..43876a8c2f817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -493,6 +493,29 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * It does not change the behavior of partition discovery. *
   * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
   * disables partition discovery</li>
+   * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
+   * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
+   * Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION`: Spark fails in reads of ancient dates/timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED`: loading of dates/timestamps without rebasing</li>
+   *     <li>`LEGACY`: perform rebasing of ancient dates/timestamps from the Julian to
+   *     Proleptic Gregorian calendar</li>
+   *   </ul>
+   * </li>
+   * <li>`int96RebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
+   * from the Julian to Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION`: Spark fails in reads of ancient `INT96` timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED`: loading of `INT96` timestamps without rebasing</li>
+   *     <li>`LEGACY`: perform rebasing of ancient `INT96` timestamps from the Julian to
+   *     Proleptic Gregorian calendar</li>
+   *   </ul>
+   * </li>
  • * * * @since 2.0.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index a327f83ecbabd..6baeb578ca6b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException} import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType} import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY} -import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.test.SharedSparkSession abstract class ParquetRebaseDatetimeSuite @@ -97,6 +97,27 @@ abstract class ParquetRebaseDatetimeSuite } } + private def inReadConfToOptions( + conf: String, + mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match { + case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key => + Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString) + case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString) + } + + private def runInMode( + conf: String, + modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = { + modes.foreach { mode => + withSQLConf(conf -> mode.toString) { f(Map.empty) } + } + withSQLConf(conf -> EXCEPTION.toString) { + modes.foreach { mode => + f(inReadConfToOptions(conf, mode)) + } + } + } + test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") { val N = 8 // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. @@ -132,9 +153,9 @@ abstract class ParquetRebaseDatetimeSuite } // For Parquet files written by Spark 3.0, we know the writer info and don't need the // config to guide the rebase behavior. - withSQLConf(inReadConf -> LEGACY.toString) { + runInMode(inReadConf, Seq(LEGACY)) { options => checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase), (0 until N).flatMap { i => val (dictS, plainS) = rowFunc(i) Seq.tabulate(3) { _ => @@ -235,12 +256,10 @@ abstract class ParquetRebaseDatetimeSuite withAllParquetReaders { // The file metadata indicates if it needs rebase or not, so we can always get the // correct result regardless of the "rebase mode" config. - Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => - withSQLConf(inReadConf -> mode.toString) { - checkAnswer( - spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) - } + runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options => + checkAnswer( + spark.read.options(options).parquet(path), + Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) } // Force to not rebase to prove the written datetime values are rebased @@ -275,12 +294,12 @@ abstract class ParquetRebaseDatetimeSuite withAllParquetReaders { // The file metadata indicates if it needs rebase or not, so we can always get the // correct result regardless of the "rebase mode" config. 
- Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { - checkAnswer( - spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) - } + runInMode( + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key, + Seq(LEGACY, CORRECTED, EXCEPTION)) { options => + checkAnswer( + spark.read.options(options).parquet(path), + Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) } // Force to not rebase to prove the written datetime values are rebased and we will get
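
Taken together, these changes let a caller choose the calendar rebasing behaviour per read instead of relying on the session-wide SQL configs `spark.sql.legacy.parquet.datetimeRebaseModeInRead` and `spark.sql.legacy.parquet.int96RebaseModeInRead`. A minimal usage sketch follows; the paths are hypothetical, while the option names and the `EXCEPTION`/`CORRECTED`/`LEGACY` values come from the diff above:

```scala
// Batch read: rebase ancient DATE/TIMESTAMP_MICROS/TIMESTAMP_MILLIS values written by
// legacy Spark versions, and fail on ambiguous INT96 timestamps.
val df = spark.read
  .option("datetimeRebaseMode", "LEGACY")
  .option("int96RebaseMode", "EXCEPTION")
  .parquet("/data/parquet-written-by-spark-2.4")  // hypothetical path

// Streaming read: DataStreamReader.parquet honours the same options.
val stream = spark.readStream
  .schema(df.schema)  // file stream sources need an explicit schema
  .option("datetimeRebaseMode", "CORRECTED")
  .parquet("/data/parquet-stream-dir")  // hypothetical path
```

As the `ParquetFileFormat` and `ParquetPartitionReaderFactory` changes show, the option (like the SQL config it overrides) only supplies the fallback mode: when the Parquet footer metadata already identifies the writer, that metadata still decides whether rebasing is applied, which is exactly what the `runInMode` test helper exercises.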