diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 9813f7fa30f1..1df1b1ebf31a 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -61,7 +61,8 @@ private[avro] case class AvroDataToCatalyst(
 
   @transient private lazy val reader = new GenericDatumReader[Any](actualSchema, expectedSchema)
 
-  @transient private lazy val deserializer = new AvroDeserializer(expectedSchema, dataType)
+  @transient private lazy val deserializer =
+    new AvroDeserializer(expectedSchema, dataType, avroOptions.datetimeRebaseModeInRead)
 
   @transient private var decoder: BinaryDecoder = _
 
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 390fce9c1363..a19a7b0d0edd 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -49,11 +48,14 @@ private[sql] class AvroDeserializer(
     datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     filters: StructFilters) {
 
-  def this(rootAvroType: Schema, rootCatalystType: DataType) = {
+  def this(
+      rootAvroType: Schema,
+      rootCatalystType: DataType,
+      datetimeRebaseMode: String) = {
     this(
       rootAvroType,
       rootCatalystType,
-      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)),
+      LegacyBehaviorPolicy.withName(datetimeRebaseMode),
       new NoopFilters)
   }
 
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index fa4b6b829bdd..ad80d81cca34 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -87,6 +87,7 @@ private[sql] class AvroFileFormat extends FileFormat
     val broadcastedConf =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val parsedOptions = new AvroOptions(options, hadoopConf)
+    val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
 
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
@@ -125,7 +126,7 @@ private[sql] class AvroFileFormat extends FileFormat
 
           val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
             reader.asInstanceOf[DataFileReader[_]].getMetaString,
-            SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+            datetimeRebaseModeInRead)
 
           val avroFilters = if (SQLConf.get.avroFilterPushDown) {
             new OrderedFilters(filters, requiredSchema)
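Note: `DataSourceUtils.datetimeRebaseMode` prefers the writer info recorded in the file metadata over the passed-in default, so the new option only matters for files with no Spark version metadata (e.g. written by Spark 2.4 or by other tools). A simplified sketch of that fallback, assuming the metadata key names `org.apache.spark.version` and `org.apache.spark.legacyDateTime` (the real logic lives in `DataSourceUtils`):

  import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

  def rebaseMode(lookupFileMeta: String => String, modeByConfig: String): LegacyBehaviorPolicy.Value =
    Option(lookupFileMeta("org.apache.spark.version")).map { version =>
      // Spark 2.4 and earlier wrote hybrid-calendar values; 3.0+ marks LEGACY writes explicitly.
      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
        LegacyBehaviorPolicy.LEGACY
      } else {
        LegacyBehaviorPolicy.CORRECTED
      }
    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) // no writer info: fall back to option/config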
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index a2228f2564dd..bb6d500de1b7 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -93,6 +93,13 @@ private[sql] class AvroOptions(
 
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  /**
+   * The rebasing mode for DATE, TIMESTAMP_MICROS and TIMESTAMP_MILLIS values in reads.
+   */
+  val datetimeRebaseModeInRead: String = parameters
+    .get(AvroOptions.DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
 }
 
 private[sql] object AvroOptions {
@@ -105,4 +112,10 @@ private[sql] object AvroOptions {
   }
 
   val ignoreExtensionKey = "ignoreExtension"
+
+  // This option controls rebasing of DATE and TIMESTAMP values between the
+  // Julian and Proleptic Gregorian calendars. It affects the behaviour of the Avro
+  // datasource similarly to the SQL config `spark.sql.legacy.avro.datetimeRebaseModeInRead`,
+  // and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
+  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
 }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 1e6c382041ef..a7d106a62657 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -57,6 +57,7 @@ case class AvroPartitionReaderFactory(
     partitionSchema: StructType,
     parsedOptions: AvroOptions,
     filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging {
+  private val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
 
   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
@@ -91,7 +92,7 @@ case class AvroPartitionReaderFactory(
 
         val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
           reader.asInstanceOf[DataFileReader[_]].getMetaString,
-          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+          datetimeRebaseModeInRead)
 
         val avroFilters = if (SQLConf.get.avroFilterPushDown) {
           new OrderedFilters(filters, readDataSchema)
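For reference, a minimal usage sketch of the new option (the file path is hypothetical). Because `AvroOptions` consults the option first and only falls back to the SQL config, the setting applies per read without touching the session:

  val df = spark.read
    .option("datetimeRebaseMode", "CORRECTED") // or LEGACY / EXCEPTION
    .format("avro")
    .load("/path/to/pre-1582-dates.avro")      // hypothetical file with ancient dates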
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
index 8bf51e8442ff..5be760f0056b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
@@ -20,6 +20,7 @@ import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.generic.GenericRecordBuilder
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
 /**
@@ -29,13 +30,15 @@ import org.apache.spark.sql.types.{IntegerType, StructType}
 class AvroSerdeSuite extends SparkFunSuite {
   import AvroSerdeSuite._
 
+  private val defaultRebaseMode = LegacyBehaviorPolicy.CORRECTED.toString
+
   test("Test basic conversion") {
     val avro = createNestedAvroSchemaWithFields("foo", _.optionalInt("bar"))
     val record = new GenericRecordBuilder(avro)
       .set("foo", new GenericRecordBuilder(avro.getField("foo").schema()).set("bar", 42).build())
       .build()
     val serializer = new AvroSerializer(CATALYST_STRUCT, avro, false)
-    val deserializer = new AvroDeserializer(avro, CATALYST_STRUCT)
+    val deserializer = new AvroDeserializer(avro, CATALYST_STRUCT, defaultRebaseMode)
     assert(serializer.serialize(deserializer.deserialize(record).get) === record)
   }
 
@@ -69,7 +72,7 @@ class AvroSerdeSuite extends SparkFunSuite {
       .add("foo", new StructType().add("bar", IntegerType, nullable = false))
 
     // deserialize should have no issues when 'bar' is nullable but fail when it is nonnull
-    new AvroDeserializer(avro, CATALYST_STRUCT)
+    new AvroDeserializer(avro, CATALYST_STRUCT, defaultRebaseMode)
     assertFailedConversionMessage(avro, deserialize = true,
       "Cannot find non-nullable field 'foo.bar' in Avro schema.",
       nonnullCatalyst)
@@ -120,7 +123,7 @@ class AvroSerdeSuite extends SparkFunSuite {
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
     val e = intercept[IncompatibleSchemaException] {
       if (deserialize) {
-        new AvroDeserializer(avroSchema, catalystSchema)
+        new AvroDeserializer(avroSchema, catalystSchema, defaultRebaseMode)
       } else {
         new AvroSerializer(catalystSchema, avroSchema, false)
       }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index e08b4fec7e96..22729cf2e705 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, Da
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -1755,6 +1756,20 @@ abstract class AvroSuite
     }
   }
 
+  private def runInMode(
+      modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
+    modes.foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
+        f(Map.empty)
+      }
+    }
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) {
+      modes.foreach { mode =>
+        f(Map(AvroOptions.DATETIME_REBASE_MODE -> mode.toString))
+      }
+    }
+  }
+
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
     def checkReadMixedFiles(
@@ -1784,9 +1799,9 @@ abstract class AvroSuite
 
         // For Avro files written by Spark 3.0, we know the writer info and don't need the config
         // to guide the rebase behavior.
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+        runInMode(Seq(LEGACY)) { options =>
           checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            spark.read.options(options).format("avro").load(path2_4, path3_0, path3_0_rebase),
             1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
         }
       } else {
@@ -1817,9 +1832,9 @@ abstract class AvroSuite
 
         // For Avro files written by Spark 3.0, we know the writer info and don't need the config
         // to guide the rebase behavior.
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+        runInMode(Seq(LEGACY)) { options =>
           checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            spark.read.options(options).format("avro").load(path2_4, path3_0, path3_0_rebase),
             1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
         }
       }
@@ -1869,10 +1884,10 @@ abstract class AvroSuite
 
       // The file metadata indicates if it needs rebase or not, so we can always get the correct
      // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).format("avro").load(path),
+          Row(Timestamp.valueOf(tsStr)))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
@@ -1912,12 +1927,10 @@ abstract class AvroSuite
 
       // The file metadata indicates if it needs rebase or not, so we can always get the correct
       // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).schema("ts timestamp").format("avro").load(path),
+          Row(Timestamp.valueOf(rebased)))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
@@ -1943,10 +1956,10 @@ abstract class AvroSuite
 
       // The file metadata indicates if it needs rebase or not, so we can always get the correct
       // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).format("avro").load(path),
+          Row(Date.valueOf("1001-01-01")))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
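The `runInMode` helper exercises both configuration paths: it first runs each mode through the SQL config, then pins the config to `EXCEPTION` and passes each mode through the new datasource option instead, so the tests only pass if the option takes precedence over the session config. A sketch of that precedence from the user's side (the path is hypothetical):

  spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "EXCEPTION")
  val df = spark.read
    .option("datetimeRebaseMode", "LEGACY") // overrides the session config for this read only
    .format("avro")
    .load("/tmp/spark-2_4-dates.avro")      // hypothetical Spark 2.4 file with ancient dates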