diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java
index f5e5e358c5376..379c08826125a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -83,9 +84,9 @@ public I combineOnCondition(
    */
   public I deduplicateRecords(
       I records, HoodieTable table, int parallelism) {
-    return deduplicateRecords(records, table.getIndex(), parallelism);
+    return deduplicateRecords(records, table.getIndex(), table.getConfig(), parallelism);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex index, int parallelism);
+      I records, HoodieIndex index, HoodieWriteConfig writeConfig, int parallelism);
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
index 5f1a1ef5576dc..11fec01257849 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
@@ -22,10 +22,13 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 
+import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 
+import scala.Serializable;
 import scala.Tuple2;
 
 /**
@@ -49,21 +52,48 @@ public static SparkWriteHelper newInstance() {
 
   @Override
   public JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
                                                      HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> index,
-                                                     int parallelism) {
-    boolean isIndexingGlobal = index.isGlobal();
-    return records.mapToPair(record -> {
-      HoodieKey hoodieKey = record.getKey();
-      // If index used is global, then records are expected to differ in their partitionPath
-      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
-      return new Tuple2<>(key, record);
-    }).reduceByKey((rec1, rec2) -> {
-      @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
-      // we cannot allow the user to change the key or partitionPath, since that will affect
-      // everything
-      // so pick it from one of the records.
-      return new HoodieRecord(rec1.getKey(), reducedData);
-    }, parallelism).map(Tuple2::_2);
+                                                     HoodieWriteConfig writeConfig, int parallelism) {
+    return new RecordDeduper<T>(index.isGlobal(), writeConfig.getSchema(), parallelism).deduplicateRecords(records);
   }
+  /**
+   * Helper class to dedupe records.
+   * @param <T>
+   */
+  private static class RecordDeduper<T extends HoodieRecordPayload> implements Serializable {
+
+    private transient Schema schema;
+    private final String schemaStr;
+    private final boolean isIndexingGlobal;
+    private final int parallelism;
+
+    public RecordDeduper(boolean isIndexingGlobal, String schemaStr, int parallelism) {
+      this.isIndexingGlobal = isIndexingGlobal;
+      this.parallelism = parallelism;
+      this.schemaStr = schemaStr;
+    }
+
+    private Schema getSchema() {
+      if (null == schema) {
+        schema = Schema.parse(schemaStr);
+      }
+      return schema;
+    }
+
+    public JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records) {
+      return records.mapToPair(record -> {
+        HoodieKey hoodieKey = record.getKey();
+        // If index used is global, then records are expected to differ in their partitionPath
+        Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+        return new Tuple2<>(key, record);
+      }).reduceByKey((rec1, rec2) -> {
+        @SuppressWarnings("unchecked")
+        T reducedData = (T) rec1.getData().preCombine(rec2.getData(), getSchema());
+        // we cannot allow the user to change the key or partitionPath, since that will affect
+        // everything
+        // so pick it from one of the records.
+        return new HoodieRecord(rec1.getKey(), reducedData);
+      }, parallelism).map(Tuple2::_2);
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index bbb40488bb04e..1345404be931a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -236,17 +236,23 @@ private void testDeduplication(
     JavaRDD<HoodieRecord<RawTripTestPayload>> records = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
 
+    String schemaStr = "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", "
+        + "\"name\": \"HoodieInstantInfo\", \"fields\": [{\"name\": \"commitTime\", \"type\": \"string\"}]}";
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+    HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+    when(writeConfig.getSchema()).thenReturn(schemaStr);
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, writeConfig, 1).collect();
     assertEquals(1, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+    writeConfig = mock(HoodieWriteConfig.class);
+    when(writeConfig.getSchema()).thenReturn(schemaStr);
+    dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, writeConfig, 1).collect();
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 1afdd1b59af64..833982bd743de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -41,9 +41,23 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
    * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to
    * insert/upsert (if combining turned on in HoodieClientConfig).
    */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  @Deprecated
   T preCombine(T another);
 
+  /**
+   * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to
+   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * Any custom Record payload implementation that requires schema must override this method.
+   *
+   * @param another Record payload to combine with.
+   * @param schema Schema used for writing to dataset.
+   * @return Combined Record.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Schema schema) {
+    return preCombine(another);
+  }
+
   /**
    * This methods lets you write custom merging/combining logic to produce new values as a function of current value on
    * storage and whats contained in this object.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e6246c48d8729..b9782ef764c59 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -111,7 +111,7 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an insert/update.
-      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
+      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData(), readerSchema);
       records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
     } else {
       // Put the record as is
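
The compatibility point of this patch is the default body of `preCombine(T another, Schema schema)`: existing payloads that ignore the schema keep working unchanged, while schema-aware payloads can override the new hook. Below is a rough sketch of how a downstream payload might use it; the class name, the "version" ordering field, and the tie-breaking rule are illustrative assumptions, not part of this patch.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;

/**
 * Illustrative only: keeps whichever record carries the larger value in an assumed
 * "version" field. Reading that field requires the writer schema, which is exactly
 * what the new preCombine(T, Schema) hook supplies.
 */
public class LatestVersionAvroPayload extends OverwriteWithLatestAvroPayload {

  public LatestVersionAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another, Schema schema) {
    // Keep the payload with the higher "version"; ties favor the incoming record (this).
    return version(this, schema) >= version(another, schema) ? this : another;
  }

  private static long version(OverwriteWithLatestAvroPayload payload, Schema schema) {
    try {
      // Decode the Avro bytes using the writer schema passed down by the engine.
      Option<IndexedRecord> record = payload.getInsertValue(schema);
      if (!record.isPresent()) {
        // Deletes (empty payloads) lose against any real record.
        return Long.MIN_VALUE;
      }
      Object field = ((GenericRecord) record.get()).get("version");
      return field == null ? Long.MIN_VALUE : Long.parseLong(field.toString());
    } catch (IOException e) {
      throw new HoodieIOException("Unable to read payload while combining records", e);
    }
  }
}
```

An override like this sees the writer schema both during dedup (since `SparkWriteHelper` now threads `writeConfig.getSchema()` into `preCombine` via `RecordDeduper`, which keeps the schema string and re-parses it lazily instead of shipping a non-serializable Avro `Schema` in the closure) and during log-record merging in `HoodieMergedLogRecordScanner`.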