From 49682cb523a772d4dc6b6abee8944f4e6d2607f2 Mon Sep 17 00:00:00 2001
From: TengHuo
Date: Thu, 24 Nov 2022 10:35:41 +0800
Subject: [PATCH] [HUDI-5271] fix issue inconsistent reader and writer schema
 in HoodieAvroDataBlock

Co-authored-by: voonhous
---
 .../org/apache/hudi/HoodieBaseRelation.scala    | 14 +++--
 .../hudi/MergeOnReadSnapshotRelation.scala      |  2 +-
 .../apache/spark/sql/hudi/TestMorTable.scala    | 62 +++++++++++++++++++
 3 files changed, 71 insertions(+), 7 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMorTable.scala

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index afc0781eb1b0e..067ebc9b9e83c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -159,7 +159,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     } orElse {
       specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
     } orElse {
-      schemaSpec.map(convertToAvroSchema)
+      schemaSpec.map(schema => convertToAvroSchema(schema, tableConfig.getTableName))
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
@@ -330,7 +330,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //       schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
     //       could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
     //       w/ more than 2 types are involved)
-    val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
+    val sourceSchema = optimizerPrunedDataSchema.map(schema => convertToAvroSchema(schema, tableConfig.getTableName)).getOrElse(tableAvroSchema)
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
 
@@ -630,8 +630,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
 
       (partitionSchema,
-        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
-        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
+        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema, tableConfig.getTableName).toString),
+        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema, tableConfig.getTableName).toString))
     } else {
       (StructType(Nil), tableSchema, requiredSchema)
     }
@@ -659,8 +659,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection =
     HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
 
-  def convertToAvroSchema(structSchema: StructType): Schema =
-    sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
+  def convertToAvroSchema(structSchema: StructType, tableName: String): Schema = {
+    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    AvroConversionUtils.convertStructTypeToAvroSchema(structSchema, structName, namespace)
+  }
 
   def getPartitionPath(fileStatus: FileStatus): Path =
     fileStatus.getPath.getParent
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 1d0d533e5bb81..50de64ee28e62 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -165,7 +165,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
         StructType(requiredDataSchema.structTypeSchema.fields
           .filterNot(f => superfluousColumnNames.contains(f.name)))
 
-      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString)
+      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableConfig.getTableName).toString)
     }
 
     val requiredSchemaReaderSkipMerging = createBaseFileReader(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMorTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMorTable.scala
new file mode 100644
index 0000000000000..4d6e0e46b5dbe
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMorTable.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestMorTable extends HoodieSparkSqlTestBase {
+  test("Test Insert Into MOR table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  test_decimal_col decimal(25, 4)
+           |) using hudi
+           |options
+           |(
+           |  type = 'mor'
+           |  ,primaryKey = 'id'
+           |  ,hoodie.index.type = 'INMEMORY'
+           |)
+           | tblproperties (primaryKey = 'id')
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+           | """.stripMargin)
+
+      // Note: no column list is given in the INSERT, so the values follow the table schema with the partition field (dt) placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, 1.0, "2021-01-05"),
+           | (2, 'a2', 20, 2000, 2.0, "2021-01-06"),
+           | (3, 'a3', 30, 3000, 3.0, "2021-01-07")
+          """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, cast(test_decimal_col AS string), dt from $tableName")(
+        Seq(1, "a1", 10.0, 1000, "1.0000", "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2.0000", "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "3.0000", "2021-01-07")
+      )
+    }
+  }
+}
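
Reviewer note (not part of the patch): the change above replaces the hardcoded Avro record name ("Record") in convertToAvroSchema with a record name and namespace derived from the Hudi table name, so the reader schema built by the relation lines up with the writer schema carried in HoodieAvroDataBlock. The sketch below only illustrates that idea under stated assumptions: AvroConversionUtils.getAvroRecordNameAndNamespace and convertStructTypeToAvroSchema are the helpers the patch itself calls, while the sample StructType, the table name "h0", and the println output are made up for the example.

// Hypothetical, self-contained sketch (Scala). Assumes a classpath with spark-sql, avro and
// hudi-spark-common so that org.apache.hudi.AvroConversionUtils resolves.
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ReaderWriterSchemaNameSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the (possibly pruned) Catalyst schema the relation works with.
    val structSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // Pre-patch style: convert with a generic, table-independent record name.
    val genericReaderSchema: Schema =
      AvroConversionUtils.convertStructTypeToAvroSchema(structSchema, "Record", "")

    // Post-patch style: derive the record name/namespace from the table name
    // ("h0" is made up here), matching how the table's writer-side schema is named.
    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace("h0")
    val tableReaderSchema: Schema =
      AvroConversionUtils.convertStructTypeToAvroSchema(structSchema, structName, namespace)

    // The field layout is identical; only the record's full name differs. Avro schema
    // resolution takes record names into account, which is where the reported
    // reader/writer inconsistency in HoodieAvroDataBlock comes from.
    println(genericReaderSchema.getFullName)
    println(tableReaderSchema.getFullName)
  }
}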