From 984d9ea314fb2c1cffe2a2018e9d26ea753c8789 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Thu, 4 Feb 2021 22:01:05 +0300
Subject: [PATCH 01/12] Add parquet options names.

---
 .../datasources/parquet/ParquetOptions.scala  | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 9cfc30725f03..0670e7585f5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -89,4 +89,24 @@ object ParquetOptions {
   def getParquetCompressionCodecName(name: String): String = {
     shortParquetCompressionCodecNames(name).name()
   }
+
+  // The option controls rebasing of the DATE and TIMESTAMP values between
+  // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
+  // datasource similarly to the SQL configs below, and can be set the same values as:
+  // - `spark.sql.legacy.parquet.datetimeRebaseModeInRead` in loading parquet files
+  // - `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` in saving.
+  // The valid values:
+  // - "EXCEPTION", Spark fails in reads or writes of ancient dates/timestamps
+  //   that are ambiguous between the two calendars.
+  // - "CORRECTED", no rebasing. Spark reads and writes dates/timestamps as is.
+  // - "LEGACY", dates and timestamps rebasing is on.
+  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
+
+  // The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
+  // Gregorian calendars. It impacts on the behaviour of the Parquet
+  // datasource similarly to the SQL configs below, and can be set the same values as:
+  // - `spark.sql.legacy.parquet.int96RebaseModeInRead` in loading.
+  // - `spark.sql.legacy.parquet.int96RebaseModeInWrite` in saving parquet files.
+  // The valid option values are: "EXCEPTION", "LEGACY" or "CORRECTED".
+  val INT96_REBASE_MODE = "int96RebaseMode"
 }
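For context, the intent behind these option names — once the subsequent patches in this series wire them into the read path — is a per-query override of the calendar rebasing, rather than a session-wide SQL config. A usage sketch only; the path and the chosen mode values are illustrative, not taken from the patch:

    // Hedged sketch: per-query rebase modes for a single Parquet read.
    // "CORRECTED" is one of the three documented values; the path is hypothetical.
    val df = spark.read
      .format("parquet")
      .option("datetimeRebaseMode", "CORRECTED") // DATE/TIMESTAMP_MICROS/TIMESTAMP_MILLIS
      .option("int96RebaseMode", "CORRECTED")    // INT96 timestamps
      .load("/tmp/ancient-dates.parquet")
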
+ */ + def int96RebaseModeInRead: String = parameters + .get(INT96_REBASE_MODE) + .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) + def int96RebaseModeInWrite: String = parameters + .get(INT96_REBASE_MODE) + .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE)) } From 1f8d429129edf22d4984e7b240f8d69468c7c96c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 5 Feb 2021 15:08:14 +0300 Subject: [PATCH 03/12] Test options --- .../parquet/ParquetRebaseDatetimeSuite.scala | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index d1342be4b0dd..4026611134df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException} import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType} import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY} -import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.test.SharedSparkSession abstract class ParquetRebaseDatetimeSuite @@ -99,6 +99,27 @@ abstract class ParquetRebaseDatetimeSuite protected def failInRead(path: String): Unit + private def inReadConfToOptions( + conf: String, + mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match { + case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key => + Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString) + case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString) + } + + private def runInMode( + conf: String, + modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = { + modes.foreach { mode => + withSQLConf(conf -> mode.toString) { f(Map.empty) } + } + withSQLConf(conf -> EXCEPTION.toString) { + modes.foreach { mode => + f(inReadConfToOptions(conf, mode)) + } + } + } + test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") { val N = 8 // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. @@ -134,15 +155,14 @@ abstract class ParquetRebaseDatetimeSuite } // For Parquet files written by Spark 3.0, we know the writer info and don't need the // config to guide the rebase behavior. 
From 1f8d429129edf22d4984e7b240f8d69468c7c96c Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:08:14 +0300
Subject: [PATCH 03/12] Test options

---
 .../parquet/ParquetRebaseDatetimeSuite.scala  | 62 ++++++++++++-------
 1 file changed, 40 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
index d1342be4b0dd..4026611134df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
@@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType}
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY}
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.test.SharedSparkSession
 
 abstract class ParquetRebaseDatetimeSuite
@@ -99,6 +99,27 @@ abstract class ParquetRebaseDatetimeSuite
 
   protected def failInRead(path: String): Unit
 
+  private def inReadConfToOptions(
+      conf: String,
+      mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match {
+    case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key =>
+      Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString)
+    case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString)
+  }
+
+  private def runInMode(
+      conf: String,
+      modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
+    modes.foreach { mode =>
+      withSQLConf(conf -> mode.toString) { f(Map.empty) }
+    }
+    withSQLConf(conf -> EXCEPTION.toString) {
+      modes.foreach { mode =>
+        f(inReadConfToOptions(conf, mode))
+      }
+    }
+  }
+
   test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
     val N = 8
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
@@ -134,15 +155,14 @@ abstract class ParquetRebaseDatetimeSuite
       }
       // For Parquet files written by Spark 3.0, we know the writer info and don't need the
       // config to guide the rebase behavior.
-      withSQLConf(inReadConf -> LEGACY.toString) {
-        checkAnswer(
-          spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-          (0 until N).flatMap { i =>
-            val (dictS, plainS) = rowFunc(i)
-            Seq.tabulate(3) { _ =>
-              Row(toJavaType(dictS), toJavaType(plainS))
-            }
-          })
+      val expected = (0 until N).flatMap { i =>
+        val (dictS, plainS) = rowFunc(i)
+        Seq.tabulate(3) { _ =>
+          Row(toJavaType(dictS), toJavaType(plainS))
+        }
+      }
+      runInMode(inReadConf, Seq(LEGACY)) { options =>
+        spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase)
       }
     }
   }
@@ -233,12 +253,10 @@ abstract class ParquetRebaseDatetimeSuite
       withAllParquetReaders {
         // The file metadata indicates if it needs rebase or not, so we can always get the
         // correct result regardless of the "rebase mode" config.
-        Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-          withSQLConf(inReadConf -> mode.toString) {
-            checkAnswer(
-              spark.read.parquet(path),
-              Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
-          }
+        runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+          checkAnswer(
+            spark.read.options(options).parquet(path),
+            Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
         }
 
         // Force to not rebase to prove the written datetime values are rebased
@@ -273,12 +291,12 @@ abstract class ParquetRebaseDatetimeSuite
       withAllParquetReaders {
         // The file metadata indicates if it needs rebase or not, so we can always get the
         // correct result regardless of the "rebase mode" config.
-        Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-            checkAnswer(
-              spark.read.parquet(path),
-              Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
-          }
+        runInMode(
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key,
+          Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+          checkAnswer(
+            spark.read.options(options).parquet(path),
+            Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
         }
 
         // Force to not rebase to prove the written datetime values are rebased and we will get
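The `runInMode` helper introduced above exercises each mode twice: first with the mode set through the SQL config and no options passed, then with the config pinned to EXCEPTION while the mode is passed as a datasource option — so a green run also proves that the option takes precedence over the config. For `Seq(LEGACY)` it expands to roughly the following (a condensed restatement, assuming the suite's `withSQLConf` helper and the surrounding `conf`/`f` values):

    withSQLConf(conf -> "LEGACY") { f(Map.empty) }      // mode comes from the SQL config
    withSQLConf(conf -> "EXCEPTION") {
      f(Map("datetimeRebaseMode" -> "LEGACY"))          // mode comes from the option; the config alone would fail
    }
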
From bf2b77ea3fd915ac6db0375ac55e44bf4e6b37df Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:08:33 +0300
Subject: [PATCH 04/12] Support options in v1

---
 .../datasources/parquet/ParquetFileFormat.scala  |  7 ++++--
 .../datasources/parquet/ParquetOptions.scala     | 22 ++++++----------------
 2 files changed, 11 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 1901f5575470..64a1ac867510 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -251,6 +251,9 @@ class ParquetFileFormat
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -301,10 +304,10 @@ class ParquetFileFormat
       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+        int96RebaseModeInRead)
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 2e1533beacee..d3cc74dddb45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -71,23 +71,17 @@ class ParquetOptions(
     .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 
   /**
-   * The rebasing modes of the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in read/write.
+   * The rebasing modes for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
   def datetimeRebaseModeInRead: String = parameters
     .get(DATETIME_REBASE_MODE)
     .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-  def datetimeRebaseModeInWrite: String = parameters
-    .get(DATETIME_REBASE_MODE)
-    .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE))
+
   /**
-   * The rebasing mode of the INT96 timestamp values in read/write.
+   * The rebasing mode for INT96 timestamp values in reads.
    */
   def int96RebaseModeInRead: String = parameters
     .get(INT96_REBASE_MODE)
     .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-  def int96RebaseModeInWrite: String = parameters
-    .get(INT96_REBASE_MODE)
-    .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE))
 }
@@ -112,20 +106,16 @@ object ParquetOptions {
   // The option controls rebasing of the DATE and TIMESTAMP values between
   // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
-  // datasource similarly to the SQL configs below, and can be set the same values as:
-  // - `spark.sql.legacy.parquet.datetimeRebaseModeInRead` in loading parquet files
-  // - `spark.sql.legacy.parquet.datetimeRebaseModeInWrite` in saving.
-  // The valid values:
+  // datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
+  // and can be set the same values:
   // - "EXCEPTION", Spark fails in reads or writes of ancient dates/timestamps
   //   that are ambiguous between the two calendars.
   // - "CORRECTED", no rebasing. Spark reads and writes dates/timestamps as is.
   // - "LEGACY", dates and timestamps rebasing is on.
   val DATETIME_REBASE_MODE = "datetimeRebaseMode"
 
   // The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
-  // Gregorian calendars. It impacts on the behaviour of the Parquet
-  // datasource similarly to the SQL configs below, and can be set the same values as:
-  // - `spark.sql.legacy.parquet.int96RebaseModeInRead` in loading.
-  // - `spark.sql.legacy.parquet.int96RebaseModeInWrite` in saving parquet files.
+  // Gregorian calendars. It impacts on the behaviour of the Parquet datasource similarly to
+  // the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
   // The valid option values are: "EXCEPTION", "LEGACY" or "CORRECTED".
   val INT96_REBASE_MODE = "int96RebaseMode"
 }
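With the v1 path wired up, the per-read override becomes observable: the session config can stay at EXCEPTION while a single read opts into LEGACY rebasing. A hedged sketch — the path is hypothetical:

    // Session-wide default would reject ambiguous ancient values...
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "EXCEPTION")
    // ...but this one read rebases them from the hybrid Julian calendar instead.
    val legacyDf = spark.read
      .option("datetimeRebaseMode", "LEGACY")
      .parquet("/tmp/spark-2.4-output")
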
From db16d690b6d8ed002afe565d9f4a904343e70844 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:08:48 +0300
Subject: [PATCH 05/12] Support options in v2

---
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 10 +++++++---
 .../datasources/v2/parquet/ParquetScan.scala       | 15 ++++++++++++---
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index e4d5e9b2d9f6..20d0de45ba35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -52,6 +52,7 @@ import org.apache.spark.util.SerializableConfiguration
 * @param readDataSchema Required schema of Parquet files.
 * @param partitionSchema Schema of partitions.
 * @param filters Filters to be pushed down in the batch scan.
+ * @param parquetOptions The options of Parquet datasource that are set for the read.
 */
 case class ParquetPartitionReaderFactory(
     sqlConf: SQLConf,
@@ -59,7 +60,8 @@ case class ParquetPartitionReaderFactory(
     dataSchema: StructType,
     readDataSchema: StructType,
     partitionSchema: StructType,
-    filters: Array[Filter]) extends FilePartitionReaderFactory with Logging {
+    filters: Array[Filter],
+    parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging {
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
   private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
@@ -74,6 +76,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+  private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
@@ -174,10 +178,10 @@ case class ParquetPartitionReaderFactory(
       }
       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+        int96RebaseModeInRead)
       val reader = buildReaderFunc(
         split,
         file.partitionValues,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index c9c1e28a3696..37d38d5b7613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.parquet
 
+import collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetInputFormat
@@ -24,7 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -76,8 +78,15 @@ case class ParquetScan(
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
 
-    ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
-      dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
+    val sqlConf = sparkSession.sessionState.conf
+    ParquetPartitionReaderFactory(
+      sqlConf,
+      broadcastedConf,
+      dataSchema,
+      readDataSchema,
+      readPartitionSchema,
+      pushedFilters,
+      new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf))
   }
 
   override def equals(obj: Any): Boolean = obj match {
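On the v2 side the options arrive as a `CaseInsensitiveStringMap`, so `ParquetScan` converts them to a Scala `Map` before constructing `ParquetOptions`. A minimal sketch of that conversion in isolation:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // The v2 read path hands options over as a case-insensitive Java map.
    val v2Options = new CaseInsensitiveStringMap(
      Map("datetimeRebaseMode" -> "CORRECTED").asJava)
    // ParquetOptions expects a Scala Map, hence the conversion used in the patch.
    val asScalaMap: Map[String, String] = v2Options.asCaseSensitiveMap.asScala.toMap
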
From 1bb9980a2d071b58a4f02078374b40d58ed2cb05 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:19:03 +0300
Subject: [PATCH 06/12] Fix imports

---
 .../sql/execution/datasources/v2/parquet/ParquetScan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 37d38d5b7613..5feaeee6c616 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.parquet
 
-import collection.JavaConverters._
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

From 018b1711871a7a09d9994103da61d1688a7d5255 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:20:25 +0300
Subject: [PATCH 07/12] Minor

---
 .../sql/execution/datasources/parquet/ParquetOptions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index d3cc74dddb45..b71a39ad95b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -71,7 +71,7 @@ class ParquetOptions(
     .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 
   /**
-   * The rebasing modes for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
+   * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
   def datetimeRebaseModeInRead: String = parameters
     .get(DATETIME_REBASE_MODE)
     .getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
From 34f88dee1adf36d52ed49f1df26977e17fb32493 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:21:08 +0300
Subject: [PATCH 08/12] typo

---
 .../sql/execution/datasources/parquet/ParquetOptions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index b71a39ad95b0..d5efed885ee7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -106,7 +106,7 @@ object ParquetOptions {
   // The option controls rebasing of the DATE and TIMESTAMP values between
   // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
   // datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
-  // and can be set the same values:
+  // and can be set to the same values:
   // - "EXCEPTION", Spark fails in reads or writes of ancient dates/timestamps
   //   that are ambiguous between the two calendars.
   // - "CORRECTED", no rebasing. Spark reads and writes dates/timestamps as is.

From f898288f495406b98c3a72e0afb462099bbcc65b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 15:26:52 +0300
Subject: [PATCH 09/12] Fix test

---
 .../parquet/ParquetRebaseDatetimeSuite.scala  | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
index 4026611134df..ea197d96c994 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala
@@ -155,14 +155,15 @@ abstract class ParquetRebaseDatetimeSuite
       }
       // For Parquet files written by Spark 3.0, we know the writer info and don't need the
       // config to guide the rebase behavior.
-      val expected = (0 until N).flatMap { i =>
-        val (dictS, plainS) = rowFunc(i)
-        Seq.tabulate(3) { _ =>
-          Row(toJavaType(dictS), toJavaType(plainS))
-        }
-      }
       runInMode(inReadConf, Seq(LEGACY)) { options =>
-        spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase)
+        checkAnswer(
+          spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase),
+          (0 until N).flatMap { i =>
+            val (dictS, plainS) = rowFunc(i)
+            Seq.tabulate(3) { _ =>
+              Row(toJavaType(dictS), toJavaType(plainS))
+            }
+          })
       }
     }
   }
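For reference, the failure mode that the suite's `failInRead` hook asserts: under EXCEPTION, reading ancient dates/timestamps written by Spark 2.4 raises a `SparkUpgradeException`. A hedged sketch — the path is hypothetical, and the exception may surface wrapped in a `SparkException` depending on the execution path:

    import org.apache.spark.SparkUpgradeException

    try {
      spark.read
        .option("datetimeRebaseMode", "EXCEPTION")
        .parquet("/tmp/spark-2.4-output") // assumed to contain pre-1582 dates
        .collect()
    } catch {
      case e: SparkUpgradeException => println(s"ambiguous ancient values: ${e.getMessage}")
    }
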
From eb77fc6581a1aeb67a3c84d1252e16f8dc9b43b6 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Feb 2021 23:07:36 +0300
Subject: [PATCH 10/12] Update docs

---
 python/pyspark/sql/readwriter.py              | 23 +++++++++++++++++-
 python/pyspark/sql/streaming.py               | 24 ++++++++++++++++++--
 .../apache/spark/sql/DataFrameReader.scala    | 20 ++++++++++++++++
 .../datasources/parquet/ParquetOptions.scala  |  8 ++-----
 .../sql/streaming/DataStreamReader.scala      | 20 ++++++++++++++++
 5 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 53122d6c4460..0d11050520a6 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -439,6 +439,24 @@ def parquet(self, *paths, **options):
         modifiedAfter (batch only) : an optional timestamp to only include files with
             modification times occurring after the specified time. The provided timestamp
             must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+        datetimeRebaseMode : str, optional
+            the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
+            ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.
+
+            * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps
+              that are ambiguous between the two calendars.
+            * ``CORRECTED``: loading of dates/timestamps without rebasing.
+            * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
+              to Proleptic Gregorian calendar.
+        int96RebaseMode : str, optional
+            the rebasing mode for ``INT96`` timestamps from the Julian to
+            Proleptic Gregorian calendar.
+
+            * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps
+              that are ambiguous between the two calendars.
+            * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
+            * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
+              to Proleptic Gregorian calendar.
 
         Examples
         --------
         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
         mergeSchema = options.get('mergeSchema', None)
         pathGlobFilter = options.get('pathGlobFilter', None)
         modifiedBefore = options.get('modifiedBefore', None)
         modifiedAfter = options.get('modifiedAfter', None)
         recursiveFileLookup = options.get('recursiveFileLookup', None)
+        datetimeRebaseMode = options.get('datetimeRebaseMode', None)
+        int96RebaseMode = options.get('int96RebaseMode', None)
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
                        recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
-                       modifiedAfter=modifiedAfter)
+                       modifiedAfter=modifiedAfter, datetimeRebaseMode=datetimeRebaseMode,
+                       int96RebaseMode=int96RebaseMode)
 
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 51941a626907..5c7058510ecb 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -667,7 +667,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
         else:
             raise TypeError("path can be only a single string")
 
-    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
+    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+                datetimeRebaseMode=None, int96RebaseMode=None):
         """
         Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
 
@@ -688,6 +689,24 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
            recursively scan a directory for files. Using this option disables
            `partition discovery `_. # noqa
+        datetimeRebaseMode : str, optional
+            the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
+            ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.
+
+            * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps
+              that are ambiguous between the two calendars.
+            * ``CORRECTED``: loading of dates/timestamps without rebasing.
+            * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
+              to Proleptic Gregorian calendar.
+        int96RebaseMode : str, optional
+            the rebasing mode for ``INT96`` timestamps from the Julian to
+            Proleptic Gregorian calendar.
+
+            * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps
+              that are ambiguous between the two calendars.
+            * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
+            * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
+              to Proleptic Gregorian calendar.
 
         Examples
         --------
         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         ...
         True
         """
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
-                       recursiveFileLookup=recursiveFileLookup)
+                       recursiveFileLookup=recursiveFileLookup,
+                       datetimeRebaseMode=datetimeRebaseMode, int96RebaseMode=int96RebaseMode)
         if isinstance(path, str):
             return self._df(self._jreader.parquet(path))
         else:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b94c42a2c954..bfcd97998b93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -825,6 +825,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
    * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
    * disables partition discovery</li>
+   * <li>`datetimeRebaseMode`: the rebasing mode for the values of the `DATE`, `TIMESTAMP_MICROS`,
+   * `TIMESTAMP_MILLIS` logical types from the Julian to Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
+   *     <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
+   *     Gregorian calendar</li>
+   *   </ul>
+   * </li>
+   * <li>`int96RebaseMode`: the rebasing mode for `INT96` timestamps from the Julian to Proleptic
+   * Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED` : loading of timestamps without rebasing</li>
+   *     <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
+   *     Gregorian calendar</li>
+   *   </ul>
+   * </li>
    * </ul>
    *
    * @since 1.4.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index d5efed885ee7..e1edbb41b8ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -106,16 +106,12 @@ object ParquetOptions {
   // The option controls rebasing of the DATE and TIMESTAMP values between
   // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
   // datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
-  // and can be set to the same values:
-  // - "EXCEPTION", Spark fails in reads or writes of ancient dates/timestamps
-  //   that are ambiguous between the two calendars.
-  // - "CORRECTED", no rebasing. Spark reads and writes dates/timestamps as is.
-  // - "LEGACY", dates and timestamps rebasing is on.
+  // and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
   val DATETIME_REBASE_MODE = "datetimeRebaseMode"
 
   // The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
   // Gregorian calendars. It impacts on the behaviour of the Parquet datasource similarly to
   // the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
-  // The valid option values are: "EXCEPTION", "LEGACY" or "CORRECTED".
+  // The valid option values are: `EXCEPTION`, `LEGACY` or `CORRECTED`.
   val INT96_REBASE_MODE = "int96RebaseMode"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d82fa9e88592..c8eb31750c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -493,6 +493,26 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * It does not change the behavior of partition discovery.
    * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
    * disables partition discovery</li>
+   * <li>`datetimeRebaseMode`: the rebasing mode for the values of the `DATE`, `TIMESTAMP_MICROS`,
+   * `TIMESTAMP_MILLIS` logical types from the Julian to Proleptic Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
+   *     <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
+   *     Gregorian calendar</li>
+   *   </ul>
+   * </li>
+   * <li>`int96RebaseMode`: the rebasing mode for `INT96` timestamps from the Julian to Proleptic
+   * Gregorian calendar:
+   *   <ul>
+   *     <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
+   *     between the two calendars</li>
+   *     <li>`CORRECTED` : loading of timestamps without rebasing</li>
+   *     <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
+   *     Gregorian calendar</li>
+   *   </ul>
+   * </li>
    * </ul>
    *
    * @since 2.0.0
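Putting the documented pieces together, an end-to-end round trip looks like the sketch below (illustrative paths; `spark.implicits._` assumed in scope): ancient values are written with legacy rebasing via the write-side SQL config, then read back per query through the new option, leaving the session's read-side configs untouched:

    import spark.implicits._

    // Write with rebasing to the hybrid Julian calendar, as Spark 2.4 did.
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
    Seq(java.sql.Date.valueOf("1001-01-01")).toDF("d").write.parquet("/tmp/ancient")

    // Read the stored values back as-is for this query only, via the documented option.
    spark.read.option("datetimeRebaseMode", "CORRECTED").parquet("/tmp/ancient").show()
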
From f33b5a85d56ee99f98df11a2f3cac4f1f568a836 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sat, 6 Feb 2021 10:07:01 +0300
Subject: [PATCH 11/12] Trigger build

From ebc7298bde1b6bb5c7eab05b731bea6d67127ff0 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sun, 7 Feb 2021 10:48:05 +0300
Subject: [PATCH 12/12] Document the default values

---
 python/pyspark/sql/readwriter.py                          |  6 ++++++
 python/pyspark/sql/streaming.py                           |  6 ++++++
 .../scala/org/apache/spark/sql/DataFrameReader.scala      | 11 +++++++----
 .../apache/spark/sql/streaming/DataStreamReader.scala     | 11 +++++++----
 4 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0d11050520a6..1f6cbcde5ac1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -448,6 +448,9 @@ def parquet(self, *paths, **options):
             * ``CORRECTED``: loading of dates/timestamps without rebasing.
             * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
               to Proleptic Gregorian calendar.
+
+            If None is set, the value of the SQL config
+            ``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
         int96RebaseMode : str, optional
             the rebasing mode for ``INT96`` timestamps from the Julian to
             Proleptic Gregorian calendar.
@@ -458,6 +461,9 @@ def parquet(self, *paths, **options):
             * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
               to Proleptic Gregorian calendar.
 
+            If None is set, the value of the SQL config
+            ``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.
+
         Examples
         --------
         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5c7058510ecb..33297f2779dc 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -698,6 +698,9 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
             * ``CORRECTED``: loading of dates/timestamps without rebasing.
             * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
               to Proleptic Gregorian calendar.
+
+            If None is set, the value of the SQL config
+            ``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
         int96RebaseMode : str, optional
             the rebasing mode for ``INT96`` timestamps from the Julian to
             Proleptic Gregorian calendar.
@@ -708,6 +711,9 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
             * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
               to Proleptic Gregorian calendar.
 
+            If None is set, the value of the SQL config
+            ``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.
+
         Examples
         --------
         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index bfcd97998b93..170bac381c87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -825,8 +825,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
    * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
    * disables partition discovery</li>
-   * <li>`datetimeRebaseMode`: the rebasing mode for the values of the `DATE`, `TIMESTAMP_MICROS`,
-   * `TIMESTAMP_MILLIS` logical types from the Julian to Proleptic Gregorian calendar:
+   * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
+   * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
+   * Proleptic Gregorian calendar:
    *   <ul>
    *     <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
    *     between the two calendars</li>
@@ -835,8 +837,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *     Gregorian calendar</li>
    *   </ul>
    * </li>
-   * <li>`int96RebaseMode`: the rebasing mode for `INT96` timestamps from the Julian to Proleptic
-   * Gregorian calendar:
+   * <li>`int96RebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
+   * from the Julian to Proleptic Gregorian calendar:
    *   <ul>
    *     <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
    *     between the two calendars</li>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index c8eb31750c48..43876a8c2f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -493,8 +493,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * It does not change the behavior of partition discovery.
    * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
    * disables partition discovery</li>
-   * <li>`datetimeRebaseMode`: the rebasing mode for the values of the `DATE`, `TIMESTAMP_MICROS`,
-   * `TIMESTAMP_MILLIS` logical types from the Julian to Proleptic Gregorian calendar:
+   * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
+   * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
+   * Proleptic Gregorian calendar:
    *   <ul>
    *     <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
    *     between the two calendars</li>
@@ -503,8 +505,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *     Gregorian calendar</li>
    *   </ul>
    * </li>
-   * <li>`int96RebaseMode`: the rebasing mode for `INT96` timestamps from the Julian to Proleptic
-   * Gregorian calendar:
+   * <li>`int96RebaseMode` (default is the value specified in the SQL config
+   * `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
+   * from the Julian to Proleptic Gregorian calendar:
    *   <ul>
    *     <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
    *     between the two calendars