@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation._
+ import org.apache.hudi.AvroConversionUtils.getAvroSchemaWithDefaults
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -67,12 +68,7 @@ import scala.util.{Failure, Success, Try}

trait HoodieFileSplit {}

- case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) {
-
-   def this(structTypeSchema: StructType) =
-     this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString)
-
- }
+ case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)

case class HoodieTableState(tablePath: String,
latestCommitTimestamp: Option[String],
@@ -106,6 +102,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

protected lazy val resolver: Resolver = sparkSession.sessionState.analyzer.resolver

+ protected def tableName: String = metaClient.getTableConfig.getTableName

protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
protected lazy val jobConf = new JobConf(conf)

@@ -162,12 +160,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
}

+ val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
val avroSchema = internalSchemaOpt.map { is =>
- AvroInternalSchemaConverter.convert(is, "schema")
+ AvroInternalSchemaConverter.convert(is, namespace + "." + name)
} orElse {
specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
} orElse {
- schemaSpec.map(convertToAvroSchema)
+ schemaSpec.map(s => convertToAvroSchema(s, tableName))
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
@@ -356,7 +355,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
// could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
// w/ more than 2 types are involved)
- val sourceSchema = prunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema)
+ val sourceSchema = prunedDataSchema.map(s => convertToAvroSchema(s, tableName)).getOrElse(tableAvroSchema)
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns)

@@ -622,8 +621,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)

(partitionSchema,
- new HoodieTableSchema(prunedDataStructSchema),
- new HoodieTableSchema(prunedRequiredSchema))
+ HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema, tableName).toString),
+ HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema, tableName).toString))
} else {
(StructType(Nil), tableSchema, requiredSchema)
}
@@ -650,8 +649,11 @@ object HoodieBaseRelation extends SparkAdapterSupport {
def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
}

- def convertToAvroSchema(structSchema: StructType): Schema =
-   sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
+ def convertToAvroSchema(structSchema: StructType, tableName: String): Schema = {
+   val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+   val avroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, recordName, namespace)
+   getAvroSchemaWithDefaults(avroSchema, structSchema)
+ }
xiarixiaoyao (Contributor, Author) commented on Feb 23, 2023:

@qidian99 This keeps the default values in the Avro schema, which we need for schema evolution. This change also fixes bug #7915.

A contributor replied:

@xiarixiaoyao I still don't understand why we need to set defaults in the schema. Can you please elaborate on that one?

xiarixiaoyao (Contributor, Author) replied:

Yes. schemaConverters.toAvroType loses the default values; see #2765. In a schema evolution scenario the default value is very important, and the Avro schema relies on it.

For example, if we add a new column newCol: string to the table, its default value will be null. After schemaConverters.toAvroType, that default value is lost. If we then use this schema to read an old Avro log (which does not contain the column newCol), Avro will complain that there is no default value and throw an exception. The root cause of #7915 is that the default value was lost in this conversion process.
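
To make this concrete, here is a minimal, self-contained sketch using plain Avro (not Hudi code; the record name, namespace, and field names are illustrative). Reading an old record through an evolved reader schema succeeds only when the added column carries a default value:

```scala
import java.io.ByteArrayOutputStream

import scala.util.Try

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDefaultValueDemo extends App {

  // "Old" writer schema: the table before the new column was added.
  val writerSchema: Schema = SchemaBuilder.record("record").namespace("hoodie.tbl")
    .fields().requiredInt("id").endRecord()

  // Evolved reader schema WITH a default: newCol is an optional string (union of null/string, default null).
  val readerWithDefault: Schema = SchemaBuilder.record("record").namespace("hoodie.tbl")
    .fields().requiredInt("id").optionalString("newCol").endRecord()

  // Evolved reader schema WITHOUT a default for newCol.
  val readerWithoutDefault: Schema = SchemaBuilder.record("record").namespace("hoodie.tbl")
    .fields().requiredInt("id")
    .name("newCol").`type`().unionOf().nullType().and().stringType().endUnion().noDefault()
    .endRecord()

  // Serialize one record with the old schema (stands in for an old Avro log block).
  val record = new GenericData.Record(writerSchema)
  record.put("id", 1)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
  encoder.flush()
  val bytes = out.toByteArray

  // Resolve the old (writer) schema against an evolved (reader) schema and read the record back.
  def readWith(readerSchema: Schema): Try[GenericRecord] = Try {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
  }

  println(readWith(readerWithDefault))    // Success({"id": 1, "newCol": null})
  println(readWith(readerWithoutDefault)) // Failure(AvroTypeException: ... missing required field newCol)
}
```

This is the situation getAvroSchemaWithDefaults addresses in the change above: it re-attaches the defaults that sparkAdapter.getAvroSchemaConverters.toAvroType drops during the StructType-to-Avro conversion.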


def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent
@@ -21,6 +21,7 @@ package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
import org.apache.hudi.HoodieBootstrapRelation.validate
+ import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.rdd.RDD
@@ -119,9 +120,9 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,

val bootstrapDataFileReader = createBaseFileReader(
spark = sqlContext.sparkSession,
- dataSchema = new HoodieTableSchema(bootstrapDataFileSchema),
+ dataSchema = new HoodieTableSchema(bootstrapDataFileSchema, convertToAvroSchema(bootstrapDataFileSchema, tableName).toString),
partitionSchema = partitionSchema,
- requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema),
+ requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema, convertToAvroSchema(requiredBootstrapDataFileSchema, tableName).toString),
// NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
// a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
@@ -136,11 +137,11 @@

val boostrapSkeletonFileReader = createBaseFileReader(
spark = sqlContext.sparkSession,
- dataSchema = new HoodieTableSchema(skeletonSchema),
+ dataSchema = new HoodieTableSchema(skeletonSchema, convertToAvroSchema(skeletonSchema, tableName).toString),
// NOTE: Here we specify partition-schema as empty since we don't need Spark to inject partition-values
// parsed from the partition-path
partitionSchema = StructType(Seq.empty),
- requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema),
+ requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema, convertToAvroSchema(requiredSkeletonFileSchema, tableName).toString),
// NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with
// a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files
filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else Seq(),
@@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
StructType(requiredDataSchema.structTypeSchema.fields
.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))

- new HoodieTableSchema(prunedStructSchema)
+ HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
}

val requiredSchemaReaderSkipMerging = createBaseFileReader(
@@ -204,4 +204,38 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
}
})
}

test("Test decimal type") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| ff decimal(38, 10)
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

spark.sql(s"update $tableName set price = 22 where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 22.0, 1000)
)
}
}
}