diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index eb9ce877fc8d2..27fd2ccf4b74f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde2.Deserializer +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AvroTableProperties import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector} import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.io.Writable @@ -248,10 +249,15 @@ class HadoopTableReader( // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema // information) may be defined in table properties. Here we should merge table properties // and partition properties before initializing the deserializer. Note that partition - // properties take a higher priority here. For example, a partition may have a different - // SerDe as the one defined in table properties. + // properties take a higher priority here except for the Avro table properties + // to support schema evolution: in that case the properties given at table level will + // be used (for details please check SPARK-26836). + // For example, a partition may have a different SerDe as the one defined in table + // properties. val props = new Properties(tableProperties) - partProps.asScala.foreach { + partProps.asScala.filterNot { case (k, _) => + k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && tableProperties.containsKey(k) + }.foreach { case (key, value) => props.setProperty(key, value) } DeserializerLock.synchronized { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 50b1dd952c61e..eab471e328e8d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1883,6 +1883,110 @@ class HiveDDLSuite } } + test("SPARK-26836: support Avro schema evolution (add column)") { + withTable("t") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + sql( + s""" + |CREATE TABLE t PARTITIONED BY (ds string) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + """.stripMargin) + sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") + checkAnswer(spark.table("t"), Row("col1_default", "col2_value", "1981-01-07") + :: Row("col1_value", "col2_value", "1983-04-27") :: Nil) + } + } + + test("SPARK-26836: support Avro schema evolution (remove column)") { + withTable("t") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + sql( + s""" + |CREATE TABLE t PARTITIONED BY (ds string) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |STORED AS + |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + """.stripMargin) + sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") + checkAnswer(spark.table("t"), Row("col2_value", "1981-01-07") + :: Row("col2_value", "1983-04-27") :: Nil) + } + } + test("append data to hive serde table") { withTable("t", "t1") { Seq(1 -> "a").toDF("i", "j")