Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ public void write(GenericRecord oldRecord) {
}
try {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
Expand All @@ -112,9 +112,9 @@ public List<WriteStatus> close() {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
Comment thread
xushiyan marked this conversation as resolved.
Outdated
}
insertRecordsWritten++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -56,7 +55,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
if (recordBytes.length == 0) {
return Option.empty();
}
HoodieConfig hoodieConfig = new HoodieConfig(properties);

GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

// Null check is needed here to support schema evolution. The record in storage may be from old schema where
Expand All @@ -68,17 +67,27 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* We reached a point where the value is disk is older than the incoming record.
*/
eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, hoodieConfig
.getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
eventTime = updateEventTime(incomingRecord, properties);

/*
* Now check if the incoming record is a delete record.
*/
if (isDeleteRecord(incomingRecord)) {
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
} else {
return Option.of(incomingRecord);
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);

return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
}

private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void testActiveRecords() throws IOException {
assertEquals(payload1.preCombine(payload2, props), payload2);
assertEquals(payload2.preCombine(payload1, props), payload2);

assertEquals(record1, payload1.getInsertValue(schema).get());
assertEquals(record2, payload2.getInsertValue(schema).get());
assertEquals(record1, payload1.getInsertValue(schema, props).get());
assertEquals(record2, payload2.getInsertValue(schema, props).get());

assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2);
assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2);
Expand All @@ -103,8 +103,8 @@ public void testDeletedRecord() throws IOException {
assertEquals(payload1.preCombine(payload2, props), payload2);
assertEquals(payload2.preCombine(payload1, props), payload2);

assertEquals(record1, payload1.getInsertValue(schema).get());
assertFalse(payload2.getInsertValue(schema).isPresent());
assertEquals(record1, payload1.getInsertValue(schema, props).get());
assertFalse(payload2.getInsertValue(schema, props).isPresent());

assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1);
assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent());
Expand Down Expand Up @@ -142,4 +142,20 @@ public void testGetEventTimeInMetadata(long eventTime) throws IOException {
assertEquals(eventTime,
Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
}

@ParameterizedTest
@ValueSource(longs = {1L, 1612542030000L})
public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException {
GenericRecord record = new GenericData.Record(schema);

record.put("id", "1");
record.put("partition", "partition0");
record.put("ts", eventTime);
record.put("_hoodie_is_deleted", false);
DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime);
payload.getInsertValue(schema, props);
assertTrue(payload.getMetadata().isPresent());
assertEquals(eventTime,
Long.parseLong(payload.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
}
}