diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 30c2325e775b4..1751fc37944ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -103,7 +103,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; import static org.apache.hudi.avro.AvroSchemaUtils.isNullable; import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; -import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema; import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros; import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; @@ -490,32 +489,9 @@ public static GenericRecord stitchRecords(GenericRecord left, GenericRecord righ * TODO: See if we can always pass GenericRecord instead of SpecificBaseRecord in some cases. */ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) { - GenericRecord newRecord = new GenericData.Record(newSchema); boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase; - for (Schema.Field f : newSchema.getFields()) { - if (!(isSpecificRecord && isMetadataField(f.name()))) { - copyOldValueOrSetDefault(oldRecord, newRecord, f); - } - } - return newRecord; - } - - public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) { - GenericRecord newRecord = new GenericData.Record(newSchema); - for (Schema.Field f : newSchema.getFields()) { - copyOldValueOrSetDefault(genericRecord, newRecord, f); - } - // do not preserve FILENAME_METADATA_FIELD - newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName); - return newRecord; - } - - // TODO Unify the logical of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function. - public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) { - GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>()); - // do not preserve FILENAME_METADATA_FIELD - newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName); - return newRecord; + Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldRecord.getSchema(), newSchema, Collections.emptyMap(), new LinkedList<>(), isSpecificRecord); + return (GenericData.Record) newRecord; } /** @@ -539,34 +515,6 @@ public static GenericRecord removeFields(GenericRecord record, Set field return rewriteRecord(record, newSchema); } - private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) { - Schema oldSchema = oldRecord.getSchema(); - Field oldSchemaField = oldSchema.getField(field.name()); - Object fieldValue = oldSchemaField == null ? null : oldRecord.get(oldSchemaField.pos()); - - if (fieldValue != null) { - // In case field's value is a nested record, we have to rewrite it as well - Object newFieldValue; - if (fieldValue instanceof GenericRecord) { - GenericRecord record = (GenericRecord) fieldValue; - // May return null when use rewrite - String recordFullName = record.getSchema().getFullName(); - String fullName = recordFullName != null ? recordFullName : oldSchemaField.name(); - newFieldValue = rewriteRecord(record, resolveUnionSchema(field.schema(), fullName)); - } else { - newFieldValue = fieldValue; - } - newRecord.put(field.pos(), newFieldValue); - } else if (field.defaultVal() instanceof JsonProperties.Null) { - newRecord.put(field.pos(), null); - } else { - if (!isNullable(field.schema()) && field.defaultVal() == null) { - throw new SchemaCompatibilityException("Field " + field.name() + " has no default value and is null in old record"); - } - newRecord.put(field.pos(), field.defaultVal()); - } - } - /** * Generate a reader schema off the provided writeSchema, to just project out the provided columns. */ @@ -930,7 +878,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr } // try to get real schema for union type Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); - Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames); + Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames, false); // validation is recursive so it only needs to be called on the original input if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) { throw new SchemaCompatibilityException( @@ -939,7 +887,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr return newRecord; } - private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { + private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames, boolean skipMetadataFields) { switch (newSchema.getType()) { case RECORD: if (!(oldRecord instanceof IndexedRecord)) { @@ -951,28 +899,33 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem for (int i = 0; i < fields.size(); i++) { Schema.Field field = fields.get(i); String fieldName = field.name(); - fieldNames.push(fieldName); - Schema.Field oldField = oldSchema.getField(field.name()); - if (oldField != null && !renameCols.containsKey(field.name())) { - newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, false)); - } else { - String fieldFullName = createFullName(fieldNames); - String fieldNameFromOldSchema = renameCols.get(fieldFullName); - // deal with rename - Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? null : oldSchema.getField(fieldNameFromOldSchema); - if (oldFieldRenamed != null) { - // find rename - newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), oldFieldRenamed.schema(), fields.get(i).schema(), renameCols, fieldNames, false)); + if (!skipMetadataFields || !isMetadataField(fieldName)) { + fieldNames.push(fieldName); + Schema.Field oldField = oldSchema.getField(field.name()); + if (oldField != null && !renameCols.containsKey(field.name())) { + newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), field.schema(), renameCols, fieldNames, false)); } else { - // deal with default value - if (fields.get(i).defaultVal() instanceof JsonProperties.Null) { - newRecord.put(i, null); + String fieldFullName = createFullName(fieldNames); + String fieldNameFromOldSchema = renameCols.get(fieldFullName); + // deal with rename + Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? null : oldSchema.getField(fieldNameFromOldSchema); + if (oldFieldRenamed != null) { + // find rename + newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), oldFieldRenamed.schema(), field.schema(), renameCols, fieldNames, false)); } else { - newRecord.put(i, fields.get(i).defaultVal()); + // deal with default value + if (field.defaultVal() instanceof JsonProperties.Null) { + newRecord.put(i, null); + } else { + if (!isNullable(field.schema()) && field.defaultVal() == null) { + throw new SchemaCompatibilityException("Field " + fieldFullName + " has no default value and is non-nullable"); + } + newRecord.put(i, field.defaultVal()); + } } } + fieldNames.pop(); } - fieldNames.pop(); } return newRecord; case ENUM: diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 256adaacc7785..ce7944d353df6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -192,7 +192,7 @@ public void testPropsPresent() { @Test public void testDefaultValue() { - GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EVOLVED_SCHEMA)); + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); rec.put("_row_key", "key1"); rec.put("non_pii_col", "val1"); rec.put("pii_col", "val2"); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 39092887a82a5..791d22cb92a7f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -212,7 +212,7 @@ class TestHoodieSparkUtils { } catch { case e: Exception => if (HoodieSparkUtils.gteqSpark3_3) { - assertTrue(e.getMessage.contains("null value for (non-nullable) string at test_struct_name.nullableInnerStruct[nullableInnerStruct].new_nested_col")) + assertTrue(e.getMessage.contains("Field nullableInnerStruct.new_nested_col has no default value and is non-nullable")) } else { assertTrue(e.getMessage.contains("null of string in field new_nested_col of test_namespace.test_struct_name.nullableInnerStruct of union")) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java index 995fbcaee2363..a108dec74b183 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java @@ -492,7 +492,7 @@ public void testNonNullableColumnDrop(String tableType, .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING))); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); } catch (Exception e) { - assertTrue(containsErrorMessage(e, "java.lang.NullPointerException", + assertTrue(containsErrorMessage(e, "has no default value and is non-nullable", "Schema validation failed due to missing field.")); } }