diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2c17c16f06da7..5ec28ba6b63a3 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
+  // Enable rebasing dates/timestamps from the Julian to the Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -88,6 +92,11 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (INT, IntegerType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, value.asInstanceOf[Int])
 
+    case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+      val days = value.asInstanceOf[Int]
+      val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+      updater.setInt(ordinal, rebasedDays)
+
     case (INT, DateType) => (updater, ordinal, value) =>
       updater.setInt(ordinal, value.asInstanceOf[Int])
 
@@ -95,14 +104,23 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       updater.setLong(ordinal, value.asInstanceOf[Long])
 
     case (LONG, TimestampType) => avroType.getLogicalType match {
-      case _: TimestampMillis => (updater, ordinal, value) =>
-        updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+      // For backward compatibility, if the Avro type is Long and has no logical type
+      // (the `null` case), the value is processed as a timestamp with millisecond precision.
+      case null | _: TimestampMillis => (updater, ordinal, value) =>
+        val millis = value.asInstanceOf[Long]
+        val micros = DateTimeUtils.millisToMicros(millis)
+        if (rebaseDateTime) {
+          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+        } else {
+          updater.setLong(ordinal, micros)
+        }
       case _: TimestampMicros => (updater, ordinal, value) =>
-        updater.setLong(ordinal, value.asInstanceOf[Long])
-      case null => (updater, ordinal, value) =>
-        // For backward compatibility, if the Avro type is Long and it is not logical type,
-        // the value is processed as timestamp type with millisecond precision.
-        updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+        val micros = value.asInstanceOf[Long]
+        if (rebaseDateTime) {
+          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+        } else {
+          updater.setLong(ordinal, micros)
+        }
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
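
For intuition about what the new rebasing branch does on read, here is a minimal, self-contained sketch. This is an illustration only, not Spark's actual DateTimeUtils implementation; the helper name is hypothetical and BCE-era handling is ignored. The idea: interpret the stored day count in the hybrid (Julian + Gregorian) calendar, then re-encode the same local date in the Proleptic Gregorian calendar.

import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Sketch only: interpret `days` since 1970-01-01 in the hybrid (Julian + Gregorian)
// calendar, then re-express the same local date as a Proleptic Gregorian day count.
def rebaseJulianToGregorianDaysSketch(days: Int): Int = {
  val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  hybrid.clear()
  hybrid.set(1970, Calendar.JANUARY, 1)
  hybrid.add(Calendar.DAY_OF_MONTH, days)
  val sameLocalDate = LocalDate.of(   // LocalDate is Proleptic Gregorian by definition
    hybrid.get(Calendar.YEAR),
    hybrid.get(Calendar.MONTH) + 1,   // Calendar months are 0-based
    hybrid.get(Calendar.DAY_OF_MONTH))
  Math.toIntExact(sameLocalDate.toEpochDay)
}

For day counts on or after the 1582-10-15 cutover the two calendars coincide and the function is the identity, which is why the rebase only matters for ancient dates.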
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index b7bf7e5543033..9d95c84676bef 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
   extends Logging {
 
+  // Whether to rebase dates/timestamps from the Proleptic Gregorian to the Julian calendar on write
+  private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case (BinaryType, BYTES) =>
      (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
+    case (DateType, INT) if rebaseDateTime =>
+      (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
     case (DateType, INT) =>
       (getter, ordinal) => getter.getInt(ordinal)
 
     case (TimestampType, LONG) => avroType.getLogicalType match {
-      case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
-      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
-      // For backward compatibility, if the Avro type is Long and it is not logical type,
-      // output the timestamp value as with millisecond precision.
-      case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+      // For backward compatibility, if the Avro type is Long and has no logical type
+      // (the `null` case), output the timestamp value with millisecond precision.
+      case null | _: TimestampMillis => (getter, ordinal) =>
+        val micros = getter.getLong(ordinal)
+        val rebasedMicros = if (rebaseDateTime) {
+          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+        } else micros
+        DateTimeUtils.microsToMillis(rebasedMicros)
+      case _: TimestampMicros => (getter, ordinal) =>
+        val micros = getter.getLong(ordinal)
+        if (rebaseDateTime) {
+          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+        } else micros
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
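
The timestamp rebase follows the same local-wall-clock idea but goes through a time zone. Below is a rough sketch of the Gregorian-to-Julian direction used on write; the helper name is hypothetical, DST and calendar-cutover edge cases that the real DateTimeUtils must handle are ignored, and where Spark uses the session time zone the sketch takes the zone as a parameter.

import java.time.{Instant, ZoneId}
import java.util.{GregorianCalendar, TimeZone}

// Sketch only: interpret `micros` since the epoch as a Proleptic Gregorian local
// timestamp in `zone`, then re-encode the same wall-clock fields in the hybrid
// (Julian + Gregorian) calendar and return the resulting microseconds.
def rebaseGregorianToJulianMicrosSketch(micros: Long, zone: ZoneId): Long = {
  val instant = Instant.EPOCH
    .plusSeconds(Math.floorDiv(micros, 1000000L))
    .plusNanos(Math.floorMod(micros, 1000000L) * 1000L)
  val ldt = instant.atZone(zone).toLocalDateTime
  val hybrid = new GregorianCalendar(TimeZone.getTimeZone(zone))
  hybrid.clear()
  hybrid.set(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth,
    ldt.getHour, ldt.getMinute, ldt.getSecond)
  // hybrid has zero sub-second fields after clear(), so add the micros-of-second back
  hybrid.getTimeInMillis * 1000L + ldt.getNano / 1000L
}

// e.g. rebaseGregorianToJulianMicrosSketch(micros, ZoneId.systemDefault())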
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro
new file mode 100644
index 0000000000000..96aa7cbf176a5
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro
new file mode 100644
index 0000000000000..efe5e71a58813
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro
new file mode 100644
index 0000000000000..dbaec814eb954
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro differ
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 82569653c1f23..9e89b69c0b33c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long", "logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
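
Note that the exact `nonRebased` strings in the timestamp tests depend on the JVM default time zone in effect when the suite runs, since the timestamp rebase goes through local wall-clock time. A hypothetical spark-shell session (made-up output path) showing the same effect the date test asserts:

spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
Seq("1001-01-01").toDF("s").selectExpr("CAST(s AS DATE) AS date")
  .write.format("avro").save("/tmp/before_1582_date")

spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", false)
spark.read.format("avro").load("/tmp/before_1582_date").show()
// +----------+
// |      date|
// +----------+
// |1001-01-07|
// +----------+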
01:02:03.123456" + val nonRebased = "1001-01-07 01:09:05.123456" + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq(tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .write.format("avro") + .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) + } + } + } + + test("SPARK-31183: rebasing milliseconds timestamps in write") { + val tsStr = "1001-01-01 01:02:03.123456" + val rebased = "1001-01-01 01:02:03.123" + val nonRebased = "1001-01-07 01:09:05.123" + Seq( + """{"type": "long","logicalType": "timestamp-millis"}""", + """"long"""").foreach { tsType => + val timestampSchema = s""" + |{ + | "namespace": "logical", + | "type": "record", + | "name": "test", + | "fields": [ + | {"name": "ts", "type": $tsType} + | ] + |}""".stripMargin + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq(tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .write + .option("avroSchema", timestampSchema) + .format("avro") + .save(path) + + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(rebased))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(nonRebased))) + } + } + } + } + + test("SPARK-31183: rebasing dates in write") { + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq("1001-01-01").toDF("dateS") + .select($"dateS".cast("date").as("date")) + .write.format("avro") + .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07"))) + } + } + } } class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1ccbd3573e772..9a524defb2816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2501,6 +2501,20 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_AVRO_REBASE_DATETIME = + buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled") + .internal() + .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " + + "to the hybrid calendar (Julian + Gregorian) in write and " + + "from the hybrid calendar to Proleptic Gregorian calendar in read. " + + "The rebasing is performed by converting micros/millis/days to " + + "a local date/timestamp in the source calendar, interpreting the resulted date/" + + "timestamp in the target calendar, and getting the number of micros/millis/days " + + "since the epoch 1970-01-01 00:00:00Z.") + .version("3.0.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. 
    *
@@ -3080,6 +3094,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
 
+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
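
As a sanity check of the algorithm the config's doc string describes, the snippet below (plain Scala, JDK only) computes the epoch-day count of the same local date label in both calendars. For the year 1001 they should differ by 6 days, which is exactly the 1001-01-01 vs 1001-01-07 discrepancy the new tests assert:

import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Epoch day of the label "1001-01-01" in the Proleptic Gregorian calendar:
val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay

// Epoch day of the same label in the hybrid calendar (Julian before 1582-10-15):
val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
hybrid.clear()
hybrid.set(1001, Calendar.JANUARY, 1)
val hybridDays = Math.floorDiv(hybrid.getTimeInMillis, 86400000L)

println(hybridDays - gregorianDays)  // expected: 6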