diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 921e6deb58f10..9725fb63f5553 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -49,27 +49,33 @@ import scala.collection.mutable.ArrayBuffer
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
-  def deserialize(data: Any): Any = rootCatalystType match {
+  private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      InternalRow.empty
+      (data: Any) => InternalRow.empty
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val writer = getRecordWriter(rootAvroType, st, Nil)
-      val record = data.asInstanceOf[GenericRecord]
-      writer(fieldUpdater, record)
-      resultRow
+      (data: Any) => {
+        val record = data.asInstanceOf[GenericRecord]
+        writer(fieldUpdater, record)
+        resultRow
+      }
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      writer(fieldUpdater, 0, data)
-      tmpRow.get(0, rootCatalystType)
+      (data: Any) => {
+        writer(fieldUpdater, 0, data)
+        tmpRow.get(0, rootCatalystType)
+      }
   }
 
+  def deserialize(data: Any): Any = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index e0c734413870e..2b88be81656f9 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -47,6 +47,10 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
 
   def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -59,13 +63,14 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
           converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 61482ab96f3f8..5fb6d907bdc82 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -69,28 +69,34 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
     datetimeRebaseMode, "Avro")
 
-  def deserialize(data: Any): Option[Any] = rootCatalystType match {
+  private val converter: Any => Option[Any] = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
-      Some(InternalRow.empty)
+      (data: Any) => Some(InternalRow.empty)
 
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
       val fieldUpdater = new RowUpdater(resultRow)
       val applyFilters = filters.skipRow(resultRow, _)
      val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
-      val record = data.asInstanceOf[GenericRecord]
-      val skipRow = writer(fieldUpdater, record)
-      if (skipRow) None else Some(resultRow)
+      (data: Any) => {
+        val record = data.asInstanceOf[GenericRecord]
+        val skipRow = writer(fieldUpdater, record)
+        if (skipRow) None else Some(resultRow)
+      }
 
     case _ =>
       val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
       val fieldUpdater = new RowUpdater(tmpRow)
       val writer = newWriter(rootAvroType, rootCatalystType, Nil)
-      writer(fieldUpdater, 0, data)
-      Some(tmpRow.get(0, rootCatalystType))
+      (data: Any) => {
+        writer(fieldUpdater, 0, data)
+        Some(tmpRow.get(0, rootCatalystType))
+      }
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2397186a17fe1..36d86c1e01f05 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -57,13 +57,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
       case st: StructType =>
@@ -76,13 +80,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
           converter.apply(tmpRow, 0)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9f3b60b8c30ca..0b609330756eb 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -72,33 +72,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  def deserialize(data: Any): Option[Any] = try {
+  private val converter: Any => Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        Some(InternalRow.empty)
+        (_: Any) => Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
+        (data: Any) => {
+          val record = data.asInstanceOf[GenericRecord]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
 
       case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
+        (data: Any) => {
+          writer(fieldUpdater, 0, data)
+          Some(tmpRow.get(0, rootCatalystType))
+        }
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 0f9b60c701632..ba9812b026744 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -66,13 +66,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -90,13 +94,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 0b00b6d1ab03f..5e7bab3e51fb0 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -71,33 +71,39 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")
 
-  def deserialize(data: Any): Option[Any] = try {
+  private val converter: Any => Option[Any] = try {
     rootCatalystType match {
       // A shortcut for empty schema.
       case st: StructType if st.isEmpty =>
-        Some(InternalRow.empty)
+        (_: Any) => Some(InternalRow.empty)
 
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
         val fieldUpdater = new RowUpdater(resultRow)
         val applyFilters = filters.skipRow(resultRow, _)
         val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
-        val record = data.asInstanceOf[GenericRecord]
-        val skipRow = writer(fieldUpdater, record)
-        if (skipRow) None else Some(resultRow)
+        (data: Any) => {
+          val record = data.asInstanceOf[GenericRecord]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
 
       case _ =>
         val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
         val fieldUpdater = new RowUpdater(tmpRow)
         val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
-        writer(fieldUpdater, 0, data)
-        Some(tmpRow.get(0, rootCatalystType))
+        (data: Any) => {
+          writer(fieldUpdater, 0, data)
+          Some(tmpRow.get(0, rootCatalystType))
+        }
     }
   } catch {
     case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
       s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
   }
 
+  def deserialize(data: Any): Option[Any] = converter(data)
+
   /**
    * Creates a writer to write avro values to Catalyst values at the given ordinal with the given
    * updater.
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index dfa970f573145..450d9d73465ce 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -65,13 +65,17 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
+  def serialize(catalystData: Any): Any = {
+    converter.apply(catalystData)
+  }
+
   private val dateRebaseFunc = createDateRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
   private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
     datetimeRebaseMode, "Avro")
 
-  def serialize(catalystData: Any): Any = {
+  private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = try {
       rootCatalystType match {
@@ -89,13 +93,14 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
         s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
     }
     if (nullable) {
-      if (catalystData == null) {
-        null
-      } else {
-        baseConverter.apply(catalystData)
-      }
+      (data: Any) =>
+        if (data == null) {
+          null
+        } else {
+          baseConverter.apply(data)
+        }
     } else {
-      baseConverter.apply(catalystData)
+      baseConverter
     }
   }
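
Every hunk above applies the same refactoring: the rootCatalystType match that previously ran inside serialize/deserialize on every call is hoisted into a private val converter of function type, so schema resolution and writer construction happen once per (de)serializer instance and each per-record call only applies the cached function. Below is a minimal sketch of that caching pattern in isolation; FieldConverterExample and its members are hypothetical names for illustration only, not part of the Hudi or Spark sources.

// Illustrative sketch only: the match on the target type is evaluated once,
// when the instance is constructed, and the resulting function is cached in a
// val, mirroring the shape of the change in the patch above.
class FieldConverterExample(targetType: String) {

  // Built once per instance; the hot path below never re-evaluates the match.
  private val converter: Any => Any = targetType match {
    case "string" => (data: Any) => String.valueOf(data)
    case "int"    => (data: Any) => data.toString.trim.toInt
    case other    => throw new IllegalArgumentException(s"Unsupported type: $other")
  }

  // Per-record call: just apply the cached function.
  def convert(data: Any): Any = converter(data)
}

object FieldConverterExample {
  def main(args: Array[String]): Unit = {
    val toInt = new FieldConverterExample("int")
    println(toInt.convert(" 42 "))                              // 42
    println(new FieldConverterExample("string").convert(3.14))  // 3.14
  }
}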