
Conversation

@yihua
Contributor

@yihua yihua commented Oct 18, 2025

Describe the issue this Pull Request addresses

Summary and Changelog

Impact

Risk Level

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Oct 18, 2025
case LONG:
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (skipLogicalTimestampEvolution || oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
Contributor

Didn't get why we need this flag skipLogicalTimestampEvolution; shouldn't we always rewrite the field if the logical types mismatch?

Contributor Author

Based on my understanding, AvroSchemaCompatibility#calculateCompatibility previously did not validate logical timestamp evolution, so a timestamp-micros to timestamp-millis change could happen, which leads to precision loss; such schema evolution should not be allowed.
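
A minimal sketch (not the PR's actual implementation) of the check being described, using the standard Avro API; the class and method names are illustrative:

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    final class LogicalTimestampCheck {
      // Evolving a LONG field from timestamp-micros to timestamp-millis drops
      // sub-millisecond precision, so a compatibility check could reject exactly this case.
      static boolean isLossyTimestampEvolution(Schema oldSchema, Schema newSchema) {
        return oldSchema.getType() == Schema.Type.LONG
            && newSchema.getType() == Schema.Type.LONG
            && oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros
            && newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis;
      }
    }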

Contributor Author

However, to handle the timestamp issue this PR addresses, the ingestion writer needs to rewrite the schema from timestamp-micros to timestamp-millis.

Contributor

Yes, that is true, but there is actually another case where we do need to support micros -> millis:
If the user has a transformer but uses the Avro writer (which is still standard), then in Spark we will always be in micros, so when we convert the Spark data back to Avro, it will be in micros. But if the target schema specifies millis, we then need to convert micros to millis.
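
A rough sketch of the value-level conversion implied here (not the PR's code): Spark keeps timestamps as microseconds since the epoch, so writing into an Avro timestamp-millis field means scaling the long down by 1000 (dropping sub-millisecond precision), while the reverse millis-to-micros promotion is lossless:

    final class TimestampUnitConversion {
      // Lossy: 1_697_000_000_123_456 micros -> 1_697_000_000_123 millis (456 micros dropped).
      static long microsToMillis(long epochMicros) {
        return Math.floorDiv(epochMicros, 1000L);
      }

      // Lossless widening, e.g. what a ts(3) -> ts(6) promotion amounts to at the value level.
      static long millisToMicros(long epochMillis) {
        return Math.multiplyExact(epochMillis, 1000L);
      }
    }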

Contributor

@danny0405 danny0405 Oct 21, 2025

so a timestamp-micros to timestamp-millis change could happen, which leads to precision loss; such schema evolution should not be allowed

This may be right, but at least a type promotion like ts(3) to ts(6) should always be allowed? I didn't really get the flag skipLogicalTimestampEvolution or in which case we can skip the evolution. And the code here does handle the micros -> millis conversion; if it is disallowed, shouldn't we forbid it?

.withDocumentation("Enables support for Schema Evolution feature");

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ALLOW_LOGICAL_EVOLUTION = ConfigProperty
.key("hoodie.schema.evolution.allow.logical.evolution")
Contributor

Why do we need this flag? timestamp-millis to/from timestamp-micros should always be feasible in schema evolution.

Contributor Author

As mentioned in the other thread, timestamp-micros to timestamp-millis should not be allowed, as it loses precision.

Contributor

@danny0405 danny0405 Oct 21, 2025

Then why do we need this flag if precision loss is never allowed in schema evolution? Should we just abandon the conversion in the code?

BTW, timestamps are not the only type with logical types in Avro schemas, right? And from its name I don't see that the flag is about preventing precision loss. At least type promotion should be allowed.
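
For context on the point that timestamps are not the only logical types, two standard Avro logical types that also ride on primitive types (illustrative only, not code from this PR):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    final class OtherLogicalTypes {
      // date annotates int; decimal(precision, scale) annotates bytes/fixed. Evolving these
      // can also be precision-sensitive (e.g. shrinking a decimal's scale).
      static final Schema DATE_FIELD =
          LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
      static final Schema DECIMAL_FIELD =
          LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
    }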

Contributor Author

I think this flag can be added in an independent PR, not coupled with the logical timestamp fix, to avoid confusion.

On master, the timestamp-micros to timestamp-millis type change is allowed although it should not be, given that it incurs precision loss. Thus, for new tables on table version 9, such a type change in the schema should be validated and disallowed.

However, for the logical timestamp fix to work, the Hudi streamer needs to automatically change the field type from timestamp-micros (introduced by the regression) back to timestamp-millis based on the target schema in the schema provider. So this field type change needs to be allowed for table version 8 and below, where the regression incorrectly changed the field from timestamp-millis in the schema provider's target schema to timestamp-micros in the table schema.
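
The behavior described above expressed as a single predicate; this is only a sketch, and the method name and raw version codes are hypothetical, not taken from the PR:

    final class LogicalTimestampRewritePolicy {
      // Table version 9 validates and rejects the lossy micros -> millis change; version 8
      // and below permit it so the streamer can realign the table schema with the schema
      // provider's timestamp-millis target.
      static boolean allowLossyLogicalTimestampRewrite(int tableVersionCode) {
        return tableVersionCode <= 8;
      }
    }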

this.readRecords++;
if (this.promotedSchema.isPresent()) {
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get(), skipLogicalTimestampEvolution);
Contributor

I thought we only had problems for Parquet files; do the Avro logs also have a precision mismatch between the timestamp type and its values? The Avro schema in the log block header comes from the table schema, which should be correct, right?

Contributor

It's not a Parquet problem; it affects anything ingested with Deltastreamer. When the issue happens, the table schema still matches the data schema.

if (isTimestampMicros(fileType) && isTimestampMillis(tableType)) {
columnsToMultiply.add(path);
} else if (isLong(fileType) && isLocalTimestampMillis(tableType)) {
columnsToMultiply.add(path);
Contributor Author

Is this a new breaking case to handle?

Contributor

It's not new; I have brought it up a bunch of times. In the doc we have rows that show what is happening.

Cast(expr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, DateType) =>
Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
case (LongType, TimestampNTZType) => expr // @ethan I think we just want a no-op here?
Contributor Author

Now I kind of get it. Is this because local timestamp / TimestampNTZType was written as the long type in Parquet before? Also, there is no micros-in-schema vs. millis-in-values regression for TimestampNTZType in published Hudi releases, correct? If so, there is no need for the conversion.

Contributor

If you go to the codegen for casting, it doesn't support long -> TimestampNTZ, but Spark has natural handling. I got rid of this, though, because now we repair the data in this case. If we want protection when repair is disabled, we can actually make a better change: in org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper.isDataTypeEqual we can add a case for (TimestampNTZType, LongType) => true, so that we won't even need to use the schema evolution paths for this column.

Contributor Author

I think avoiding the schema evolution read path would be preferred, to avoid its additional read overhead.

})
}

def recursivelyApplyMultiplication(expr: Expression, columnPath: String, dataType: DataType): Expression = {
Contributor Author

I'm wondering if we can change HoodieParquetReadSupport and add a read support implementation for the Avro Parquet reader to handle the millis interpretation, which is one layer below the current approach. Would that incur less overhead than the projection?

Contributor

Yes, I was able to do so, and it does in fact incur less overhead. My original approach was to spoof the schema read from the Parquet footer, but I thought it was too deep into the parquet-java Hadoop stack to work. After trying it out, though, it seems the ReadSupport requested schema is all that needs to change for it to work.
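
A minimal illustration (not the PR's SchemaRepair code) of what fixing the requested schema at the ReadSupport layer means, using the parquet-java schema API: re-annotate an INT64 timestamp(MICROS) column as timestamp(MILLIS) so the reader interprets the stored longs at the unit the values were actually written in:

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    final class RequestedSchemaRepairSketch {
      // Returns the field unchanged unless it is an INT64 annotated as timestamp(MICROS),
      // in which case a copy annotated as timestamp(MILLIS) is returned.
      static Type repairTimestampField(Type field) {
        if (field.isPrimitive()
            && field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64
            && field.getLogicalTypeAnnotation() instanceof TimestampLogicalTypeAnnotation) {
          TimestampLogicalTypeAnnotation ts =
              (TimestampLogicalTypeAnnotation) field.getLogicalTypeAnnotation();
          if (ts.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
            return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, field.getRepetition())
                .as(LogicalTypeAnnotation.timestampType(ts.isAdjustedToUTC(), LogicalTypeAnnotation.TimeUnit.MILLIS))
                .named(field.getName());
          }
        }
        return field;
      }
    }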

@yihua yihua added this to the release-1.1.0 milestone Oct 20, 2025
override def init(context: InitContext): ReadContext = {
val readContext = super.init(context)
val requestedParquetSchema = readContext.getRequestedSchema
val requestedParquetSchema = SchemaRepair.repairLogicalTypes(readContext.getRequestedSchema, tableSchemaOpt)
Contributor Author

Where is the fix for the Avro Parquet reader? Also, the Hive reader needs a fix too.

Contributor

Can it be ensured that readContext.getRequestedSchema comes from the Parquet footer?

if (!existingTableSchema.isPresent()) {
return;
}
boolean allowLogicalEvolutions = config.shouldAllowLogicalEvolutions();
Contributor

As we discussed in https://github.com/apache/hudi/pull/14120/files#r2448965924, for V9 tables the flag allowLogicalEvolutions should always be false, while for V8 and below it should be true so that the fix can work.

Can we then get rid of the option config.shouldAllowLogicalEvolutions() and just decide by table version?


if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
Schema repairedWriterSchema = AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema);
Contributor

The writer schema and reader schema both come from the log header, so they are actually the same; the fix seems not to work as expected.

Contributor

Then how do the existing evolution fixes like recordNeedsRewriteForExtendedAvroTypePromotion work? I will validate whether the fix is working, but the schemas should be able to differ.

Contributor

@danny0405 danny0405 Oct 23, 2025

Yeah, it looks like if there is no schema evolution, the read schema is right (see private Option<Schema> getTargetReaderSchemaForBlock()); otherwise the read schema would be empty and set to the header schema in the HoodieDataBlock constructors.

And should we invoke AvroSchemaRepair.repairLogicalTypes only after recordNeedsRewriteForExtendedAvroTypePromotion returns true, since the micros-to-millis check is added there?

}

// if exists read columns, we need to filter columns.
List<String> readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf));
Contributor

@danny0405 danny0405 Oct 22, 2025

So this code never worked before? It looks like it serves Hive queries.

Contributor

This works; it's just redundant because we already read the footer in the file group reader. So we were doing the repair in the file group reader, and then the repair was getting undone because here we read the schema from the footer again.

|| filePath.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension());
Schema avroFileSchema = isParquetOrOrc ? HoodieIOFactory.getIOFactory(storage)
.getFileFormatUtils(filePath).readAvroSchema(storage, filePath) : dataSchema;
Schema avroFileSchema = AvroSchemaRepair.repairLogicalTypes(isParquetOrOrc ? HoodieIOFactory.getIOFactory(storage)
Contributor

If it isn't Parquet or ORC, there is no need to call AvroSchemaRepair.repairLogicalTypes at all.

repairedFields.add(repaired);
}

return Schema.createRecord(
Contributor

  1. There is no need to instantiate a new schema if the timestamp type already matches in precision (see the sketch below);
  2. We had better intern the schema if a new one is generated.
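
A rough sketch of suggestion 1 (hypothetical helper, not the PR's code, assuming plain record schemas without nullable unions): scan the fields first and return only when a precision mismatch exists, so no new Schema object is created on the common path; a repaired schema, once built, could additionally be cached and reused per suggestion 2.

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    final class SchemaRepairShortCircuit {
      // True only if some field is annotated timestamp-micros in the file schema while
      // the table schema declares the same field as timestamp-millis.
      static boolean anyFieldNeedsRepair(Schema fileSchema, Schema tableSchema) {
        for (Schema.Field fileField : fileSchema.getFields()) {
          Schema.Field tableField = tableSchema.getField(fileField.name());
          if (tableField != null
              && fileField.schema().getLogicalType() instanceof LogicalTypes.TimestampMicros
              && tableField.schema().getLogicalType() instanceof LogicalTypes.TimestampMillis) {
            return true;
          }
        }
        return false;
      }
    }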

@danny0405
Contributor

create_schema_only_when_necessary_for_schema_repair.patch
@jonvex, here is the patch to address the schema repair instantiation issue.

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Oct 23, 2025
@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@jonvex
Contributor

jonvex commented Oct 26, 2025

switching to #14161

@yihua yihua closed this Oct 27, 2025