diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index cb02c59a69011..eb92a2f67328f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation._
+import org.apache.hudi.AvroConversionUtils.getAvroSchemaWithDefaults
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -67,12 +68,7 @@ import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
-case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) {
-
-  def this(structTypeSchema: StructType) =
-    this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString)
-
-}
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)
 
 case class HoodieTableState(tablePath: String,
                             latestCommitTimestamp: Option[String],
@@ -106,6 +102,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected lazy val resolver: Resolver = sparkSession.sessionState.analyzer.resolver
 
+  protected def tableName: String = metaClient.getTableConfig.getTableName
+
   protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
   protected lazy val jobConf = new JobConf(conf)
 
@@ -162,12 +160,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       }
     }
 
+    val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
     val avroSchema = internalSchemaOpt.map { is =>
-      AvroInternalSchemaConverter.convert(is, "schema")
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
     } orElse {
       specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
     } orElse {
-      schemaSpec.map(convertToAvroSchema)
+      schemaSpec.map(s => convertToAvroSchema(s, tableName))
     } getOrElse {
       Try(schemaResolver.getTableAvroSchema) match {
         case Success(schema) => schema
@@ -356,7 +355,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
     // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
     // w/ more than 2 types are involved)
-    val sourceSchema = prunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
+    val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)
 
@@ -622,8 +621,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
 
       (partitionSchema,
-        new HoodieTableSchema(prunedDataStructSchema),
-        new HoodieTableSchema(prunedRequiredSchema))
+        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema, tableName).toString),
+        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema, tableName).toString))
     } else {
       (StructType(Nil), tableSchema, requiredSchema)
     }
@@ -650,8 +649,11 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
   }
 
-  def convertToAvroSchema(structSchema: StructType): Schema =
-    sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
+  def convertToAvroSchema(structSchema: StructType, tableName: String): Schema = {
+    val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, recordName, namespace)
+    getAvroSchemaWithDefaults(avroSchema, structSchema)
+  }
 
   def getPartitionPath(fileStatus: FileStatus): Path =
     fileStatus.getPath.getParent
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 5c58c10493d1b..e0f46a5e77ec6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -21,6 +21,7 @@ package org.apache.hudi
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
 import org.apache.hudi.HoodieBootstrapRelation.validate
+import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.spark.rdd.RDD
@@ -119,9 +120,9 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
 
     val bootstrapDataFileReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
-      dataSchema = new HoodieTableSchema(bootstrapDataFileSchema),
+      dataSchema = new HoodieTableSchema(bootstrapDataFileSchema, convertToAvroSchema(bootstrapDataFileSchema, tableName).toString),
       partitionSchema = partitionSchema,
-      requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema),
+      requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema, convertToAvroSchema(requiredBootstrapDataFileSchema, tableName).toString),
       // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
       //       a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
       filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
@@ -136,11 +137,11 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
 
     val boostrapSkeletonFileReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
-      dataSchema = new HoodieTableSchema(skeletonSchema),
+      dataSchema = new HoodieTableSchema(skeletonSchema, convertToAvroSchema(skeletonSchema, tableName).toString),
       // NOTE: Here we specify partition-schema as empty since we don't need Spark to inject partition-values
       //       parsed from the partition-path
       partitionSchema = StructType(Seq.empty),
-      requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema),
+      requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema, convertToAvroSchema(requiredSkeletonFileSchema, tableName).toString),
       // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
       //       a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
       filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else Seq(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 94168755cbfcd..290df5bebdb37 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
         StructType(requiredDataSchema.structTypeSchema.fields
           .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
 
-      new HoodieTableSchema(prunedStructSchema)
+      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
     }
 
     val requiredSchemaReaderSkipMerging = createBaseFileReader(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 8937e8595d389..4fb1555c50ffa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -204,4 +204,38 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
       }
     })
   }
+
+  test("Test decimal type") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  ff decimal(38, 10)
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      spark.sql(s"update $tableName set price = 22 where id = 1")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 22.0, 1000)
+      )
+    }
+  }
 }