
Conversation

@alexeykudinkin
Contributor

@alexeykudinkin alexeykudinkin commented Aug 10, 2022

Change Logs

Currently, HoodieParquetReader does not specify the projected schema properly when reading Parquet files. This ends up failing whenever the provided schema is not equal to the schema of the file being read, even though it might be a proper projection (ie, a subset of it). This is common in CDC scenarios where a column is dropped from the RDS and the schema of the new batch lacks the old column.

To address the original issue described in HUDI-4588, we also have to relax the constraint imposed by the TableSchemaResolver.isSchemaCompatible method, which does not allow columns to be evolved by way of dropping them.

After addressing the original problem, a considerable number of new issues were discovered, including the following:

  • Writer's schemas were deduced differently for different flows (for ex, Bulk Insert vs other ops)
  • Writer's schema was not reconciled against the table's schema in terms of nullability (this is asserted w/in AvroSchemaConverter, which is now invoked as part of the projection)
  • (After enabling schema validations) Incorrect schema handling w/in HoodieWriteHandle was detected (there were 2 ambiguous schema references, set incorrectly, creating confusion)
  • (After enabling schema validations) Incorrect schema handling w/in MergeHelper impls was detected, where the writer's schema was used as the existing file's reader-schema (failing in cases when these 2 diverge)

Changes:

  1. Adding the missing schema projection when reading Parquet files (using AvroParquetReader); see the sketch after this list
  2. Relaxing schema evolution constraints to allow columns to be dropped
  3. Revisiting schema reconciliation logic to make sure it's coherent
  4. Streamlining schema handling in HoodieSparkSqlWriter to make sure it's uniform for all operations (it currently isn't applied properly for Bulk Insert)
  5. Adding a comprehensive test for basic schema evolution (columns being added, dropped)
  6. Fixing HoodieWriteHandle impls to properly handle the writer schema and avoid duplication
  7. Fixing MergeHelper impls to properly handle schema evolution
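
For readers unfamiliar with the parquet-avro projection mechanism referenced in item 1, here is a minimal standalone sketch (the file path and column names are made up; this is not the PR's exact code):

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}

// Projected schema: a subset of the columns present in the Parquet file
val projectedSchema = SchemaBuilder.record("trip").fields()
  .requiredString("uuid")
  .requiredDouble("fare")
  .endRecord()

val conf = new Configuration()
// Without an explicit projection, parquet-avro expects the requested schema to match the file schema
AvroReadSupport.setRequestedProjection(conf, projectedSchema)
AvroReadSupport.setAvroReadSchema(conf, projectedSchema)

val reader = AvroParquetReader.builder[GenericRecord](new Path("/tmp/some-base-file.parquet"))
  .withConf(conf)
  .build()
val record = reader.read() // columns outside the projection are simply skipped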

Impact

High

There are a few critical changes taken forward by this PR:

Now, w/ incoming batches being able to drop columns (relative to the table's existing schema), unless hoodie.datasource.write.reconcile.schema is enabled:

  • Incoming batch's schema will be taken as Writer's schema (same as before)
  • New/updated base files will be (re)written in the new schema (previously it would have failed)

This subtle change in behavior (dropped columns no longer leading to failures) could open up a new set of problems where data quality issues (for ex, a column missing when it shouldn't be) could trickle down into the existing table.

To alleviate that, we should consider flipping hoodie.datasource.write.reconcile.schema to true by default (there's already #6196 for that).
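
For reference, a write that opts into reconciliation sets the flag explicitly. A minimal sketch, assuming an existing DataFrame df and table location basePath (all other required Hudi write options are elided):

// Opt into schema reconciliation on write (Spark DataSource)
df.write.format("hudi")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  // ... table name, record key, precombine field, etc. go here ...
  .mode("append")
  .save(basePath)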

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@alexeykudinkin alexeykudinkin force-pushed the ak/prq-prj-fix branch 3 times, most recently from 378a375 to 3c4adb6 on August 17, 2022 04:14
@yihua yihua self-assigned this Aug 23, 2022
@yihua yihua added the priority:critical Production degraded; pipelines stalled label Aug 31, 2022
@alexeykudinkin alexeykudinkin changed the title [HUDI-4588] Fixing HoodieParquetReader to properly specify projected schema when reading Parquet file [HUDI-4588][HUDI-4472] Fixing HoodieParquetReader to properly specify projected schema when reading Parquet file Sep 3, 2022
SchemaCompatibility.SchemaPairCompatibility result =
    SchemaCompatibility.checkReaderWriterCompatibility(newSchema, prevSchema);
return result.getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
Contributor

Basically, the schema check is relaxed here: instead of a field-by-field comparison, only a read-compatibility check is performed.

Contributor Author

Discussed offline: previously we had a custom implementation that specifically disallowed dropping columns. Since we're relaxing this constraint in this PR, there's no reason for us to continue maintaining that custom logic. Instead, we can rely solely on Avro's schema compatibility checking.
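
To illustrate why dropping a column passes the reader/writer check above, here is a small self-contained example against stock Avro (the schemas are made up for illustration):

import org.apache.avro.{SchemaBuilder, SchemaCompatibility}

// The table's previous schema has columns a and b; the new batch's schema drops b
val prevSchema = SchemaBuilder.record("rec").fields()
  .requiredString("a")
  .optionalInt("b")
  .endRecord()
val newSchema = SchemaBuilder.record("rec").fields()
  .requiredString("a")
  .endRecord()

// Records written with prevSchema can still be read with newSchema: the reader simply
// ignores the dropped column, so Avro reports the pair as COMPATIBLE
val result = SchemaCompatibility.checkReaderWriterCompatibility(newSchema, prevSchema)
assert(result.getType == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)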

Comment on lines 183 to 187
// Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
sparkContext.getConf.registerKryoClasses(
  Array(classOf[org.apache.avro.generic.GenericData],
    classOf[org.apache.avro.Schema]))
Contributor

Why do we need this now?

Contributor Author

We always had that; this code has just been moved from below to make sure we handle the schema the same way for bulk insert (w/ row-writing) as for any other operation.

Comment on lines 188 to 226
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean

val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)

val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)

val writerSchema: Schema = latestTableSchemaOpt match {
  // In case table schema is empty we're just going to use the source schema as a
  // writer's schema. No additional handling is required
  case None => sourceSchema
  // Otherwise, we need to make sure we reconcile incoming and latest table schemas
  case Some(latestTableSchema) =>
    if (reconcileSchema) {
      // In case we need to reconcile the schema and schema evolution is enabled,
      // we will force-apply schema evolution to the writer's schema
      if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
        internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
      }

      if (internalSchemaOpt.isDefined) {
        // Apply schema evolution, by auto-merging write schema and read schema
        val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
        AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
      } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
        // In case schema reconciliation is enabled and source and latest table schemas
        // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we
        // will rebase incoming batch onto the table's latest schema (ie, reconcile them)
        //
        // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]]
        //       we're validating in that order (where [[sourceSchema]] is treated as a reader's schema,
        //       and [[latestTableSchema]] is treated as a writer's schema)
        latestTableSchema
      } else {
        log.error(
          s"""
             |Failed to reconcile incoming batch schema with the table's one.
             |Incoming schema ${sourceSchema.toString(true)}
             |Table's schema ${latestTableSchema.toString(true)}
             |""".stripMargin)
        throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one")
      }
    } else {
      // Before validating whether schemas are compatible, we need to "canonicalize" source's schema
      // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
      // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
      // in the table's one we want to proceed w/ such operation, simply relaxing such constraint in the
      // source schema.
      val canonicalizedSourceSchema = AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
      // In case reconciliation is disabled, we have to validate that the source's schema
      // is compatible w/ the table's latest schema, such that we're able to read existing table's
      // records using [[sourceSchema]].
      if (TableSchemaResolver.isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema)) {
        canonicalizedSourceSchema
      } else {
        log.error(
          s"""
             |Incoming batch schema is not compatible with the table's one.
             |Incoming schema ${canonicalizedSourceSchema.toString(true)}
             |Table's schema ${latestTableSchema.toString(true)}
             |""".stripMargin)
        throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
      }
    }
}

validateSchemaForHoodieIsDeleted(writerSchema)

// NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
//       We have to register w/ Kryo all of the Avro schemas that might potentially be used to decode
//       records into Avro format. Otherwise, Kryo wouldn't be able to apply an optimization allowing
//       it to avoid the need to ser/de the whole schema along _every_ Avro record
val targetAvroSchemas = sourceSchema +: writerSchema +: latestTableSchemaOpt.toSeq
Contributor

It looks like this schema reconciliation logic is not specific to Spark. Could we extract it and allow the Flink/Java engines to leverage it as well?

Contributor Author

Good call! I'm going to extract it as a standalone method (we can take the unification as a follow-up).
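
Separately, regarding the Kryo NOTE in the quoted snippet above: Spark can avoid shipping the full Avro schema alongside every record only for schemas that were registered on the SparkConf up front, which is presumably what targetAvroSchemas feeds into. A minimal standalone sketch (the schema here is a placeholder):

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf

// Placeholder schema standing in for sourceSchema / writerSchema / latestTableSchema
val writerSchema: Schema = SchemaBuilder.record("rec").fields().requiredString("a").endRecord()

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the Avro classes themselves ...
conf.registerKryoClasses(Array(classOf[GenericData], classOf[Schema]))
// ... and the concrete schemas, so Kryo can send a compact fingerprint instead of the full schema
conf.registerAvroSchemas(writerSchema)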

@yihua yihua added the engine:spark Spark integration label Sep 9, 2022
@alexeykudinkin alexeykudinkin force-pushed the ak/prq-prj-fix branch 3 times, most recently from 7d71695 to c4b6bb8 on September 15, 2022 20:48
@alexeykudinkin
Contributor Author

As outlined in #6196 (comment), this PR should go hand in hand w/ #6196, which flips Schema Reconciliation to be enabled by default (entailing that every incoming batch would be reconciled against the table's schema).

@alexeykudinkin alexeykudinkin force-pushed the ak/prq-prj-fix branch 2 times, most recently from a0ebc2a to d19a4db on September 21, 2022 00:08
@yihua
Contributor

yihua commented Sep 21, 2022

@alexeykudinkin could you check the CI failure?

val trimmedSourceDF = removeMetaFields(sourceDF)

// Supply original record's Avro schema to be provided to [[ExpressionPayload]]
writeParams += (PAYLOAD_RECORD_AVRO_SCHEMA ->
Contributor Author

@wzx140 this is what I'm referring to.

@alexeykudinkin
Contributor Author

@hudi-bot run azure

HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieWriteConfig writeConfig = table.getConfig();
Contributor Author

@alexeykudinkin alexeykudinkin Oct 3, 2022

This piece had to change b/c we're now setting up a projection schema when reading from Parquet, and that fails unless the projection schema matches the file schema. Unfortunately, that hasn't been the case as this code evolved over the years, so it forced me to untangle the flows here to fix it.

Changes:

  • Abstracted and extracted Advanced Schema Evolution into a standalone method (this code doesn't change, except it was abstracted to return a function rewriting the record instead of returning multiple values); see the illustrative sketch after this list
  • Simplified the logic pertaining to record rewriting so it is shared by all flows (bootstrapping, etc)
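
A rough sketch of the "return a rewriting function" shape mentioned in the first bullet. The names, condition, and rewrite utility below are illustrative only (the actual method lives in the Java MergeHelper code and keys off the InternalSchema, not plain schema equality):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils

// Instead of returning several values (new schema, flags, renamed columns), the evolution
// logic yields an optional record-rewriting function; None means records pass through as-is
def composeSchemaEvolutionTransformer(writerSchema: Schema,
                                      fileSchema: Schema): Option[GenericRecord => GenericRecord] =
  if (writerSchema == fileSchema) {
    None
  } else {
    Some(record => HoodieAvroUtils.rewriteRecord(record, writerSchema))
  }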

Contributor

+1 def looks better now.


public InternalSchema(List<Field> columns) {
this(DEFAULT_VERSION_ID, columns);
public InternalSchema(RecordType recordType) {
Contributor Author

These changes are necessary to allow preserving the Avro record's name during conversion to InternalSchema and back.

Changes

  • Rebasing ctors to accept RecordType instead of list of fields
  • Inlining redundant ctors

log.info(s"Registered avro schema : ${schema.toString(true)}")
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
writerSchema: Schema): (Boolean, common.util.Option[String]) = {
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
Contributor Author

These changes make sure that the row-writing Bulk Insert flow relies on the same schema handling as any other operation:

Changes

  • Rebased to rely on writerSchema passed in externally
  • Cleaning up duplication

var writeParams = parameters +
(OPERATION.key -> operation) +
(HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
(HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key -> getTableSchema.toString) +
Contributor Author

These changes are necessary to align and streamline schema handling for the MERGE INTO operation.

Changes:

  • Make sure it passes the expected record's schema to the ExpressionPayload (via the PAYLOAD_RECORD_AVRO_SCHEMA prop)
  • Make sure proper configs are specified to disable inapplicable operations, namely: a) schema reconciliation, b) schema canonicalization (null/non-null), c) schema validation (since the input to MERGE INTO might not necessarily match the target table's schema); see the sketch below
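
A sketch of the kind of overrides described in the second bullet, using only config keys that appear elsewhere in this PR (the exact key for disabling nullability canonicalization isn't shown here, so it is omitted):

// Illustrative write-parameter overrides for the MERGE INTO path
val mergeIntoOverrides: Map[String, String] = Map(
  "hoodie.datasource.write.reconcile.schema" -> "false", // a) skip schema reconciliation for MERGE INTO input
  "hoodie.avro.schema.validate" -> "false"               // c) input may legitimately differ from the target table's schema
)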

@alexeykudinkin alexeykudinkin force-pushed the ak/prq-prj-fix branch 5 times, most recently from 2441295 to f873230 on October 8, 2022 06:01
public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
.defaultValue("true")
Contributor Author

This default is flipped to make sure proper schema validations are run for every operation on the table.

*/
public abstract void runMerge(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;

protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, GenericDatumWriter<GenericRecord> gWriter,
Contributor Author

This method is not necessary anymore

Contributor

What was the purpose of this method? My understanding is that, in case of the bootstrapping feature, we need to read records from the old file using the schema w/ which it was written and then read them back using the newer schema that's of interest to us.

Also, within composeSchemaEvolutionTransformer() as per this patch, we could return Option.empty() when there is no InternalSchema, even for bootstrap scenarios. So, in that case, there is no rewriting of records happening. Can you help throw some light, please?

Contributor Author

  1. Yes, this method was rewriting records, but it was using ser/de for that. Now, records are being rewritten using the rewriteRecord* utils.

  2. composeSchemaEvolutionTransformer only covers the schema evolution case. The bootstrap case is handled as before (it wasn't calling into this method previously).

* </ol>
*
*/
public class AvroSchemaCompatibility {
Contributor Author

@alexeykudinkin alexeykudinkin Oct 8, 2022

Context: Avro requires at all times that schemas' names match in order for them to be counted as compatible. Given that only Avro bears names on the schemas themselves (Spark, for example, does not), this makes some schemas converted from Spark's [[StructType]] incompatible w/ Avro (a short illustration follows the adjustment list below).

This code is mostly borrowed as is from Avro 1.10 w/ the following critical adjustments:

  1. Schema names are now only checked in the following 2 cases:
    1.a In case it's a top-level schema
    1.b In case the schema is enclosed in a union (in which case its name might be used for a reverse lookup)

  2. Default value access is adapted to work for both Avro 1.8.x as well as 1.10.x
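
To make the naming issue concrete, here is a self-contained example against stock Avro (the schemas are made up; with the adjustments above, only top-level and union-enclosed names would still be compared):

import org.apache.avro.{SchemaBuilder, SchemaCompatibility}

// Structurally identical nested records that differ only in the nested record's name
val nestedA = SchemaBuilder.record("address").fields().requiredString("city").endRecord()
val nestedB = SchemaBuilder.record("address_rec").fields().requiredString("city").endRecord()

val readerSchema = SchemaBuilder.record("person").fields()
  .name("addr").`type`(nestedA).noDefault()
  .endRecord()
val writerSchema = SchemaBuilder.record("person").fields()
  .name("addr").`type`(nestedB).noDefault()
  .endRecord()

// Stock Avro flags this pair as incompatible purely because of the nested record-name mismatch
val result = SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema)
println(result.getType) // INCOMPATIBLE with vanilla Avro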

Contributor

Can we avoid copying the class directly here? It's hard to maintain across subsequent Avro upgrades, say, if Avro ships a bug fix for it.

Contributor Author

I'm not a big fan of such borrowings myself, but unfortunately there's no other way for us to extend it to modify its behavior: we need to avoid namespace matching, b/c we do numerous rounds of conversions (Avro > Spark > InternalSchema and back), and b/c of that we can't reconstruct the namespaces that are sometimes used/generated by Avro.

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@alexeykudinkin
Contributor Author

CI is green:

(Screenshot: CI build green, 2022-11-24 at 1:31 AM)

https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/results?buildId=13222&view=results

@alexeykudinkin alexeykudinkin merged commit b6124ff into apache:master Nov 24, 2022
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
@zyclove

zyclove commented Jun 9, 2023

@aditiwari01 @ad1happy2go @alexeykudinkin

Excuse me. This error: #8904

I use Hudi 0.13.1 and set hoodie.datasource.write.reconcile.schema=true, but the Spark SQL query against the Hudi table still errors out:

Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:374)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:223)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:198)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:114)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:73)
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:464)
at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:172)
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:100)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroTypeException: Found string, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:199)
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:149)
at org.apache.hudi.common.util.MappingIterator.next(MappingIterator.java:40)
at org.apache.hudi.common.util.ClosableIteratorWithSchema.next(ClosableIteratorWithSchema.java:53)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:630)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:670)
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:365)
... 25 more

}

latestTableSchemaFromCommitMetadata.orElse {
  getCatalogTable(spark, tableId).map { catalogTable =>
Contributor

Not sure why we added this fix to poll the catalog when the table does not have any valid commits. We should just remove this.
