Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class HadoopTableReader(
fillPartitionKeys(partValues, mutableRow)

val tableProperties = tableDesc.getProperties
val avroSchemaProperties = Seq(AvroTableProperties.SCHEMA_LITERAL,
AvroTableProperties.SCHEMA_URL).map(_.getPropName())

// Create local references so that the outer object isn't serialized.
val localTableDesc = tableDesc
Expand All @@ -256,7 +258,7 @@ class HadoopTableReader(
// properties.
val props = new Properties(tableProperties)
partProps.asScala.filterNot { case (k, _) =>
k == AvroTableProperties.SCHEMA_LITERAL.getPropName() && tableProperties.containsKey(k)
avroSchemaProperties.contains(k) && tableProperties.containsKey(k)
}.foreach {
case (key, value) => props.setProperty(key, value)
}
Expand Down
12 changes: 12 additions & 0 deletions sql/hive/src/test/resources/schemaWithOneField.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"namespace": "test",
"name": "some_schema",
"type": "record",
"fields": [
{
"name": "col2",
"type": "string"
}
]
}

16 changes: 16 additions & 0 deletions sql/hive/src/test/resources/schemaWithTwoFields.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"namespace": "test",
"name": "some_schema",
"type": "record",
"fields": [
{
"name": "col1",
"type": "string",
"default": "col1_default"
},
{
"name": "col2",
"type": "string"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.{TestHiveSingleton, TestHiveSparkSession}
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
Expand Down Expand Up @@ -1853,104 +1853,132 @@ class HiveDDLSuite
}
}

// SPARK-34370: schema evolution must also work when the Avro schema is supplied
// through 'avro.schema.url' (a file reference) rather than an inline literal.
// The table starts with the one-field schema; a column with a default is then
// added by pointing the serde at the two-field schema file.
test("SPARK-34370: support Avro schema evolution (add column with avro.schema.url)") {
  checkAvroSchemaEvolutionAddColumn(
    s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'",
    s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'")
}

// SPARK-26836: adding a column (with a default value) to an Avro-backed Hive
// table via 'avro.schema.literal' must not break reads of partitions that were
// written with the older, narrower schema.
test("SPARK-26836: support Avro schema evolution (add column with avro.schema.literal)") {
  // Single-field record used when the table is first created.
  val schemaBeforeEvolution =
    """
      |{
      |  "namespace": "test",
      |  "name": "some_schema",
      |  "type": "record",
      |  "fields": [
      |    {
      |      "name": "col2",
      |      "type": "string"
      |    }
      |  ]
      |}
    """.stripMargin
  // Same record with an extra leading field that carries a default, applied
  // after the first partition has already been written.
  val schemaAfterEvolution =
    """
      |{
      |  "namespace": "test",
      |  "name": "some_schema",
      |  "type": "record",
      |  "fields": [
      |    {
      |      "name": "col1",
      |      "type": "string",
      |      "default": "col1_default"
      |    },
      |    {
      |      "name": "col2",
      |      "type": "string"
      |    }
      |  ]
      |}
    """.stripMargin
  checkAvroSchemaEvolutionAddColumn(
    s"'avro.schema.literal'='$schemaBeforeEvolution'",
    s"'avro.schema.literal'='$schemaAfterEvolution'")
}

/**
 * Shared body for the "add column" schema-evolution tests.
 *
 * Creates a partitioned Avro-serde Hive table using `originalSerdeProperties`
 * (either an 'avro.schema.literal' or an 'avro.schema.url' pair), writes one
 * partition, evolves the schema with `evolvedSerdeProperties` (which adds a
 * leading column carrying the default "col1_default"), writes a second
 * partition, and finally checks that the pre-evolution partition is read back
 * with the new column filled from the Avro default.
 *
 * @param originalSerdeProperties serde-property assignment for the initial (one-field) schema
 * @param evolvedSerdeProperties  serde-property assignment for the evolved (two-field) schema
 */
private def checkAvroSchemaEvolutionAddColumn(
    originalSerdeProperties: String,
    evolvedSerdeProperties: String) = {
  withTable("t") {
    sql(
      s"""
        |CREATE TABLE t PARTITIONED BY (ds string)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
        |WITH SERDEPROPERTIES ($originalSerdeProperties)
        |STORED AS
        |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
        |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      """.stripMargin)
    // Partition written with the original one-column schema.
    sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")
    // Evolve: the new schema prepends col1 with a default value.
    sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)")
    sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')")
    // The old partition must surface col1 via the Avro default.
    checkAnswer(spark.table("t"), Row("col1_default", "col2_value", "1981-01-07")
      :: Row("col1_value", "col2_value", "1983-04-27") :: Nil)
  }
}

// SPARK-34370: removing a column must also work when the Avro schema is supplied
// through 'avro.schema.url'. The table starts with the two-field schema file and
// is narrowed to the one-field schema file after the first partition is written.
test("SPARK-34370: support Avro schema evolution (remove column with avro.schema.url)") {
  checkAvroSchemaEvolutionRemoveColumn(
    s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithTwoFields.avsc").toURI}'",
    s"'avro.schema.url'='${TestHive.getHiveFile("schemaWithOneField.avsc").toURI}'")
}

// SPARK-26836: dropping a column from an Avro-backed Hive table via
// 'avro.schema.literal' must not break reads of partitions written while the
// column still existed.
test("SPARK-26836: support Avro schema evolution (remove column with avro.schema.literal)") {
  // Two-field record used when the table is first created.
  val schemaBeforeEvolution =
    """
      |{
      |  "namespace": "test",
      |  "name": "some_schema",
      |  "type": "record",
      |  "fields": [
      |    {
      |      "name": "col1",
      |      "type": "string",
      |      "default": "col1_default"
      |    },
      |    {
      |      "name": "col2",
      |      "type": "string"
      |    }
      |  ]
      |}
    """.stripMargin
  // Narrowed record with col1 removed, applied after the first partition has
  // already been written.
  val schemaAfterEvolution =
    """
      |{
      |  "namespace": "test",
      |  "name": "some_schema",
      |  "type": "record",
      |  "fields": [
      |    {
      |      "name": "col2",
      |      "type": "string"
      |    }
      |  ]
      |}
    """.stripMargin
  checkAvroSchemaEvolutionRemoveColumn(
    s"'avro.schema.literal'='$schemaBeforeEvolution'",
    s"'avro.schema.literal'='$schemaAfterEvolution'")
}

private def checkAvroSchemaEvolutionRemoveColumn(
originalSerdeProperties: String,
evolvedSerdeProperties: String) = {
withTable("t") {
val originalSchema =
"""
|{
| "namespace": "test",
| "name": "some_schema",
| "type": "record",
| "fields": [
| {
| "name": "col1",
| "type": "string",
| "default": "col1_default"
| },
| {
| "name": "col2",
| "type": "string"
| }
| ]
|}
""".stripMargin
sql(
s"""
|CREATE TABLE t PARTITIONED BY (ds string)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|WITH SERDEPROPERTIES ('avro.schema.literal'='$originalSchema')
|WITH SERDEPROPERTIES ($originalSerdeProperties)
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
""".stripMargin)
sql("INSERT INTO t partition (ds='1983-04-27') VALUES ('col1_value', 'col2_value')")
val evolvedSchema =
"""
|{
| "namespace": "test",
| "name": "some_schema",
| "type": "record",
| "fields": [
| {
| "name": "col2",
| "type": "string"
| }
| ]
|}
""".stripMargin
sql(s"ALTER TABLE t SET SERDEPROPERTIES ('avro.schema.literal'='$evolvedSchema')")
sql(s"ALTER TABLE t SET SERDEPROPERTIES ($evolvedSerdeProperties)")
sql("INSERT INTO t partition (ds='1981-01-07') VALUES ('col2_value')")
checkAnswer(spark.table("t"), Row("col2_value", "1981-01-07")
:: Row("col2_value", "1983-04-27") :: Nil)
Expand Down