AvroDeserializer.scala
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
+  // Enable rebasing dates/timestamps from the Julian to the Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -88,21 +92,35 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+        val days = value.asInstanceOf[Int]
+        val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+        updater.setInt(ordinal, rebasedDays)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
 
       case (LONG, TimestampType) => avroType.getLogicalType match {
-        case _: TimestampMillis => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+        // For backward compatibility, if the Avro type is Long and it has no logical type
+        // (the `null` case), the value is processed as a timestamp with millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case _: TimestampMicros => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long])
-        case null => (updater, ordinal, value) =>
-          // For backward compatibility, if the Avro type is Long and it is not a logical type,
-          // the value is processed as a timestamp with millisecond precision.
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+          val micros = value.asInstanceOf[Long]
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
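Background for the change above: Spark 2.4 built dates/timestamps on java.sql classes, which use the hybrid Julian + Gregorian calendar, while Spark 3.0 switched to java.time and the Proleptic Gregorian calendar, so the same pre-1582 local date maps to different day counts since the epoch. A minimal standalone demo (not part of this PR) of the 6-day shift for year 1001 that the tests below exercise:

import java.time.LocalDate
import java.util.{Calendar, TimeZone}

object CalendarShiftDemo extends App {
  // Days since 1970-01-01 for local date 1001-01-01 in the Proleptic Gregorian
  // calendar, which is what java.time (and Spark 3.0 internally) uses.
  val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay

  // The same local date interpreted in the hybrid Julian + Gregorian calendar
  // used by java.util.Calendar (and by Spark 2.4).
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.clear()
  cal.set(1001, Calendar.JANUARY, 1)
  val hybridDays = Math.floorDiv(cal.getTimeInMillis, 86400000L)

  println(hybridDays - gregorianDays)  // 6, the shift seen in the tests below
}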
AvroSerializer.scala
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
   extends Logging {
 
+  // Whether to rebase datetimes from the Gregorian to the Julian calendar on write
+  private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
+      case (DateType, INT) if rebaseDateTime =>
+        (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
       case (DateType, INT) =>
         (getter, ordinal) => getter.getInt(ordinal)
 
       case (TimestampType, LONG) => avroType.getLogicalType match {
-        case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
-        case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
-        // For backward compatibility, if the Avro type is Long and it is not a logical type,
-        // output the timestamp value with millisecond precision.
-        case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+        // For backward compatibility, if the Avro type is Long and it has no logical type
+        // (the `null` case), output the timestamp value with millisecond precision.
+        case null | _: TimestampMillis => (getter, ordinal) =>
+          val micros = getter.getLong(ordinal)
+          val rebasedMicros = if (rebaseDateTime) {
+            DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+          } else micros
+          DateTimeUtils.microsToMillis(rebasedMicros)
+        case _: TimestampMicros => (getter, ordinal) =>
+          val micros = getter.getLong(ordinal)
+          if (rebaseDateTime) {
+            DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+          } else micros
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
       }

[Review thread on the rebaseDateTime check in the timestamp cases]
Member: One more thing: why don't we return a function rather than checking rebaseDateTime every time?
Author: 1. I assumed timestamps in milliseconds are a rare case; by default, Spark writes microseconds. 2. Checking the boolean flag shouldn't have significant overhead. 3. If the function is hot, the JVM should optimize it. I can move the flag check out of the function body in a follow-up PR, or in the same one, for #27953 (comment).
Member: I think it's easy to switch with almost no additional complexity. It seems fine to change it rather than relying on other optimizations like JIT, or leaving a bad example.
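Following up the thread above, here is a sketch of the suggested change (an assumption about how a follow-up could look, not code merged in this PR): resolve the flag once while the converter is built, so the per-record closure never branches on it.

// Hypothetical reshaping of the serializer's timestamp handling:
val toJulianMicros: Long => Long =
  if (rebaseDateTime) DateTimeUtils.rebaseGregorianToJulianMicros else identity

// The timestamp cases would then reuse the pre-selected function:
//   case null | _: TimestampMillis => (getter, ordinal) =>
//     DateTimeUtils.microsToMillis(toJulianMicros(getter.getLong(ordinal)))
//   case _: TimestampMicros => (getter, ordinal) =>
//     toJulianMicros(getter.getLong(ordinal))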
Binary files not shown (the Spark 2.4 Avro test resources read by the tests below).
AvroLogicalTypeSuite.scala
@@ -17,15 +17,15 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }

[Review thread on this test]
Contributor: Missed one thing. I think the test is not very related to logical types and should probably be put in AvroSuite. @MaxGekk can you move the test in your next PR?
Author: Do you mean only this test?
Contributor: All the new tests added here. They are more about compatibility, not logical types.
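Outside the test suite, reading files written by Spark 2.4 would look like this hypothetical snippet (the path is a placeholder; the config key is the one added by this PR):

spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", "true")
val legacyDf = spark.read.format("avro").load("/path/written/by/spark-2.4")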

test("SPARK-31183: rebasing microseconds timestamps in write") {
val tsStr = "1001-01-01 01:02:03.123456"
val nonRebased = "1001-01-07 01:09:05.123456"
withTempPath { dir =>
val path = dir.getAbsolutePath
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
Seq(tsStr).toDF("tsS")
.select($"tsS".cast("timestamp").as("ts"))
.write.format("avro")
.save(path)

checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
}
}
}

test("SPARK-31183: rebasing milliseconds timestamps in write") {
val tsStr = "1001-01-01 01:02:03.123456"
val rebased = "1001-01-01 01:02:03.123"
val nonRebased = "1001-01-07 01:09:05.123"
Seq(
"""{"type": "long","logicalType": "timestamp-millis"}""",
""""long"""").foreach { tsType =>
val timestampSchema = s"""
|{
| "namespace": "logical",
| "type": "record",
| "name": "test",
| "fields": [
| {"name": "ts", "type": $tsType}
| ]
|}""".stripMargin
withTempPath { dir =>
val path = dir.getAbsolutePath
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
Seq(tsStr).toDF("tsS")
.select($"tsS".cast("timestamp").as("ts"))
.write
.option("avroSchema", timestampSchema)
.format("avro")
.save(path)

checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(rebased)))
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(nonRebased)))
}
}
}
}

test("SPARK-31183: rebasing dates in write") {
withTempPath { dir =>
val path = dir.getAbsolutePath
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
Seq("1001-01-01").toDF("dateS")
.select($"dateS".cast("date").as("date"))
.write.format("avro")
.save(path)

checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
}
}
}
}

class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
Expand Down
SQLConf.scala
@@ -2501,6 +2501,20 @@
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_AVRO_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from the Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write, and " +
+        "from the hybrid calendar to the Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulting date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3080,6 +3094,8 @@
 
   def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
 
+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
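The .doc() text above describes the rebase algorithm abstractly. Below is a sketch of it for the days case on the read path (hybrid to Proleptic Gregorian), using only JDK classes; the function name and shape are illustrative, not Spark's actual API:

import java.time.LocalDate
import java.util.{Calendar, TimeZone}

def rebaseJulianToGregorianDaysSketch(days: Int): Int = {
  // 1. Interpret the day count as a local date in the source (hybrid) calendar:
  //    java.util.Calendar applies Julian rules to dates before the 1582 cutover.
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.setTimeInMillis(days.toLong * 86400000L) // MILLIS_PER_DAY
  // 2. Take the same year-month-day in the target (Proleptic Gregorian) calendar.
  val local = LocalDate.of(
    cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
  // 3. Count days since the epoch 1970-01-01 in the target calendar.
  local.toEpochDay.toInt
}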