diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java index 5f1a1ef5576dc..a197c91da946b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java @@ -59,10 +59,9 @@ public JavaRDD> deduplicateRecords(JavaRDD> reco }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); - // we cannot allow the user to change the key or partitionPath, since that will affect - // everything - // so pick it from one of the records. - return new HoodieRecord(rec1.getKey(), reducedData); + HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); + + return new HoodieRecord(reducedKey, reducedData); }, parallelism).map(Tuple2::_2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index bbb40488bb04e..f0f9b2c0e46af 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -241,6 +241,7 @@ private void testDeduplication( when(index.isGlobal()).thenReturn(true); List> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect(); assertEquals(1, dedupedRecs.size()); + assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); // non-Global dedup should be done based on both recordKey and partitionPath