diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 0f193c527d3e..3ad359167bd6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -58,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -326,18 +327,86 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch return newRecord; } - private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) { + private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field newField) { // cache the result of oldRecord.get() to save CPU expensive hash lookup Schema oldSchema = oldRecord.getSchema(); - Object fieldValue = oldSchema.getField(f.name()) == null ? null : oldRecord.get(f.name()); - if (fieldValue == null) { - if (f.defaultVal() instanceof JsonProperties.Null) { - newRecord.put(f.name(), null); + String fieldName = newField.name(); + if (oldRecord.get(fieldName) == null) { + setDefaultVal(newRecord, newField); + } else { + if (newField.schema().equals(oldSchema.getField(fieldName).schema())) { + newRecord.put(fieldName, oldRecord.get(fieldName)); } else { - newRecord.put(f.name(), f.defaultVal()); + newRecord.put(fieldName, rewriteEvolvedFields(oldRecord.get(fieldName), newField.schema())); } + } + } + + private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) { + if (f.defaultVal() instanceof JsonProperties.Null) { + newRecord.put(f.name(), null); } else { - newRecord.put(f.name(), fieldValue); + newRecord.put(f.name(), f.defaultVal()); + } + } + + /* + *
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                        field_24 : Integer
+   *                                     field3: Integer
+   * 
+ *

+ * When a nested record has changed/evolved, newRecord.put(field2, oldRecord.get(field2)), is not sufficient. + * Requires a deep-copy/rewrite of the evolved field. + */ + private static Object rewriteEvolvedFields(Object datum, Schema newSchema) { + switch (newSchema.getType()) { + case RECORD: + if (!(datum instanceof GenericRecord)) { + return datum; + } + GenericRecord record = (GenericRecord) datum; + // if schema of the record being rewritten does not match + // with the new schema, some nested records with schema change + // will require rewrite. + if (!record.getSchema().equals(newSchema)) { + GenericRecord newRecord = new GenericData.Record(newSchema); + for (Schema.Field f : newSchema.getFields()) { + if (record.get(f.name()) == null) { + setDefaultVal(newRecord, f); + } else { + newRecord.put(f.name(), rewriteEvolvedFields(record.get(f.name()), f.schema())); + } + } + return newRecord; + } + return datum; + case UNION: + Integer idx = (newSchema.getTypes().get(0).getType() == Schema.Type.NULL) ? 1 : 0; + return rewriteEvolvedFields(datum, newSchema.getTypes().get(idx)); + case ARRAY: + List arrayValue = (List)datum; + List arrayCopy = new GenericData.Array( + arrayValue.size(), newSchema); + for (Object obj : arrayValue) { + arrayCopy.add(rewriteEvolvedFields(obj, newSchema.getElementType())); + } + return arrayCopy; + case MAP: + Map map = (Map)datum; + Map mapCopy = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + mapCopy.put(entry.getKey(), rewriteEvolvedFields(entry.getValue(), newSchema.getValueType())); + } + return mapCopy; + default: + return datum; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 8887cfe3d300..9bc4ae168e3d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; @@ -236,4 +237,81 @@ public void testGetNestedFieldVal() { } } + @Test + public void testRewriteToEvolvedNestedRecord() throws Exception { + // schema definition for inner record + Schema nestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id").endRecord(); + Schema evolvedNestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id") + .optionalString("color_name").endRecord(); + + // schema definition for outer record + Schema recordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp") + .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(nestedSchema) + .noDefault().requiredString("pii_col").endRecord(); + Schema evolvedRecordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp") + .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(evolvedNestedSchema) + .noDefault().requiredString("pii_col").endRecord(); + + // populate inner record, with fewer fields + GenericRecord nestedRec = new GenericData.Record(nestedSchema); + nestedRec.put("color_id", 55.5); + + // populate outer record + GenericRecord rec = new GenericData.Record(recordSchema); + rec.put("timestamp", 3.5); + rec.put("_row_key", "key1"); + rec.put("non_pii_col", "val1"); + rec.put("color_rec", nestedRec); + rec.put("pii_col", "val2"); + + // rewrite record with less number of fields into an evolved record (with optional fields added). + try { + GenericRecord newRecord = HoodieAvroUtils.rewriteRecord(rec, evolvedRecordSchema); + assertEquals("val2", newRecord.get("pii_col")); + assertEquals(null, ((GenericRecord)newRecord.get("color_rec")).get("color_name")); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false, "Failed to rewrite Record"); + } + + } + + @Test + public void testRewriteToShorterRecord() throws Exception { + // schema definition for inner record + Schema nestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id").endRecord(); + Schema largerNestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id") + .requiredString("color_name").endRecord(); + + // schema definition for outer record + Schema recordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp") + .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(nestedSchema) + .noDefault().requiredString("pii_col").endRecord(); + Schema largerRecordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp") + .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(largerNestedSchema) + .noDefault().requiredString("pii_col").endRecord(); + + // populate larger inner record + GenericRecord nestedRec = new GenericData.Record(largerNestedSchema); + nestedRec.put("color_id", 55.5); + nestedRec.put("color_name", "blue"); + + // populate outer record, with larger inner record + GenericRecord largerRec = new GenericData.Record(largerRecordSchema); + largerRec.put("timestamp", 3.5); + largerRec.put("_row_key", "key1"); + largerRec.put("non_pii_col", "val1"); + largerRec.put("color_rec", nestedRec); + largerRec.put("pii_col", "val2"); + + // rewrite record with larger inner record to record with shorter inner record. + try { + GenericRecord shorterRec = HoodieAvroUtils.rewriteRecord(largerRec, recordSchema); + assertEquals("val2", shorterRec.get("pii_col")); + assertEquals(null, ((GenericRecord)shorterRec.get("color_rec")).get("color_name")); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false, "Failed to rewrite Record"); + } + } }