Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
builder.set(field, currentRecord.get(field.pos()));
builder.set(field, currentRecord.get(field.name()));
}
});
return Option.of(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -54,13 +55,15 @@ public void setUp() throws Exception {

@Test
public void testActiveRecords() throws IOException {
Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema);

GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
record1.put("child", Arrays.asList("A"));
record1.put("child", Collections.singletonList("A"));

GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
Expand All @@ -76,11 +79,38 @@ public void testActiveRecords() throws IOException {
record3.put("ts", 1L);
record3.put("_hoodie_is_deleted", false);
record3.put("city", "NY0");
record3.put("child", Arrays.asList("A"));

record3.put("child", Collections.singletonList("A"));

// same content as record1, plus the metadata fields
GenericRecord record4 = createRecordWithMetadataFields(writerSchema, "1", "partition1");
record4.put("id", "1");
record4.put("partition", "partition1");
record4.put("ts", 0L);
record4.put("_hoodie_is_deleted", false);
record4.put("city", "NY0");
record4.put("child", Collections.singletonList("A"));

// same content as record2, plus the metadata fields
GenericRecord record5 = createRecordWithMetadataFields(writerSchema, "2", "");
record5.put("id", "2");
record5.put("partition", "");
record5.put("ts", 1L);
record5.put("_hoodie_is_deleted", false);
record5.put("city", "NY");
record5.put("child", Collections.emptyList());

// same content as record3, plus the metadata fields
GenericRecord record6 = createRecordWithMetadataFields(writerSchema, "2", "");
record6.put("id", "2");
record6.put("partition", "partition1");
record6.put("ts", 1L);
record6.put("_hoodie_is_deleted", false);
record6.put("city", "NY0");
record6.put("child", Collections.singletonList("A"));

OverwriteNonDefaultsWithLatestAvroPayload payload1 = new OverwriteNonDefaultsWithLatestAvroPayload(record1, 1);
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 2);
OverwriteNonDefaultsWithLatestAvroPayload payload5 = new OverwriteNonDefaultsWithLatestAvroPayload(record5, 2);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);

Expand All @@ -94,6 +124,19 @@ public void testActiveRecords() throws IOException {
IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get();
assertEquals(combinedVal2, record3);
assertNotSame(combinedVal2, record3);

// In production, the current record being combined includes the metadata fields, while the
// payload record may include them (compaction path) or may not (normal writer path).

// case1: validate normal writer path
IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get();
assertEquals(combinedVal3, record3);
assertNotSame(combinedVal3, record3);

// case2: validate compaction path
IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get();
assertEquals(combinedVal4, record6);
assertNotSame(combinedVal4, record6);
}

@Test
Expand Down Expand Up @@ -164,4 +207,14 @@ public void testNullColumn() throws IOException {
OverwriteNonDefaultsWithLatestAvroPayload payload2 = new OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
}

/**
 * Creates a test record on the given schema with the Hudi metadata fields pre-populated.
 *
 * <p>Commit time, commit sequence number and file name are set to the fixed values
 * {@code "001"}, {@code "123"} and {@code "file1"}; record key and partition path come from
 * the arguments. No other field is set here, so callers fill in the data fields themselves.
 *
 * @param schema        schema of the record to create; it is expected to declare the metadata
 *                      fields (callers in this test pass the result of
 *                      {@code HoodieAvroUtils.addMetadataFields})
 * @param recordKey     value stored under {@code HoodieRecord.RECORD_KEY_METADATA_FIELD}
 * @param partitionPath value stored under {@code HoodieRecord.PARTITION_PATH_METADATA_FIELD}
 * @return a new record carrying only the five metadata field values
 */
private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) {
  final GenericRecord withMetadata = new GenericData.Record(schema);

  // Fixed commit-level metadata shared by every record built through this helper.
  withMetadata.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001");
  withMetadata.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123");

  // Per-record identity supplied by the caller.
  withMetadata.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
  withMetadata.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);

  withMetadata.put(HoodieRecord.FILENAME_METADATA_FIELD, "file1");
  return withMetadata;
}
}