File: AvroDataToCatalyst.scala
@@ -61,7 +61,8 @@ private[avro] case class AvroDataToCatalyst(

   @transient private lazy val reader = new GenericDatumReader[Any](actualSchema, expectedSchema)
 
-  @transient private lazy val deserializer = new AvroDeserializer(expectedSchema, dataType)
+  @transient private lazy val deserializer =
+    new AvroDeserializer(expectedSchema, dataType, avroOptions.datetimeRebaseModeInRead)
 
   @transient private var decoder: BinaryDecoder = _
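
Note: `AvroDataToCatalyst` backs the `from_avro` function, so this change also lets the rebase mode travel through that function's options map. An illustrative call, where the DataFrame `df`, its "payload" column and `avroSchemaJson` are placeholders:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.avro.functions.from_avro

    // Decode an Avro binary column with a per-call rebase mode.
    val decoded = df.select(
      from_avro(df("payload"), avroSchemaJson, Map("datetimeRebaseMode" -> "CORRECTED").asJava))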


File: AvroDeserializer.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -49,11 +48,14 @@ private[sql] class AvroDeserializer(
     datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     filters: StructFilters) {
 
-  def this(rootAvroType: Schema, rootCatalystType: DataType) = {
+  def this(
+      rootAvroType: Schema,
+      rootCatalystType: DataType,
+      datetimeRebaseMode: String) = {
     this(
       rootAvroType,
       rootCatalystType,
-      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)),
+      LegacyBehaviorPolicy.withName(datetimeRebaseMode),
       new NoopFilters)
   }

File: AvroFileFormat.scala
@@ -87,6 +87,7 @@ private[sql] class AvroFileFormat extends FileFormat
     val broadcastedConf =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val parsedOptions = new AvroOptions(options, hadoopConf)
+    val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
 
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
@@ -125,7 +126,7 @@

       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         reader.asInstanceOf[DataFileReader[_]].getMetaString,
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+        datetimeRebaseModeInRead)
 
       val avroFilters = if (SQLConf.get.avroFilterPushDown) {
         new OrderedFilters(filters, requiredSchema)
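
Note: `DataSourceUtils.datetimeRebaseMode` consults the file's writer metadata first and only falls back to the passed-in mode when the writer is unknown, which is why the test comments further down can say the config is irrelevant for files carrying Spark metadata. A paraphrased sketch of that resolution order (the metadata keys are assumptions about what Spark records, not an excerpt of the real implementation):

    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    def resolveRebaseMode(
        lookupFileMeta: String => String,  // reads one key from the Avro file metadata
        modeByConfig: String): LegacyBehaviorPolicy.Value = {
      val writerVersion = lookupFileMeta("org.apache.spark.version")
      if (writerVersion == null) {
        // Unknown writer (e.g. Spark 2.4 or another tool): honor the option/config.
        LegacyBehaviorPolicy.withName(modeByConfig)
      } else if (lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
        LegacyBehaviorPolicy.LEGACY     // writer recorded that it rebased datetime values
      } else {
        LegacyBehaviorPolicy.CORRECTED  // Spark 3.x writer without the legacy marker
      }
    }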

File: AvroOptions.scala
@@ -93,6 +93,13 @@ private[sql] class AvroOptions(

   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  /**
+   * The rebase mode for DATE, TIMESTAMP_MICROS and TIMESTAMP_MILLIS values in reads.
+   */
+  val datetimeRebaseModeInRead: String = parameters
+    .get(AvroOptions.DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
 }
 
 private[sql] object AvroOptions {
@@ -105,4 +112,10 @@
   }
 
   val ignoreExtensionKey = "ignoreExtension"
+
+  // This option controls the rebasing of DATE and TIMESTAMP values between the
+  // Julian and Proleptic Gregorian calendars. It affects the behaviour of the Avro
+  // datasource in the same way as the SQL config `spark.sql.legacy.avro.datetimeRebaseModeInRead`,
+  // and accepts the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
+  val DATETIME_REBASE_MODE = "datetimeRebaseMode"
 }
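
Note: taken together, the class and companion changes make the mode settable per read, with the DataFrame option taking precedence over the session config (the `getOrElse` fallback above). An illustrative read, assuming an active `spark` session and a placeholder path:

    val df = spark.read
      .format("avro")
      .option("datetimeRebaseMode", "CORRECTED")  // overrides the session-level config
      .load("/path/to/avro/files")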

File: AvroPartitionReaderFactory.scala
@@ -57,6 +57,7 @@ case class AvroPartitionReaderFactory(
     partitionSchema: StructType,
     parsedOptions: AvroOptions,
     filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging {
+  private val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
 
   override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
@@ -91,7 +92,7 @@

       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         reader.asInstanceOf[DataFileReader[_]].getMetaString,
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+        datetimeRebaseModeInRead)
 
       val avroFilters = if (SQLConf.get.avroFilterPushDown) {
         new OrderedFilters(filters, readDataSchema)
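
Note: in both read paths the mode is resolved eagerly on the driver (a local `val` in `AvroFileFormat.buildReader`, a field of this serializable factory) instead of calling `SQLConf.get` inside the per-file code, which runs on executors where the driver's session conf is not reliably visible. A minimal sketch of the pattern with illustrative names:

    // Illustrative stand-in, not a Spark class.
    case class PartitionReaderFactoryLike(options: Map[String, String]) extends Serializable {
      // Resolved once on the driver and shipped to executors with the factory.
      private val rebaseMode: String = options.getOrElse("datetimeRebaseMode", "EXCEPTION")

      // Executed on an executor; it must depend only on captured plain values.
      def buildReader(file: String): String = s"open $file with rebase mode $rebaseMode"
    }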

File: AvroSerdeSuite.scala
@@ -20,6 +20,7 @@ import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.generic.GenericRecordBuilder
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
 /**
@@ -29,13 +30,15 @@ import org.apache.spark.sql.types.{IntegerType, StructType}
 class AvroSerdeSuite extends SparkFunSuite {
   import AvroSerdeSuite._
 
+  private val defaultRebaseMode = LegacyBehaviorPolicy.CORRECTED.toString
+
   test("Test basic conversion") {
     val avro = createNestedAvroSchemaWithFields("foo", _.optionalInt("bar"))
     val record = new GenericRecordBuilder(avro)
       .set("foo", new GenericRecordBuilder(avro.getField("foo").schema()).set("bar", 42).build())
       .build()
     val serializer = new AvroSerializer(CATALYST_STRUCT, avro, false)
-    val deserializer = new AvroDeserializer(avro, CATALYST_STRUCT)
+    val deserializer = new AvroDeserializer(avro, CATALYST_STRUCT, defaultRebaseMode)
     assert(serializer.serialize(deserializer.deserialize(record).get) === record)
   }
 
@@ -69,7 +72,7 @@
.add("foo", new StructType().add("bar", IntegerType, nullable = false))

// deserialize should have no issues when 'bar' is nullable but fail when it is nonnull
new AvroDeserializer(avro, CATALYST_STRUCT)
new AvroDeserializer(avro, CATALYST_STRUCT, defaultRebaseMode)
assertFailedConversionMessage(avro, deserialize = true,
"Cannot find non-nullable field 'foo.bar' in Avro schema.",
nonnullCatalyst)
@@ -120,7 +123,7 @@
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
     val e = intercept[IncompatibleSchemaException] {
       if (deserialize) {
-        new AvroDeserializer(avroSchema, catalystSchema)
+        new AvroDeserializer(avroSchema, catalystSchema, defaultRebaseMode)
       } else {
         new AvroSerializer(catalystSchema, avroSchema, false)
       }

File: AvroSuite.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, Da
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -1755,6 +1756,20 @@ abstract class AvroSuite
     }
   }
 
+  private def runInMode(
+      modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
+    modes.foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
+        f(Map.empty)
+      }
+    }
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) {
+      modes.foreach { mode =>
+        f(Map(AvroOptions.DATETIME_REBASE_MODE -> mode.toString))
+      }
+    }
+  }
+
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
     def checkReadMixedFiles(
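
Note: the `runInMode` helper runs each assertion twice per mode, once with the mode set through the session config and an empty options map, and once with the config pinned to `EXCEPTION` while the mode goes through the new read option, so a passing run also proves the option overrides the config. A typical call shape (path and expected rows are placeholders):

    runInMode(Seq(LEGACY, CORRECTED)) { options =>
      checkAnswer(
        spark.read.options(options).format("avro").load("/tmp/avro-dates"),
        expectedRows)
    }
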
@@ -1784,9 +1799,9 @@

       // For Avro files written by Spark 3.0, we know the writer info and don't need the config
       // to guide the rebase behavior.
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+      runInMode(Seq(LEGACY)) { options =>
         checkAnswer(
-          spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+          spark.read.options(options).format("avro").load(path2_4, path3_0, path3_0_rebase),
           1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
       }
     } else {
@@ -1817,9 +1832,9 @@

       // For Avro files written by Spark 3.0, we know the writer info and don't need the config
       // to guide the rebase behavior.
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+      runInMode(Seq(LEGACY)) { options =>
         checkAnswer(
-          spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+          spark.read.options(options).format("avro").load(path2_4, path3_0, path3_0_rebase),
           1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
       }
     }
@@ -1869,10 +1884,10 @@

       // The file metadata indicates if it needs rebase or not, so we can always get the correct
       // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).format("avro").load(path),
+          Row(Timestamp.valueOf(tsStr)))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
@@ -1912,12 +1927,10 @@

       // The file metadata indicates if it needs rebase or not, so we can always get the correct
       // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).schema("ts timestamp").format("avro").load(path),
+          Row(Timestamp.valueOf(rebased)))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
@@ -1943,10 +1956,10 @@

       // The file metadata indicates if it needs rebase or not, so we can always get the correct
       // result regardless of the "rebase mode" config.
-      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-          checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-        }
+      runInMode(Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
+        checkAnswer(
+          spark.read.options(options).format("avro").load(path),
+          Row(Date.valueOf("1001-01-01")))
       }
 
       // Force to not rebase to prove the written datetime values are rebased and we will get
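
Closing note on the three mode values exercised above, as they apply to ancient dates/timestamps from before the Gregorian cutover: `EXCEPTION` is expected to make the read fail when the writer is unknown, `CORRECTED` reads the stored values as-is (Proleptic Gregorian), and `LEGACY` rebases from the hybrid Julian calendar. For a file written by Spark 2.4, that means (path illustrative):

    // The default EXCEPTION mode fails on ambiguous ancient values from an unknown writer;
    // opting into LEGACY rebases them instead of failing.
    val rebasedRead = spark.read
      .format("avro")
      .option("datetimeRebaseMode", "LEGACY")
      .load("/path/written-by-spark-2.4")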