diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 5e4b445dfc85e..5a588eafa5f3f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -89,10 +89,15 @@ private static Option updateEventTime(GenericRecord record, Properties p boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + String eventTimeField = properties + .getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY); + if (eventTimeField == null) { + return Option.empty(); + } return Option.ofNullable( HoodieAvroUtils.getNestedFieldVal( record, - properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), + eventTimeField, true, consistentLogicalTimestampEnabled) ); @@ -118,14 +123,18 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue, * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path * and need to be dealt with separately. */ + String orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); + if (orderField == null) { + return true; + } boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), + orderField, true, consistentLogicalTimestampEnabled); Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), + orderField, true, consistentLogicalTimestampEnabled); return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 87d4e746d81cb..c0896e723ea07 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -143,6 +143,26 @@ public void testGetEventTimeInMetadata(long eventTime) throws IOException { Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY))); } + @Test + public void testEmptyProperty() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition0"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record1)); + Properties properties = new Properties(); + payload.getInsertValue(schema, properties); + payload.combineAndGetUpdateValue(record2, schema, properties); + } + @ParameterizedTest @ValueSource(longs = {1L, 1612542030000L}) public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException {