Skip to content

Commit 88aacb9

Browse files
committed
fail by default when reading/writing ancient datetime values from/to Parquet/Avro files
1 parent 272d229 commit 88aacb9

File tree

24 files changed

+478
-292
lines changed

24 files changed

+478
-292
lines changed

core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
4848
* Exception thrown when Spark returns different result after upgrading to a new version.
4949
*/
5050
private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
51-
extends SparkException("You may get a different result due to the upgrading of Spark" +
51+
extends RuntimeException("You may get a different result due to the upgrading of Spark" +
5252
s" $version: $message", cause)

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,50 @@ import org.apache.spark.sql.catalyst.InternalRow
3434
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
3535
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
3636
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
37-
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
37+
import org.apache.spark.sql.catalyst.util.RebaseDateTime
38+
import org.apache.spark.sql.execution.datasources.DataSourceUtils
3839
import org.apache.spark.sql.internal.SQLConf
40+
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
3941
import org.apache.spark.sql.types._
4042
import org.apache.spark.unsafe.types.UTF8String
4143
/**
4244
* A deserializer to deserialize data in avro format to data in catalyst format.
4345
*/
44-
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
46+
class AvroDeserializer(
47+
rootAvroType: Schema,
48+
rootCatalystType: DataType,
49+
datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
4550

4651
def this(rootAvroType: Schema, rootCatalystType: DataType) {
4752
this(rootAvroType, rootCatalystType,
48-
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
53+
LegacyBehaviorPolicy.withName(
54+
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
4955
}
5056

5157
private lazy val decimalConversions = new DecimalConversion()
5258

59+
private val dateRebaseFunc: Int => Int = datetimeRebaseMode match {
60+
case LegacyBehaviorPolicy.EXCEPTION =>
61+
days: Int =>
62+
if (days < RebaseDateTime.lastSwitchJulianDay) {
63+
throw DataSourceUtils.newRebaseExceptionInRead("Avro")
64+
}
65+
days
66+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
67+
case LegacyBehaviorPolicy.CORRECTED => identity[Int]
68+
}
69+
70+
private val timestampRebaseFunc: Long => Long = datetimeRebaseMode match {
71+
case LegacyBehaviorPolicy.EXCEPTION =>
72+
micros: Long =>
73+
if (micros < RebaseDateTime.lastSwitchJulianTs) {
74+
throw DataSourceUtils.newRebaseExceptionInRead("Avro")
75+
}
76+
micros
77+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
78+
case LegacyBehaviorPolicy.CORRECTED => identity[Long]
79+
}
80+
5381
private val converter: Any => Any = rootCatalystType match {
5482
// A shortcut for empty schema.
5583
case st: StructType if st.isEmpty =>
@@ -96,36 +124,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD
96124
case (INT, IntegerType) => (updater, ordinal, value) =>
97125
updater.setInt(ordinal, value.asInstanceOf[Int])
98126

99-
case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
100-
val days = value.asInstanceOf[Int]
101-
val rebasedDays = rebaseJulianToGregorianDays(days)
102-
updater.setInt(ordinal, rebasedDays)
103-
104127
case (INT, DateType) => (updater, ordinal, value) =>
105-
updater.setInt(ordinal, value.asInstanceOf[Int])
128+
updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
106129

107130
case (LONG, LongType) => (updater, ordinal, value) =>
108131
updater.setLong(ordinal, value.asInstanceOf[Long])
109132

110133
case (LONG, TimestampType) => avroType.getLogicalType match {
111134
// For backward compatibility, if the Avro type is Long and it is not logical type
112135
// (the `null` case), the value is processed as timestamp type with millisecond precision.
113-
case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
114-
val millis = value.asInstanceOf[Long]
115-
val micros = DateTimeUtils.millisToMicros(millis)
116-
val rebasedMicros = rebaseJulianToGregorianMicros(micros)
117-
updater.setLong(ordinal, rebasedMicros)
118136
case null | _: TimestampMillis => (updater, ordinal, value) =>
119137
val millis = value.asInstanceOf[Long]
120138
val micros = DateTimeUtils.millisToMicros(millis)
121-
updater.setLong(ordinal, micros)
122-
case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
123-
val micros = value.asInstanceOf[Long]
124-
val rebasedMicros = rebaseJulianToGregorianMicros(micros)
125-
updater.setLong(ordinal, rebasedMicros)
139+
updater.setLong(ordinal, timestampRebaseFunc(micros))
126140
case _: TimestampMicros => (updater, ordinal, value) =>
127141
val micros = value.asInstanceOf[Long]
128-
updater.setLong(ordinal, micros)
142+
updater.setLong(ordinal, timestampRebaseFunc(micros))
129143
case other => throw new IncompatibleSchemaException(
130144
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
131145
}

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
124124
reader.sync(file.start)
125125
val stop = file.start + file.length
126126

127-
val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
128-
reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
129-
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
130-
}
127+
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
128+
reader.asInstanceOf[DataFileReader[_]].getMetaString,
129+
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
130+
131131
val deserializer = new AvroDeserializer(
132-
userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
132+
userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)
133133

134134
new Iterator[InternalRow] {
135135
private[this] var completed = false

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
3333
import org.apache.spark.sql.catalyst.InternalRow
3434
import org.apache.spark.sql.execution.datasources.OutputWriter
3535
import org.apache.spark.sql.internal.SQLConf
36+
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
3637
import org.apache.spark.sql.types._
3738

3839
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -43,20 +44,24 @@ private[avro] class AvroOutputWriter(
4344
avroSchema: Schema) extends OutputWriter {
4445

4546
// Whether to rebase datetimes from Gregorian to Julian calendar in write
46-
private val rebaseDateTime: Boolean =
47-
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
47+
private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
48+
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))
4849

4950
// The input rows will never be null.
5051
private lazy val serializer =
51-
new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
52+
new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)
5253

5354
/**
5455
* Overrides the couple of methods responsible for generating the output streams / files so
5556
* that the data can be correctly partitioned
5657
*/
5758
private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
5859
val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
59-
if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
60+
if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
61+
Some(SPARK_LEGACY_DATETIME -> "")
62+
} else {
63+
None
64+
}
6065
}
6166

6267
new SparkAvroKeyOutputFormat(fileMeta.asJava) {

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ import org.apache.avro.util.Utf8
3434
import org.apache.spark.internal.Logging
3535
import org.apache.spark.sql.catalyst.InternalRow
3636
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
37-
import org.apache.spark.sql.catalyst.util.DateTimeUtils
38-
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
37+
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}
38+
import org.apache.spark.sql.execution.datasources.DataSourceUtils
3939
import org.apache.spark.sql.internal.SQLConf
40+
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
4041
import org.apache.spark.sql.types._
4142

4243
/**
@@ -46,17 +47,40 @@ class AvroSerializer(
4647
rootCatalystType: DataType,
4748
rootAvroType: Schema,
4849
nullable: Boolean,
49-
rebaseDateTime: Boolean) extends Logging {
50+
datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
5051

5152
def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
5253
this(rootCatalystType, rootAvroType, nullable,
53-
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
54+
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
55+
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
5456
}
5557

5658
def serialize(catalystData: Any): Any = {
5759
converter.apply(catalystData)
5860
}
5961

62+
private val dateRebaseFunc: Int => Int = datetimeRebaseMode match {
63+
case LegacyBehaviorPolicy.EXCEPTION =>
64+
days: Int =>
65+
if (days < RebaseDateTime.lastSwitchGregorianDay) {
66+
throw DataSourceUtils.newRebaseExceptionInWrite("Avro")
67+
}
68+
days
69+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
70+
case LegacyBehaviorPolicy.CORRECTED => identity[Int]
71+
}
72+
73+
private val timestampRebaseFunc: Long => Long = datetimeRebaseMode match {
74+
case LegacyBehaviorPolicy.EXCEPTION =>
75+
micros: Long =>
76+
if (micros < RebaseDateTime.lastSwitchGregorianTs) {
77+
throw DataSourceUtils.newRebaseExceptionInWrite("Avro")
78+
}
79+
micros
80+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
81+
case LegacyBehaviorPolicy.CORRECTED => identity[Long]
82+
}
83+
6084
private val converter: Any => Any = {
6185
val actualAvroType = resolveNullableType(rootAvroType, nullable)
6286
val baseConverter = rootCatalystType match {
@@ -146,24 +170,16 @@ class AvroSerializer(
146170
case (BinaryType, BYTES) =>
147171
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
148172

149-
case (DateType, INT) if rebaseDateTime =>
150-
(getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
151-
152173
case (DateType, INT) =>
153-
(getter, ordinal) => getter.getInt(ordinal)
174+
(getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
154175

155176
case (TimestampType, LONG) => avroType.getLogicalType match {
156177
// For backward compatibility, if the Avro type is Long and it is not logical type
157178
// (the `null` case), output the timestamp value as with millisecond precision.
158-
case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
159-
val micros = getter.getLong(ordinal)
160-
val rebasedMicros = rebaseGregorianToJulianMicros(micros)
161-
DateTimeUtils.microsToMillis(rebasedMicros)
162179
case null | _: TimestampMillis => (getter, ordinal) =>
163-
DateTimeUtils.microsToMillis(getter.getLong(ordinal))
164-
case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
165-
rebaseGregorianToJulianMicros(getter.getLong(ordinal))
166-
case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
180+
DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
181+
case _: TimestampMicros => (getter, ordinal) =>
182+
timestampRebaseFunc(getter.getLong(ordinal))
167183
case other => throw new IncompatibleSchemaException(
168184
s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
169185
}

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
8888
reader.sync(partitionedFile.start)
8989
val stop = partitionedFile.start + partitionedFile.length
9090

91-
val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
92-
reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
93-
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
94-
}
91+
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
92+
reader.asInstanceOf[DataFileReader[_]].getMetaString,
93+
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
9594
val deserializer = new AvroDeserializer(
96-
userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
95+
userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)
9796

9897
val fileReader = new PartitionReader[InternalRow] {
9998
private[this] var completed = false

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
288288
""".stripMargin
289289
val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
290290
val dataType = SchemaConverters.toSqlType(avroSchema).dataType
291-
val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
291+
val deserializer = new AvroDeserializer(avroSchema, dataType)
292292

293293
def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
294294
assert(checkResult(

0 commit comments

Comments
 (0)