Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AvroTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
Expand Down Expand Up @@ -248,10 +249,15 @@ class HadoopTableReader(
// SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
// information) may be defined in table properties. Here we should merge table properties
// and partition properties before initializing the deserializer. Note that partition
// properties take a higher priority here. For example, a partition may have a different
// SerDe as the one defined in table properties.
// properties take a higher priority here, except for the Avro schema literal
// (AvroTableProperties.SCHEMA_LITERAL): to support schema evolution, the value given
// at table level is used whenever the table defines it (for details please check
// SPARK-26836). For example, a partition may have a different SerDe from the one defined
// in table properties.
val props = new Properties(tableProperties)
partProps.asScala.foreach {
partProps.asScala.filterNot { case (k, _) =>
k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && tableProperties.containsKey(k)
}.foreach {
case (key, value) => props.setProperty(key, value)
}
DeserializerLock.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,110 @@ class HiveDDLSuite
}
}

// Creates an Avro-backed partitioned table with a one-column schema, writes a partition,
// switches the table-level schema literal to one that adds a defaulted column, writes a
// second partition, and checks that the older partition is read with the default value.
test("SPARK-26836: support Avro schema evolution (add column)") {
  withTable("t") {
    // Schema the table is created with: only `col2`.
    // The margin `|` before the closing quotes keeps the literal free of trailing indentation.
    val oneColumnSchema =
      """
        |{
        | "namespace": "test",
        | "name": "some_schema",
        | "type": "record",
        | "fields": [
        | {
        | "name": "col2",
        | "type": "string"
        | }
        | ]
        |}
        |""".stripMargin
    // Schema applied afterwards: `col1` (with a default) is added in front of `col2`.
    val twoColumnSchema =
      """
        |{
        | "namespace": "test",
        | "name": "some_schema",
        | "type": "record",
        | "fields": [
        | {
        | "name": "col1",
        | "type": "string",
        | "default": "col1_default"
        | },
        | {
        | "name": "col2",
        | "type": "string"
        | }
        | ]
        |}
        |""".stripMargin
    val createTableDdl =
      s"""
        |CREATE TABLE t PARTITIONED BY (ds string)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
        |WITH SERDEPROPERTIES ('avro.schema.literal'='$oneColumnSchema')
        |STORED AS
        |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
        |""".stripMargin

    sql(createTableDdl)
    // Written while the table still has the one-column schema.
    sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")

    sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$twoColumnSchema')")
    // Written after the schema change, so both columns are supplied.
    sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')")

    // The first partition has no `col1` data and is expected to surface its default.
    val expectedRows = Seq(
      Row("col1_default", "col2_value", "1981-01-07"),
      Row("col1_value", "col2_value", "1983-04-27"))
    checkAnswer(spark.table("t"), expectedRows)
  }
}

// Creates an Avro-backed partitioned table with a two-column schema, writes a partition,
// switches the table-level schema literal to one that drops `col1`, writes a second
// partition, and checks that both partitions are read with only the remaining column.
test("SPARK-26836: support Avro schema evolution (remove column)") {
  withTable("t") {
    // Schema the table is created with: `col1` (with a default) and `col2`.
    // The margin `|` before the closing quotes keeps the literal free of trailing indentation.
    val twoColumnSchema =
      """
        |{
        | "namespace": "test",
        | "name": "some_schema",
        | "type": "record",
        | "fields": [
        | {
        | "name": "col1",
        | "type": "string",
        | "default": "col1_default"
        | },
        | {
        | "name": "col2",
        | "type": "string"
        | }
        | ]
        |}
        |""".stripMargin
    // Schema applied afterwards: only `col2` remains.
    val oneColumnSchema =
      """
        |{
        | "namespace": "test",
        | "name": "some_schema",
        | "type": "record",
        | "fields": [
        | {
        | "name": "col2",
        | "type": "string"
        | }
        | ]
        |}
        |""".stripMargin
    val createTableDdl =
      s"""
        |CREATE TABLE t PARTITIONED BY (ds string)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
        |WITH SERDEPROPERTIES ('avro.schema.literal'='$twoColumnSchema')
        |STORED AS
        |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
        |""".stripMargin

    sql(createTableDdl)
    // Written while the table still has both columns.
    sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')")

    sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$oneColumnSchema')")
    // Written after the schema change, so only `col2` is supplied.
    sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")

    // Neither partition is expected to expose `col1` any more.
    val expectedRows = Seq(
      Row("col2_value", "1981-01-07"),
      Row("col2_value", "1983-04-27"))
    checkAnswer(spark.table("t"), expectedRows)
  }
}

test("append data to hive serde table") {
withTable("t", "t1") {
Seq(1 -> "a").toDF("i", "j")
Expand Down