Subject: [PATCH] create schema only when necessary for schema repair

Schema repair now returns the incoming schema, field, or type instance
unchanged when no logical-type rule applies, and only allocates a new
object when something was actually repaired. Callers detect "unchanged"
by reference comparison.

HoodieAvroDataBlock repairs the writer schema only on the type-promotion
path.

A repaired Avro field is built from the repaired requested schema, and
array/map element schemas are repaired recursively, so the requested
schema's nullability, unions and nested records are preserved.
---
Index: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java	(revision 674babbddb8100d0dd4ba69f6c91557157cfcfed)
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java	(revision 4dc3dac3c4673f4e145728a1e6e2e5c83708e2ef)
@@ -187,13 +187,12 @@
       if (new HoodieAvroDataBlockVersion(version).hasRecordCount()) {
         this.totalRecords = this.dis.readInt();
       }
-
-      Schema repairedWriterSchema = AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema);
-      if (recordNeedsRewriteForExtendedAvroTypePromotion(repairedWriterSchema, readerSchema)) {
-        this.reader = new GenericDatumReader<>(repairedWriterSchema, repairedWriterSchema);
+      if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
+        writerSchema = AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema);
+        this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
         this.promotedSchema = Option.of(readerSchema);
       } else {
-        this.reader = new GenericDatumReader<>(repairedWriterSchema, readerSchema);
+        this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
       }
     }
 
Index: hudi-common/src/main/java/org/apache/parquet/schema/AvroSchemaRepair.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/hudi-common/src/main/java/org/apache/parquet/schema/AvroSchemaRepair.java b/hudi-common/src/main/java/org/apache/parquet/schema/AvroSchemaRepair.java
--- a/hudi-common/src/main/java/org/apache/parquet/schema/AvroSchemaRepair.java	(revision 674babbddb8100d0dd4ba69f6c91557157cfcfed)
+++ b/hudi-common/src/main/java/org/apache/parquet/schema/AvroSchemaRepair.java	(revision 4dc3dac3c4673f4e145728a1e6e2e5c83708e2ef)
@@ -19,6 +19,9 @@
 
 package org.apache.parquet.schema;
 
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.avro.AvroSchemaUtils;
+
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -34,60 +37,51 @@
 
     List<Schema.Field> repairedFields = new ArrayList<>();
+    boolean needsRepair = false;
 
     for (Schema.Field requestedField : requestedSchema.getFields()) {
       Schema.Field tableField = tableSchema.getField(requestedField.name());
-      Schema.Field repaired;
+      Schema.Field repaired = requestedField;
       if (tableField != null) {
         repaired = repairAvroField(requestedField, tableField);
-      } else {
-        repaired = new Schema.Field(requestedField.name(), requestedField.schema(), requestedField.doc(), requestedField.defaultVal());
+        if (repaired != requestedField) {
+          needsRepair = true;
+        }
       }
       repairedFields.add(repaired);
     }
 
-    return Schema.createRecord(
-        requestedSchema.getName(),
-        requestedSchema.getDoc(),
-        requestedSchema.getNamespace(),
-        requestedSchema.isError(),
-        repairedFields
-    );
+    return needsRepair
+        ? AvroSchemaCache.intern(Schema.createRecord(requestedSchema.getName(), requestedSchema.getDoc(), requestedSchema.getNamespace(), requestedSchema.isError(), repairedFields))
+        : requestedSchema;
   }
 
   private static Schema.Field repairAvroField(Schema.Field requested, Schema.Field table) {
-    Schema repairedSchema = repairAvroSchema(requested.schema(), table.schema());
+    Schema requestedSchema = requested.schema();
+    Schema repairedSchema = repairAvroSchema(requestedSchema, table.schema());
 
-    return new Schema.Field(
-        requested.name(),
-        repairedSchema,
-        requested.doc(),
-        requested.defaultVal(),
-        requested.order()
-    );
+    return repairedSchema != requestedSchema
+        ? new Schema.Field(requested.name(), repairedSchema, requested.doc(), requested.defaultVal(), requested.order())
+        : requested;
   }
 
   private static Schema repairAvroSchema(Schema requested, Schema table) {
     // Handle union types (nullable fields)
     if (requested.getType() == Schema.Type.UNION) {
       List<Schema> repairedUnionTypes = new ArrayList<>();
+      Schema tableNonNull = AvroSchemaUtils.resolveNullableSchema(table);
+      boolean needsRepair = false;
       for (Schema unionType : requested.getTypes()) {
         if (unionType.getType() == Schema.Type.NULL) {
           repairedUnionTypes.add(unionType);
         } else {
-          // Find corresponding non-null type in table schema
-          Schema tableNonNull = table;
-          if (table.getType() == Schema.Type.UNION) {
-            for (Schema tableUnionType : table.getTypes()) {
-              if (tableUnionType.getType() != Schema.Type.NULL) {
-                tableNonNull = tableUnionType;
-                break;
-              }
-            }
+          Schema repaired = repairAvroSchema(unionType, tableNonNull);
+          if (repaired != unionType) {
+            needsRepair = true;
           }
-          repairedUnionTypes.add(repairAvroSchema(unionType, tableNonNull));
+          repairedUnionTypes.add(repaired);
         }
       }
-      return Schema.createUnion(repairedUnionTypes);
+      return needsRepair ? Schema.createUnion(repairedUnionTypes) : requested;
     }
 
     // Handle record types (nested structs)
@@ -97,19 +91,19 @@
 
     // Handle array types
     if (requested.getType() == Schema.Type.ARRAY && table.getType() == Schema.Type.ARRAY) {
       Schema repairedElementSchema = repairAvroSchema(requested.getElementType(), table.getElementType());
-      return Schema.createArray(repairedElementSchema);
+      return repairedElementSchema != requested.getElementType() ? Schema.createArray(repairedElementSchema) : requested;
     }
 
     // Handle map types
     if (requested.getType() == Schema.Type.MAP && table.getType() == Schema.Type.MAP) {
       Schema repairedValueSchema = repairAvroSchema(requested.getValueType(), table.getValueType());
-      return Schema.createMap(repairedValueSchema);
+      return repairedValueSchema != requested.getValueType() ? Schema.createMap(repairedValueSchema) : requested;
     }
 
     // Handle primitive types with logical types
     if (isPrimitiveType(requested) && isPrimitiveType(table)) {
-      return repairAvroLogicalType(requested, table);
+      return needsRepair(requested, table) ? table : requested;
     }
 
     // Default: return requested schema
@@ -124,41 +118,26 @@
         || type == Schema.Type.BYTES;
   }
 
-  private static Schema repairAvroLogicalType(Schema requested, Schema table) {
+  private static boolean needsRepair(Schema requested, Schema table) {
     LogicalType reqLogical = requested.getLogicalType();
     LogicalType tblLogical = table.getLogicalType();
 
-    boolean useTableType = false;
-
     // Rule 1: requested is timestamp-micros, table is timestamp-millis
     if (reqLogical instanceof LogicalTypes.TimestampMicros
         && tblLogical instanceof LogicalTypes.TimestampMillis) {
-      useTableType = true;
+      return true;
     }
 
     // Rule 2: requested is LONG (no logical type), table is local-timestamp-millis
     if (reqLogical == null
         && requested.getType() == Schema.Type.LONG
         && tblLogical instanceof LogicalTypes.LocalTimestampMillis) {
-      useTableType = true;
+      return true;
     }
 
     // Rule 3: requested is LONG (no logical type), table is local-timestamp-micros
-    if (reqLogical == null
+    return reqLogical == null
         && requested.getType() == Schema.Type.LONG
-        && tblLogical instanceof LogicalTypes.LocalTimestampMicros) {
-      useTableType = true;
-    }
-
-    if (useTableType) {
-      // Create a new schema with the table's logical type
-      Schema repaired = Schema.create(table.getType());
-      if (tblLogical != null) {
-        tblLogical.addToSchema(repaired);
-      }
-      return repaired;
-    } else {
-      return requested;
-    }
+        && tblLogical instanceof LogicalTypes.LocalTimestampMicros;
   }
 }
Index: hudi-hadoop-common/src/main/java/org/apache/parquet/schema/SchemaRepair.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/schema/SchemaRepair.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/schema/SchemaRepair.java
--- a/hudi-hadoop-common/src/main/java/org/apache/parquet/schema/SchemaRepair.java	(revision 674babbddb8100d0dd4ba69f6c91557157cfcfed)
+++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/schema/SchemaRepair.java	(revision 4dc3dac3c4673f4e145728a1e6e2e5c83708e2ef)
@@ -36,16 +36,20 @@
 
   static MessageType repairLogicalTypes(MessageType requestedSchema, MessageType tableSchema) {
     List<Type> repairedFields = new ArrayList<>();
+    boolean needsRepair = false;
     for (Type requestedField : requestedSchema.getFields()) {
       Type repaired = requestedField;
       if (tableSchema.containsField(requestedField.getName())) {
         Type tableField = tableSchema.getType(requestedField.getName());
         repaired = repairField(requestedField, tableField);
+        if (repaired != requestedField) {
+          needsRepair = true;
+        }
       }
       repairedFields.add(repaired);
     }
 
-    return new MessageType(requestedSchema.getName(), repairedFields);
+    return needsRepair ? new MessageType(requestedSchema.getName(), repairedFields) : requestedSchema;
   }
 
   private static Type repairField(Type requested, Type table) {
@@ -59,12 +63,9 @@
       MessageType nestedTbl = new MessageType(tblGroup.getName(), tblGroup.getFields());
       MessageType repairedNested = repairLogicalTypes(nestedReq, nestedTbl);
 
-      return new GroupType(
-          reqGroup.getRepetition(),
-          reqGroup.getName(),
-          reqGroup.getLogicalTypeAnnotation(),
-          repairedNested.getFields()
-      );
+      return repairedNested != nestedReq
+          ? new GroupType(reqGroup.getRepetition(), reqGroup.getName(), reqGroup.getLogicalTypeAnnotation(), repairedNested.getFields())
+          : requested;
     } else {
       // fallback: keep requested
       return requested;
@@ -72,11 +73,18 @@
   }
 
   private static PrimitiveType repairPrimitiveType(PrimitiveType requested, PrimitiveType table) {
+    if (needsRepair(requested, table)) {
+      return Types.primitive(table.getPrimitiveTypeName(), requested.getRepetition())
+          .as(table.getLogicalTypeAnnotation())
+          .named(requested.getName());
+    }
+    return requested;
+  }
+
+  private static boolean needsRepair(PrimitiveType requested, PrimitiveType table) {
     LogicalTypeAnnotation reqLogical = requested.getLogicalTypeAnnotation();
     LogicalTypeAnnotation tblLogical = table.getLogicalTypeAnnotation();
 
-    boolean useTableType = false;
-
     // Rule 1: requested is timestamp(micros), table is timestamp(millis)
     if (reqLogical instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
         && tblLogical instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
@@ -88,24 +96,14 @@
           && tblTs.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS
           && tblTs.isAdjustedToUTC()
           && reqTs.isAdjustedToUTC()) {
-        useTableType = true;
+        return true;
       }
     }
 
     // Rule 2: requested is LONG (no logical type), table is local-timestamp
-    if (reqLogical == null
+    return reqLogical == null
         && requested.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64
         && tblLogical instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
-        && !((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) tblLogical).isAdjustedToUTC()) {
-      useTableType = true;
-    }
-
-    if (useTableType) {
-      return Types.primitive(table.getPrimitiveTypeName(), requested.getRepetition())
-          .as(table.getLogicalTypeAnnotation())
-          .named(requested.getName());
-    } else {
-      return requested;
-    }
+        && !((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) tblLogical).isAdjustedToUTC();
   }
 }