diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 7a08d1542f53a..a9d67a0d723ef 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -480,9 +480,10 @@ private Pair>> fetchFromSourc } boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); + List partitionColumns = getPartitionColumns(keyGenerator, props); JavaRDD avroRDD = avroRDDOptional.get(); JavaRDD records = avroRDD.map(record -> { - GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, getPartitionColumns(keyGenerator, props)) : record; + GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record; HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr, (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),