diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 27fd2ccf4b74f..96949a33a137a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -240,6 +240,8 @@ class HadoopTableReader( fillPartitionKeys(partValues, mutableRow) val tableProperties = tableDesc.getProperties + val avroSchemaProperties = Seq(AvroTableProperties.SCHEMA_LITERAL, + AvroTableProperties.SCHEMA_URL).map(_.getPropName()) // Create local references so that the outer object isn't serialized. val localTableDesc = tableDesc @@ -256,7 +258,7 @@ class HadoopTableReader( // properties. val props = new Properties(tableProperties) partProps.asScala.filterNot { case (k, _) => - k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && tableProperties.containsKey(k) + avroSchemaProperties.contains(k) && tableProperties.containsKey(k) }.foreach { case (key, value) => props.setProperty(key, value) } diff --git a/sql/hive/src/test/resources/schemaWithOneField.avsc b/sql/hive/src/test/resources/schemaWithOneField.avsc new file mode 100644 index 0000000000000..e6e2431707f6e --- /dev/null +++ b/sql/hive/src/test/resources/schemaWithOneField.avsc @@ -0,0 +1,12 @@ +{ + "namespace": "test", + "name": "some_schema", + "type": "record", + "fields": [ + { + "name": "col2", + "type": "string" + } + ] +} + diff --git a/sql/hive/src/test/resources/schemaWithTwoFields.avsc b/sql/hive/src/test/resources/schemaWithTwoFields.avsc new file mode 100644 index 0000000000000..3d1d24cfa2e96 --- /dev/null +++ b/sql/hive/src/test/resources/schemaWithTwoFields.avsc @@ -0,0 +1,16 @@ +{ + "namespace": "test", + "name": "some_schema", + "type": "record", + "fields": [ + { + "name": "col1", + "type": "string", + "default": "col1_default" + }, + { + "name": "col2", + "type": "string" + } + ] +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 9f5b0458ade13..d8a3bf622740b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} import org.apache.spark.sql.hive.orc.OrcFileOperator -import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHiveSparkSession} +import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -1853,104 +1853,132 @@ class HiveDDLSuite } } - test("SPARK-26836: support Avro schema evolution (add column)") { + test("SPARK-34370: support Avro schema evolution (add column with avro.schema.url)") { + checkAvroSchemaEvolutionAddColumn( + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'", + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'") + } + + test("SPARK-26836: support Avro schema evolution (add column with avro.schema.literal)") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + checkAvroSchemaEvolutionAddColumn( + s"'avro.schema.literal'='$originalSchema'", + s"'avro.schema.literal'='$evolvedSchema'") + } + + private def checkAvroSchemaEvolutionAddColumn( + originalSerdeProperties: String, + evolvedSerdeProperties: String) = { withTable("t") { - val originalSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin sql( s""" |CREATE TABLE t PARTITIONED BY (ds string) |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' - |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |WITH SERDEPROPERTIES ($originalSerdeProperties) |STORED AS |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' """.stripMargin) sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") - val evolvedSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col1", - | "type": "string", - | "default": "col1_default" - | }, - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin - sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)") sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") checkAnswer(spark.table("t"), Row("col1_default", "col2_value", "1981-01-07") :: Row("col1_value", "col2_value", "1983-04-27") :: Nil) } } - test("SPARK-26836: support Avro schema evolution (remove column)") { + test("SPARK-34370: support Avro schema evolution (remove column with avro.schema.url)") { + checkAvroSchemaEvolutionRemoveColumn( + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'", + s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'") + } + + test("SPARK-26836: support Avro schema evolution (remove column with avro.schema.literal)") { + val originalSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col1", + | "type": "string", + | "default": "col1_default" + | }, + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + val evolvedSchema = + """ + |{ + | "namespace": "test", + | "name": "some_schema", + | "type": "record", + | "fields": [ + | { + | "name": "col2", + | "type": "string" + | } + | ] + |} + """.stripMargin + checkAvroSchemaEvolutionRemoveColumn( + s"'avro.schema.literal'='$originalSchema'", + s"'avro.schema.literal'='$evolvedSchema'") + } + + private def checkAvroSchemaEvolutionRemoveColumn( + originalSerdeProperties: String, + evolvedSerdeProperties: String) = { withTable("t") { - val originalSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col1", - | "type": "string", - | "default": "col1_default" - | }, - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin sql( s""" |CREATE TABLE t PARTITIONED BY (ds string) |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' - |WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema') + |WITH SERDEPROPERTIES ($originalSerdeProperties) |STORED AS |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' """.stripMargin) sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')") - val evolvedSchema = - """ - |{ - | "namespace": "test", - | "name": "some_schema", - | "type": "record", - | "fields": [ - | { - | "name": "col2", - | "type": "string" - | } - | ] - |} - """.stripMargin - sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')") + sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)") sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')") checkAnswer(spark.table("t"), Row("col2_value", "1981-01-07") :: Row("col2_value", "1983-04-27") :: Nil)