Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 20 additions & 60 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
Expand Down Expand Up @@ -490,32 +489,9 @@ public static GenericRecord stitchRecords(GenericRecord left, GenericRecord righ
* TODO: See if we can always pass GenericRecord instead of SpecificBaseRecord in some cases.
*/
public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
GenericRecord newRecord = new GenericData.Record(newSchema);
boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
for (Schema.Field f : newSchema.getFields()) {
if (!(isSpecificRecord && isMetadataField(f.name()))) {
copyOldValueOrSetDefault(oldRecord, newRecord, f);
}
}
return newRecord;
}

public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = new GenericData.Record(newSchema);
for (Schema.Field f : newSchema.getFields()) {
copyOldValueOrSetDefault(genericRecord, newRecord, f);
}
// do not preserve FILENAME_METADATA_FIELD
newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
return newRecord;
}

// TODO Unify the logical of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
// do not preserve FILENAME_METADATA_FIELD
newRecord.put(HoodieRecord.FILENAME_META_FIELD_ORD, fileName);
return newRecord;
Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldRecord.getSchema(), newSchema, Collections.emptyMap(), new LinkedList<>(), isSpecificRecord);
return (GenericData.Record) newRecord;
}

/**
Expand All @@ -539,34 +515,6 @@ public static GenericRecord removeFields(GenericRecord record, Set<String> field
return rewriteRecord(record, newSchema);
}

private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
Schema oldSchema = oldRecord.getSchema();
Field oldSchemaField = oldSchema.getField(field.name());
Object fieldValue = oldSchemaField == null ? null : oldRecord.get(oldSchemaField.pos());

if (fieldValue != null) {
// In case field's value is a nested record, we have to rewrite it as well
Object newFieldValue;
if (fieldValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) fieldValue;
// May return null when use rewrite
String recordFullName = record.getSchema().getFullName();
String fullName = recordFullName != null ? recordFullName : oldSchemaField.name();
newFieldValue = rewriteRecord(record, resolveUnionSchema(field.schema(), fullName));
} else {
newFieldValue = fieldValue;
}
newRecord.put(field.pos(), newFieldValue);
} else if (field.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(field.pos(), null);
} else {
if (!isNullable(field.schema()) && field.defaultVal() == null) {
throw new SchemaCompatibilityException("Field " + field.name() + " has no default value and is null in old record");
}
newRecord.put(field.pos(), field.defaultVal());
}
}

/**
* Generate a reader schema off the provided writeSchema, to just project out the provided columns.
*/
Expand Down Expand Up @@ -930,7 +878,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
}
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames);
Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames, false);
// validation is recursive so it only needs to be called on the original input
if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
Expand All @@ -939,7 +887,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
return newRecord;
}

private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames, boolean skipMetadataFields) {
switch (newSchema.getType()) {
case RECORD:
if (!(oldRecord instanceof IndexedRecord)) {
Expand All @@ -951,24 +899,36 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
String fieldName = field.name();
if (skipMetadataFields && isMetadataField(fieldName)) {
if (field.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
} else {
newRecord.put(i, field.defaultVal());
}
continue;
}

fieldNames.push(fieldName);
Schema.Field oldField = oldSchema.getField(field.name());
if (oldField != null && !renameCols.containsKey(field.name())) {
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), field.schema(), renameCols, fieldNames, false));
} else {
String fieldFullName = createFullName(fieldNames);
String fieldNameFromOldSchema = renameCols.get(fieldFullName);
// deal with rename
Schema.Field oldFieldRenamed = fieldNameFromOldSchema == null ? null : oldSchema.getField(fieldNameFromOldSchema);
if (oldFieldRenamed != null) {
// find rename
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), oldFieldRenamed.schema(), fields.get(i).schema(), renameCols, fieldNames, false));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldFieldRenamed.pos()), oldFieldRenamed.schema(), field.schema(), renameCols, fieldNames, false));
} else {
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
if (field.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
} else {
newRecord.put(i, fields.get(i).defaultVal());
if (!isNullable(field.schema()) && field.defaultVal() == null) {
throw new SchemaCompatibilityException("Field " + fieldFullName + " has no default value and is non-nullable");
}
newRecord.put(i, field.defaultVal());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testPropsPresent() {

@Test
public void testDefaultValue() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EVOLVED_SCHEMA));
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
rec.put("_row_key", "key1");
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class TestHoodieSparkUtils {
} catch {
case e: Exception =>
if (HoodieSparkUtils.gteqSpark3_3) {
assertTrue(e.getMessage.contains("null value for (non-nullable) string at test_struct_name.nullableInnerStruct[nullableInnerStruct].new_nested_col"))
assertTrue(e.getMessage.contains("Field nullableInnerStruct.new_nested_col has no default value and is non-nullable"))
} else {
assertTrue(e.getMessage.contains("null of string in field new_nested_col of test_namespace.test_struct_name.nullableInnerStruct of union"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public void testNonNullableColumnDrop(String tableType,
.stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0);
} catch (Exception e) {
assertTrue(containsErrorMessage(e, "java.lang.NullPointerException",
assertTrue(containsErrorMessage(e, "has no default value and is non-nullable",
"Schema validation failed due to missing field."));
}
}
Expand Down