diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 4003a07de7f0b..11cf59a74c37c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -234,6 +235,12 @@ public class HoodieCompactionConfig extends HoodieConfig { + "the record payload class to merge records in the log against each other, merge again with the base file and " + "produce the final record to be written after compaction."); + public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.compaction.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class that provides a stateless component interface for merging records, and supports various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty .key("hoodie.compaction.lazy.block.read") .defaultValue("true") @@ -691,6 +698,11 @@ public Builder withPayloadClass(String payloadClassName) { return this; } + public Builder withMergeClass(String mergeClass) { + compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass); + return this; + } + public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) { compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d18238fa4b6b8..cbf75b3efb91f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FileSystemRetryConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -123,6 +124,12 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. 
" + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); + public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.datasource.write.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class that provides a stateless component interface for merging records, and supports various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty KEYGENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .noDefaultValue() @@ -1324,6 +1331,10 @@ public String getPayloadClass() { return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME); } + public String getMergeClass() { + return getString(HoodieCompactionConfig.MERGE_CLASS_NAME); + } + public int getTargetPartitionsPerDayBasedCompaction() { return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 3b6bda787780b..fff4aa6d054c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -338,10 +338,7 @@ public void write(HoodieRecord oldRecord) { // writing the first record. So make a copy of the record to be merged HoodieRecord hoodieRecord = keyToNewRecords.get(key).newInstance(); try { - Option combinedRecord = - hoodieRecord.combineAndGetUpdateValue(oldRecord, - schema, - props); + Option combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props); if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) { // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 09f8831f8b71d..9ee6e0884dc04 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -27,7 +27,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle extends HoodieIOHandle table, int parallelism) { - return deduplicateRecords(records, table.getIndex(), parallelism); + HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass()); + return deduplicateRecords(records, table.getIndex(), parallelism, merge); } public abstract I deduplicateRecords( - I records, HoodieIndex index, int parallelism); + I records, HoodieIndex index, int parallelism, HoodieMerge merge); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index e24cd71ab64c4..57bb511c634ee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -23,13 +23,13 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; public class HoodieWriteHelper extends BaseWriteHelper>, HoodieData, HoodieData, R> { - private HoodieWriteHelper() { } @@ -49,7 +49,7 @@ protected HoodieData> tag(HoodieData> dedupedRec @Override public HoodieData> deduplicateRecords( - HoodieData> records, HoodieIndex index, int parallelism) { + HoodieData> records, HoodieIndex index, int parallelism, HoodieMerge merge) { boolean isIndexingGlobal = index.isGlobal(); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); @@ -58,10 +58,9 @@ public HoodieData> deduplicateRecords( return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - HoodieRecord reducedRec = rec2.preCombine(rec1); - HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey(); - - return (HoodieRecord) reducedRec.newInstance(reducedKey); + HoodieRecord reducedRecord = merge.preCombine(rec1, rec2); + HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? 
rec1.getKey() : rec2.getKey(); + return reducedRecord.newInstance(reducedKey); }, parallelism).map(Pair::getRight); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index d2ecd09a23212..a9525760261cd 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -19,6 +19,12 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -44,13 +50,6 @@ import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.metadata.HoodieTableMetadataWriter; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.orc.CompressionKind; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index cc639a4c1656c..ef1047564a591 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; @@ -87,13 +88,14 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, HoodieMerge merge) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { - @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1); + @SuppressWarnings("unchecked") + final HoodieRecord reducedRec = merge.preCombine(rec1, rec2); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
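The engine-specific deduplicateRecords overloads now receive the HoodieMerge explicitly instead of reaching into the record payload. A hedged usage sketch, mirroring the call sites elsewhere in this diff (the wrapper class and method names here are hypothetical):

import java.util.List;

import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;

public class DeduplicationSketch {

  @SuppressWarnings({"unchecked", "rawtypes"})
  public static List<HoodieRecord> dedupe(List<HoodieRecord> records, String mergeClassName) {
    // Resolve the configured merge implementation once and hand it to the engine-specific helper,
    // as the StreamWriteFunction changes later in this diff do (they pass -1 for parallelism).
    HoodieMerge merge = HoodieRecordUtils.loadMerge(mergeClassName);
    return FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge);
  }
}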
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 091c2287f19b9..3a7513a75acd4 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -54,7 +55,7 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, HoodieMerge merge) { boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); @@ -65,11 +66,11 @@ public List> deduplicateRecords( return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") - HoodieRecord reducedRec = rec2.preCombine(rec1); + HoodieRecord reducedRecord = merge.preCombine(rec1,rec2); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. - return (HoodieRecord) reducedRec.newInstance(rec1.getKey()); + return reducedRecord.newInstance(rec1.getKey()); }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala index a3b9c210b9835..636dd299fe032 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction} -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableProjection, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} import org.apache.spark.sql.types.StructType @@ -42,6 +42,23 @@ trait HoodieCatalystExpressionUtils { GenerateUnsafeProjection.generate(targetExprs, attrs) } + /** + * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]] + * + * NOTE: No safety checks are executed to validate that this projection is actually feasible, + * it's up to the caller to make sure that such projection is possible. 
+ * + * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if + * B is a subset of A + */ + def generateMutableProjection(from: StructType, to: StructType): MutableProjection = { + val attrs = from.toAttributes + val attrsMap = attrs.map(attr => (attr.name, attr)).toMap + val targetExprs = to.fields.map(f => attrsMap(f.name)) + + GenerateMutableProjection.generate(targetExprs, attrs) + } + /** * Parses and resolves expression against the attributes of the given table schema. * diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index cebf3145bfd28..e8a4a82026528 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -38,12 +38,14 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; @@ -461,7 +463,8 @@ private void testDeduplication( // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + HoodieMerge merge = new HoodieAvroRecordMerge(); + List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList(); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -469,7 +472,7 @@ private void testDeduplication( // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 470bc723f65db..cfe8f2dafee37 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; import 
org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -765,9 +766,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch } switch (newSchema.getType()) { case RECORD: - if (!(oldRecord instanceof IndexedRecord)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type"); IndexedRecord indexedRecord = (IndexedRecord) oldRecord; List fields = newSchema.getFields(); Map helper = new HashMap<>(); @@ -806,9 +805,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch } return newRecord; case ARRAY: - if (!(oldRecord instanceof Collection)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type"); Collection array = (Collection)oldRecord; List newArray = new ArrayList(); fieldNames.push("element"); @@ -818,9 +815,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch fieldNames.pop(); return newArray; case MAP: - if (!(oldRecord instanceof Map)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type"); Map map = (Map) oldRecord; Map newMap = new HashMap<>(); fieldNames.push("value"); @@ -836,7 +831,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch } } - private static String createFullName(Deque fieldNames) { + public static String createFullName(Deque fieldNames) { String result = ""; if (!fieldNames.isEmpty()) { List parentNames = new ArrayList<>(); @@ -971,7 +966,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche } // convert days to Date - private static java.sql.Date toJavaDate(int days) { + public static java.sql.Date toJavaDate(int days) { long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY); int timeZoneOffset; TimeZone defaultTimeZone = TimeZone.getDefault(); @@ -984,7 +979,7 @@ private static java.sql.Date toJavaDate(int days) { } // convert Date to days - private static int fromJavaDate(Date date) { + public static int fromJavaDate(Date date) { long millisUtc = date.getTime(); long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc); int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, MILLIS_PER_DAY)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index ac2df00151649..daec2fee0338f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -48,7 +48,7 @@ public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data) { } public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) { - super(key, data, operation); + super(key, data, operation, null); } public HoodieAvroIndexedRecord(HoodieRecord record) { @@ -67,11 +67,6 @@ public Option toIndexedRecord() { return Option.of(data); } - @Override - public Comparable getOrderingValue() { - throw new UnsupportedOperationException(); - } - 
@Override public HoodieRecord newInstance() { throw new UnsupportedOperationException(); @@ -99,16 +94,6 @@ public String getRecordKey(String keyFieldName) { .map(Object::toString).orElse(null); } - @Override - public HoodieRecord preCombine(HoodieRecord previousRecord) { - throw new UnsupportedOperationException(); - } - - @Override - public Option combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException { - return Option.empty(); - } - @Override public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException { ValidationUtils.checkState(other instanceof HoodieAvroIndexedRecord); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 9a9011da37a82..65866238e9b5c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -21,11 +21,10 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.keygen.BaseKeyGenerator; -import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -47,7 +46,7 @@ public HoodieAvroRecord(HoodieKey key, T data) { } public HoodieAvroRecord(HoodieKey key, T data, HoodieOperation operation) { - super(key, data, operation); + super(key, data, operation, null); } public HoodieAvroRecord(HoodieRecord record) { @@ -106,34 +105,6 @@ public Option toIndexedRecord(Schema schema, Properties prop) thr // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` // is complete - // - // TODO cleanup - - // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other - // object, and may not create a new one - @Override - public HoodieRecord preCombine(HoodieRecord previousRecord) { - T picked = unsafeCast(getData().preCombine(previousRecord.getData())); - if (picked instanceof HoodieMetadataPayload) { - // NOTE: HoodieMetadataPayload return a new payload - return new HoodieAvroRecord<>(getKey(), picked, getOperation()); - } - return picked.equals(getData()) ? 
this : previousRecord; - } - - // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could - // be combined - @Override - public Option combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException { - Option previousRecordAvroPayload = previousRecord.toIndexedRecord(schema, props); - if (!previousRecordAvroPayload.isPresent()) { - return Option.empty(); - } - - return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props) - .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload)); - } - @Override public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException { ValidationUtils.checkState(other instanceof HoodieAvroRecord); @@ -141,7 +112,7 @@ public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema wr (GenericRecord) toIndexedRecord(readerSchema, new Properties()).get(), (GenericRecord) other.toIndexedRecord(readerSchema, new Properties()).get(), writerSchema); - return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getPrecombineValue(getData())), getOperation()); + return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getOrderingValue()), getOperation()); } @Override @@ -234,20 +205,10 @@ public boolean shouldIgnore(Schema schema, Properties prop) throws IOException { @Nonnull private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, Comparable newPreCombineVal) { return unsafeCast( - ReflectionUtils.loadPayload( + HoodieRecordUtils.loadPayload( getData().getClass().getCanonicalName(), new Object[]{combinedAvroPayload, newPreCombineVal}, GenericRecord.class, Comparable.class)); } - - private static Comparable getPrecombineValue(T data) { - if (data instanceof BaseAvroPayload) { - return ((BaseAvroPayload) data).orderingVal; - } - - return -1; - } - - ////////////////////////////////////////////////////////////////////////////// } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java new file mode 100644 index 0000000000000..c115b5b94dcbe --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieMetadataPayload; + +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.TypeUtils.unsafeCast; + +public class HoodieAvroRecordMerge implements HoodieMerge { + @Override + public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) { + HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData())); + if (picked instanceof HoodieMetadataPayload) { + // NOTE: HoodieMetadataPayload returns a new payload + return new HoodieAvroRecord(newer.getKey(), picked, newer.getOperation()); + } + return picked.equals(((HoodieAvroRecord) newer).getData()) ? newer : older; + } + + @Override + public Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException { + Option previousRecordAvroPayload; + if (older instanceof HoodieAvroIndexedRecord) { + previousRecordAvroPayload = Option.ofNullable(((HoodieAvroIndexedRecord) older).getData()); + } else { + previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props); + } + if (!previousRecordAvroPayload.isPresent()) { + return Option.empty(); + } + + return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props) + .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java new file mode 100644 index 0000000000000..6becf355914c5 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Properties; + +/** + * HoodieMerge defines how to merge two records. It is a stateless component. + * It lets each engine implement the merging logic for its own HoodieRecord type + * and avoids the performance overhead of serializing/deserializing records through an Avro payload.
+ */ +public interface HoodieMerge extends Serializable { + + HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer); + + Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 1b6e79f09710e..8450aad5e1fc8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -131,25 +131,35 @@ public String getFieldName() { */ private HoodieOperation operation; + /** + * For purposes of preCombining. + */ + private Comparable orderingVal; + public HoodieRecord(HoodieKey key, T data) { - this(key, data, null); + this(key, data, null, null); } - public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) { + public HoodieRecord(HoodieKey key, T data, Comparable orderingVal) { + this(key, data, null, orderingVal); + } + + public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable orderingVal) { this.key = key; this.data = data; this.currentLocation = null; this.newLocation = null; this.sealed = false; this.operation = operation; + // default natural order is 0 + this.orderingVal = orderingVal == null ? 0 : orderingVal; } public HoodieRecord(HoodieRecord record) { - this(record.key, record.data); + this(record.key, record.data, record.operation, record.orderingVal); this.currentLocation = record.currentLocation; this.newLocation = record.newLocation; this.sealed = record.sealed; - this.operation = record.operation; } public HoodieRecord() { @@ -169,15 +179,17 @@ public HoodieOperation getOperation() { return operation; } + public Comparable getOrderingValue() { + return orderingVal; + } + public T getData() { if (data == null) { - throw new IllegalStateException("Payload already deflated for record."); + throw new IllegalStateException("HoodieRecord already deflated for record."); } return data; } - public abstract Comparable getOrderingValue(); - /** * Release the actual payload, to ease memory pressure. To be called after the record has been written to storage. * Once deflated, cannot be inflated. 
@@ -281,16 +293,6 @@ public void checkState() { // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` // is complete // - // TODO cleanup - - // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other - // object, and may not create a new one - public abstract HoodieRecord preCombine(HoodieRecord previousRecord); - - // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could - // be combined - public abstract Option combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException; - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 886911466b95f..bb477f9de7bf1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.OrderedProperties; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -154,6 +155,12 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then " + " produce a new base file."); + public static final ConfigProperty MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.compaction.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class that provides a stateless component interface for merging records, and supports various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty ARCHIVELOG_FOLDER = ConfigProperty .key("hoodie.archivelog.folder") .defaultValue("archived") @@ -480,6 +487,14 @@ public String getPayloadClass() { "org.apache.hudi"); } + /** + * Read the hoodie merge class for HoodieRecords from the table properties.
+ */ + public String getMergeClass() { + return getStringOrDefault(MERGE_CLASS_NAME).replace("com.uber.hoodie", + "org.apache.hudi"); + } + public String getPreCombineField() { return getString(PRECOMBINE_FIELD); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9945eb0650feb..c6fd18dc4124d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -725,6 +725,7 @@ public static class PropertyBuilder { private String recordKeyFields; private String archiveLogFolder; private String payloadClassName; + private String mergeClassName; private Integer timelineLayoutVersion; private String baseFileFormat; private String preCombineField; @@ -791,6 +792,11 @@ public PropertyBuilder setPayloadClassName(String payloadClassName) { return this; } + public PropertyBuilder setMergeClassName(String mergeClassName) { + this.mergeClassName = mergeClassName; + return this; + } + public PropertyBuilder setPayloadClass(Class payloadClass) { return setPayloadClassName(payloadClass.getName()); } @@ -1004,6 +1010,10 @@ public Properties build() { tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName); } + if (mergeClassName != null) { + tableConfig.setValue(HoodieTableConfig.MERGE_CLASS_NAME, mergeClassName); + } + if (null != tableCreateSchema) { tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, tableCreateSchema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 84abfec3de84a..209a358abeb8c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordReader { private final String payloadClassFQN; // preCombine field private final String preCombineField; + // Stateless component for merging records + private final String mergeClassFQN; // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -160,6 +162,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List getPartitionName() { return partitionName; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 6d313b64f9f5f..dfc3c14b5b83e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -24,10 +24,12 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.HoodieTimer; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -78,6 +80,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader // Stores the total time taken to perform reading and merging of log blocks private long totalTimeTakenToReadAndMergeBlocks; + private final HoodieMerge merge; + @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, @@ -95,6 +99,7 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled); this.maxMemorySizeInBytes = maxMemorySizeInBytes; + this.merge = HoodieRecordUtils.loadMerge(getMergeClassFQN()); } catch (IOException e) { throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e); } @@ -150,7 +155,7 @@ protected void processNextRecord(HoodieRecord hoodieRecord) throws IOException { HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = (HoodieRecordPayload)hoodieRecord.preCombine(oldRecord).getData(); + HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData(); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index ebe53fe471350..98f5dcf3a073a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -31,8 +31,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; -import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieAvroFileReader.HoodieRecordTransformIterator; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieParquetStreamWriter; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java new file mode 100644 index 0000000000000..075d117fe2120 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.model.HoodieMerge; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.exception.HoodieException; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; + +/** + * A utility class for HoodieRecord. + */ +public class HoodieRecordUtils { + + private static final Map INSTANCE_CACHE = new HashMap<>(); + + /** + * Instantiate a given class with a record merge. + */ + public static HoodieMerge loadMerge(String mergeClass) { + try { + HoodieMerge merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass); + if (null == merge) { + synchronized (HoodieMerge.class) { + merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass); + if (null == merge) { + merge = (HoodieMerge)ReflectionUtils.loadClass(mergeClass, new Object[]{}); + INSTANCE_CACHE.put(mergeClass, merge); + } + } + } + return merge; + } catch (HoodieException e) { + throw new HoodieException("Unable to instantiate hoodie merge class ", e); + } + } + + /** + * Instantiate a given class with an avro record payload. + */ + public static T loadPayload(String recordPayloadClass, Object[] payloadArgs, + Class... constructorArgTypes) { + try { + return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new HoodieException("Unable to instantiate payload class ", e); + } + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index a4ef09641d50c..3cd2396491d43 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.util; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; @@ -69,18 +68,6 @@ public static T loadClass(String fqcn) { } } - /** - * Instantiate a given class with a generic record payload. - */ - public static T loadPayload(String recordPayloadClass, Object[] payloadArgs, - Class... constructorArgTypes) { - try { - return (T) getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException("Unable to instantiate payload class ", e); - } - } - /** * Creates an instance of the given class. Use this version when dealing with interface types as constructor args. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index d4bafd9c9feee..d2d91bbfb6e11 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -137,7 +137,7 @@ public static R convertToHoodieRecordPayload(GenericRecord record, String pa HoodieOperation operation = withOperationField ? HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null; HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class, + HoodieRecordUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class, Comparable.class), operation); return (R) hoodieRecord; @@ -163,7 +163,7 @@ private static Object getPreCombineVal(GenericRecord rec, String preCombineField */ public static R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) { HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); + HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); return (R) hoodieRecord; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java new file mode 100644 index 0000000000000..0c51571c9e1b8 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; +import org.apache.hudi.common.model.HoodieMerge; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HoodieRecordUtilsTest { + + @Test + void loadHoodieMerge() { + String mergeClassName = HoodieAvroRecordMerge.class.getName(); + HoodieMerge merge1 = HoodieRecordUtils.loadMerge(mergeClassName); + HoodieMerge merge2 = HoodieRecordUtils.loadMerge(mergeClassName); + assertEquals(merge1.getClass().getName(), mergeClassName); + assertEquals(merge1, merge2); + } + + @Test + void loadPayload() { + String payloadClassName = DefaultHoodieRecordPayload.class.getName(); + HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, new Object[]{null, 0}, GenericRecord.class, Comparable.class); + assertEquals(payload.getClass().getName(), payloadClassName); + } +} \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index a436963bfcfb5..0dc0cd1b884b7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -283,6 +284,13 @@ private FlinkOptions() { .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n" + "This will render any value set for the option in-effective"); + public static final ConfigOption MERGE_CLASS_NAME = ConfigOptions + .key("write.merge.class") + .stringType() + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDescription("Merge class that provides a stateless component interface for merging records, and supports various HoodieRecord " + + "types, such as Spark records or Flink records."); + /** * Flag to indicate whether to drop duplicates before insert/upsert. * By default false to gain extra performance.
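A hedged sketch of wiring the new Flink option into a job configuration; CustomMerge stands in for a hypothetical HoodieMerge implementation, and everything else follows the option definitions above:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class MergeOptionSketch {

  public static Configuration writeConfig(String tablePath) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, tablePath);
    // Point record merging at a custom HoodieMerge implementation; the default is HoodieAvroRecordMerge.
    conf.setString(FlinkOptions.MERGE_CLASS_NAME, "com.example.CustomMerge");
    return conf;
  }
}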
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index c2f54dd8aaffe..4d4af6e218672 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; @@ -102,6 +104,8 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { private transient BiFunction, String, List> writeFunction; + private transient HoodieMerge merge; + /** * Total size tracer. */ @@ -121,6 +125,7 @@ public void open(Configuration parameters) throws IOException { this.tracer = new TotalSizeTracer(this.config); initBuffer(); initWriteFunction(); + initMergeClass(); } @Override @@ -194,6 +199,12 @@ private void initWriteFunction() { } } + private void initMergeClass() { + String mergeClassName = metaClient.getTableConfig().getMergeClass(); + LOG.info("init hoodie merge with class [{}]", mergeClassName); + merge = HoodieRecordUtils.loadMerge(mergeClassName); + } + /** * Represents a data item in the buffer, this is needed to reduce the * memory footprint. 
@@ -420,7 +431,7 @@ private boolean flushBucket(DataBucket bucket) { List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge); } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -455,7 +466,7 @@ private void flushRemaining(boolean endInput) { List records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index e9574dd52bedd..fef948b797dd6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -117,6 +118,10 @@ public class FlinkStreamerConfig extends Configuration { + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.") public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + @Parameter(names = {"--merge-class"}, description = "Implementation of HoodieMerge that defines how to merge two records. "
+ + "Implement your own, if you want to implement specific record merge logic.") + public String mergeClassName = HoodieAvroRecordMerge.class.getName(); + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + "is purely new data/inserts to gain speed).", converter = OperationConverter.class) public WriteOperationType operation = WriteOperationType.UPSERT; @@ -356,6 +361,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setString(FlinkOptions.OPERATION, config.operation.value()); conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName); + conf.setString(FlinkOptions.MERGE_CLASS_NAME, config.mergeClassName); conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine); conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes)); conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index e76bb29bdf50a..08d0cf66da0d8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -425,7 +425,8 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(requiredRowType).toString(), inputSplits, - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","), + conf.getString(FlinkOptions.MERGE_CLASS_NAME)); return MergeOnReadInputFormat.builder() .config(this.conf) .tableState(hoodieTableState) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 8eaa9d0b886f4..5fd89657917ef 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -18,12 +18,15 @@ package org.apache.hudi.table.format.mor; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; @@ -62,6 +65,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.IntStream; @@ -202,7 +206,8 @@ public void open(MergeOnReadInputSplit split) throws IOException { this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), - getFullSchemaReader(split.getBasePath().get())); + 
getFullSchemaReader(split.getBasePath().get()), + tableState.getMergeClass()); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -629,6 +634,8 @@ static class MergeIterator implements RecordIterator { private final InstantRange instantRange; + private final HoodieMerge merge; + // add the flag because the flink ParquetColumnarRowSplitReader is buggy: // method #reachedEnd() returns false after it returns true. // refactor it out once FLINK-22370 is resolved. @@ -649,7 +656,8 @@ static class MergeIterator implements RecordIterator { int[] requiredPos, boolean emitDelete, int operationPos, - ParquetColumnarRowSplitReader reader) { // the reader should be with full schema + ParquetColumnarRowSplitReader reader, // the reader should be with full schema + String mergeClass) { this.tableSchema = tableSchema; this.reader = reader; this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf); @@ -663,6 +671,7 @@ static class MergeIterator implements RecordIterator { this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); this.projection = RowDataProjection.instance(requiredRowType, requiredPos); this.instantRange = split.getInstantRange().orElse(null); + this.merge = HoodieRecordUtils.loadMerge(mergeClass); } @Override @@ -753,7 +762,8 @@ private Option mergeRowWithLog( String curKey) throws IOException { final HoodieAvroRecord record = (HoodieAvroRecord) scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema); + Option resultRecord = merge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, new Properties()); + return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index 36dfecbb79a5f..bbb21db7f8d28 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -41,6 +41,7 @@ public class MergeOnReadTableState implements Serializable { private final List inputSplits; private final String[] pkFields; private final int operationPos; + private final String mergeClass; public MergeOnReadTableState( RowType rowType, @@ -48,7 +49,8 @@ public MergeOnReadTableState( String avroSchema, String requiredAvroSchema, List inputSplits, - String[] pkFields) { + String[] pkFields, + String mergeClass) { this.rowType = rowType; this.requiredRowType = requiredRowType; this.avroSchema = avroSchema; @@ -56,6 +58,7 @@ public MergeOnReadTableState( this.inputSplits = inputSplits; this.pkFields = pkFields; this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD); + this.mergeClass = mergeClass; } public RowType getRowType() { @@ -82,6 +85,10 @@ public int getOperationPos() { return operationPos; } + public String getMergeClass() { + return mergeClass; + } + public int[] getRequiredPositions() { final List fieldNames = rowType.getFieldNames(); return requiredRowType.getFieldNames().stream() diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index d292b3832ac3b..c38af7eff440f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -181,6 +181,7 @@ public static HoodieWriteConfig getHoodieClientConfig( .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) + .withMergeClass(conf.getString(FlinkOptions.MERGE_CLASS_NAME)) .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) .withInlineCompactionTriggerStrategy( CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) @@ -275,6 +276,7 @@ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) thr .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) + .setMergeClassName(conf.getString(FlinkOptions.MERGE_CLASS_NAME)) .setPreCombineField(OptionsResolver.getPreCombineField(conf)) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 9f2aba77c1105..4dd5f0232b912 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -261,7 +261,8 @@ private OneInputStreamOperatorTestHarness create tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList(), - new String[0]); + new String[0], + metaClient.getTableConfig().getMergeClass()); MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() .config(conf) .tableState(hoodieTableState) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 819c4b55a9e4f..26755b3b85f01 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -301,6 +301,12 @@ object DataSourceWriteOptions { */ val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME + /** + * HoodieMerge will replace the payload to process the merge of data + * and provide the same capabilities as the payload + */ + val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME + /** * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value * will be obtained by invoking .toString() on the field value. 
Nested fields can be specified using diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index ab492b05cc46f..da13142fe84fd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -67,7 +67,8 @@ case class HoodieTableState(tablePath: String, preCombineFieldOpt: Option[String], usesVirtualKeys: Boolean, recordPayloadClassName: String, - metadataConfig: HoodieMetadataConfig) + metadataConfig: HoodieMetadataConfig, + mergeClass: String) /** * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. @@ -395,7 +396,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, preCombineFieldOpt = preCombineFieldOpt, usesVirtualKeys = !tableConfig.populateMetaFields(), recordPayloadClassName = tableConfig.getPayloadClass, - metadataConfig = fileIndex.metadataConfig + metadataConfig = fileIndex.metadataConfig, + mergeClass = tableConfig.getMergeClass ) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 37567e0bb21f5..3ff04212a55ae 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -29,8 +29,9 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath -import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.common.util.HoodieRecordUtils import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException @@ -261,6 +262,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) + private val merge = HoodieRecordUtils.loadMerge(tableState.mergeClass) override def hasNext: Boolean = hasNextInternal @@ -303,7 +305,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API // on the record from the Delta Log - toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) + val combinedRecord = merge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps) + if (combinedRecord.isPresent) { + 
toScalaOption(combinedRecord.get.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord) + } else { + Option.empty + } } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f9deda6779e6c..1e0179868c5c2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -150,6 +150,7 @@ object HoodieSparkSqlWriter { .setBaseFileFormat(baseFileFormat) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) + .setMergeClassName(hoodieConfig.getString(MERGE_CLASS_NAME)) // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, // but we are interested in what user has set, hence fetching from optParams. .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) @@ -457,6 +458,7 @@ object HoodieSparkSqlWriter { .setRecordKeyFields(recordKeyFields) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME)) + .setMergeClassName(hoodieConfig.getStringOrDefault(MERGE_CLASS_NAME)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBaseFileFormat(baseFileFormat) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala new file mode 100644 index 0000000000000..4ff5cceef7f38 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi + +import java.nio.charset.StandardCharsets +import java.util +import java.util.concurrent.ConcurrentHashMap +import org.apache.avro.Schema +import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate} +import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField +import org.apache.hudi.common.util.ValidationUtils +import org.apache.hudi.exception.HoodieException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection +import org.apache.spark.sql.types._ + +import scala.collection.mutable + +/** + * Helper class to do common stuff across Spark InternalRow. + * Provides common methods similar to {@link HoodieAvroUtils}. + */ +object HoodieInternalRowUtils { + + val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection] + val schemaMap = new ConcurrentHashMap[Schema, StructType] + val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]] + + def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = { + val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields) + val row = new JoinedRow(left, right) + val projection = getCachedProjection(mergeSchema, stitchedSchema) + projection(row) + } + + def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = { + val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]]) + + val oldFieldMap = getCachedSchemaPosMap(oldSchema) + for ((field, pos) <- newSchema.fields.zipWithIndex) { + var oldValue: AnyRef = null + if (oldFieldMap.contains(field.name)) { + val (oldField, oldPos) = oldFieldMap(field.name) + oldValue = oldRecord.get(oldPos, oldField.dataType) + } + if (oldValue != null) { + field.dataType match { + case structType: StructType => + val oldField = oldFieldMap(field.name)._1.asInstanceOf[StructType] + rewriteRecord(oldValue.asInstanceOf[InternalRow], oldField, structType) + case decimalType: DecimalType => + val oldField = oldFieldMap(field.name)._1.asInstanceOf[DecimalType] + if (decimalType.scale != oldField.scale || decimalType.precision != oldField.precision) { + newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) + ) + } else { + newRow.update(pos, oldValue) + } + case _ => + newRow.update(pos, oldValue) + } + } else { + // TODO default value in newSchema + } + } + + newRow + } + + def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: util.Map[String, String]): InternalRow = { + rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow] + } + + private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = { + if (oldRecord == null) { + null + } else { + newSchema match { + case 
targetSchema: StructType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type") + val oldRow = oldRecord.asInstanceOf[InternalRow] + val helper = mutable.Map[Integer, Any]() + + val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType]) + targetSchema.fields.zipWithIndex.foreach { case (field, i) => + fieldNames.push(field.name) + if (oldSchemaPos.contains(field.name)) { + val (oldField, oldPos) = oldSchemaPos(field.name) + helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) + } else { + val fieldFullName = createFullName(fieldNames) + val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.") + val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1) + // deal with rename + if (!oldSchemaPos.contains(field.name) && oldSchemaPos.contains(lastColNameFromOldSchema)) { + // find rename + val (oldField, oldPos) = oldSchemaPos(lastColNameFromOldSchema) + helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) + } + } + fieldNames.pop() + } + val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]]) + targetSchema.fields.zipWithIndex.foreach { case (_, i) => + if (helper.contains(i)) { + newRow.update(i, helper(i)) + } else { + // TODO add default val + newRow.update(i, null) + } + } + + newRow + case targetSchema: ArrayType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type") + val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType + val oldArray = oldRecord.asInstanceOf[ArrayData] + val newElementType = targetSchema.elementType + val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]]) + fieldNames.push("element") + oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) } + fieldNames.pop() + + newArray + case targetSchema: MapType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type") + val oldValueType = oldSchema.asInstanceOf[MapType].valueType + val oldKeyType = oldSchema.asInstanceOf[MapType].keyType + val oldMap = oldRecord.asInstanceOf[MapData] + val newValueType = targetSchema.valueType + val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]]) + val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]]) + val newMap = new ArrayBasedMapData(newKeyArray, newValueArray) + fieldNames.push("value") + oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, value) } + oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) } + fieldNames.pop() + + newMap + case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema) + } + } + } + + def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = { + val newRecord = 
rewriteRecord(record, oldSchema, newSchema) + newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName) + + newRecord + } + + def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = { + val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]()) + newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName) + + newRecord + } + + def getCachedSchema(schema: Schema): StructType = { + if (!schemaMap.contains(schema)) { + schemaMap.synchronized { + if (!schemaMap.contains(schema)) { + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + schemaMap.put(schema, structType) + } + } + } + schemaMap.get(schema) + } + + private def getCachedProjection(from: StructType, to: StructType): Projection = { + val schemaPair = (from, to) + if (!projectionMap.contains(schemaPair)) { + projectionMap.synchronized { + if (!projectionMap.contains(schemaPair)) { + val projection = generateMutableProjection(from, to) + projectionMap.put(schemaPair, projection) + } + } + } + projectionMap.get(schemaPair) + } + + def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = { + if (!SchemaPosMap.contains(schema)) { + SchemaPosMap.synchronized { + if (!SchemaPosMap.contains(schema)) { + val fieldMap = schema.fields.zipWithIndex.map { case (field, i) => (field.name, (field, i)) }.toMap + SchemaPosMap.put(schema, fieldMap) + } + } + } + SchemaPosMap.get(schema) + } + + private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = { + if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) { + oldSchema match { + case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType => + oldValue + case DecimalType() => + Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) + case _ => + throw new HoodieException("Unknown schema type: " + newSchema) + } + } else { + rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema) + } + } + + private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = { + val value = newSchema match { + case NullType | BooleanType => + case DateType if oldSchema.equals(StringType) => + fromJavaDate(java.sql.Date.valueOf(oldValue.toString)) + case LongType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].longValue() + case _ => + } + case FloatType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].floatValue() + case LongType => oldValue.asInstanceOf[Long].floatValue() + case _ => + } + case DoubleType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].doubleValue() + case LongType => oldValue.asInstanceOf[Long].doubleValue() + case FloatType => java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + "") + case _ => + } + case BinaryType => + oldSchema match { + case StringType => oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8) + case _ => + } + case StringType => + oldSchema match { + case BinaryType => new String(oldValue.asInstanceOf[Array[Byte]]) + case DateType => toJavaDate(oldValue.asInstanceOf[Integer]).toString + case IntegerType | LongType | FloatType | DoubleType | DecimalType() => oldValue.toString + case _ => + } + case 
DecimalType() => + oldSchema match { + case IntegerType | LongType | FloatType | DoubleType | StringType => + val scale = newSchema.asInstanceOf[DecimalType].scale + + Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale)) + case _ => + } + case _ => + } + if (value == None) { + throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)) + } else { + value + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java new file mode 100644 index 0000000000000..a22e78af21e3c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.keygen.BaseKeyGenerator; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.apache.spark.sql.types.DataTypes.BooleanType; + +/** + * Spark Engine-specific Implementations of `HoodieRecord`. 
+ */ +public class HoodieSparkRecord extends HoodieRecord { + + public HoodieSparkRecord(HoodieKey key, InternalRow data, Comparable orderingVal) { + super(key, data, orderingVal); + } + + public HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, Comparable orderingVal) { + super(key, data, operation, orderingVal); + } + + public HoodieSparkRecord(HoodieRecord record) { + super(record); + } + + public HoodieSparkRecord() { + } + + @Override + public HoodieRecord newInstance() { + return new HoodieSparkRecord(this); + } + + @Override + public HoodieRecord newInstance(HoodieKey key, HoodieOperation op) { + return new HoodieSparkRecord(key, data, op, getOrderingValue()); + } + + @Override + public HoodieRecord newInstance(HoodieKey key) { + return new HoodieSparkRecord(key, data, getOrderingValue()); + } + + @Override + public String getRecordKey(Option keyGeneratorOpt) { + return getRecordKey(); + } + + @Override + public String getRecordKey(String keyFieldName) { + return getRecordKey(); + } + + @Override + public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema); + StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema); + InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, readerStructType, (InternalRow) other.getData(), readerStructType, writerStructType); + return new HoodieSparkRecord(getKey(), mergeRow, getOperation()); + } + + @Override + public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema); + InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, targetStructType); + return new HoodieSparkRecord(getKey(), rewriteRow, getOperation()); + } + + @Override + public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields); + InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, writeSchemaWithMetaFieldsStructType, new HashMap<>()) + : HoodieInternalRowUtils.rewriteRecord(data, readerStructType, writeSchemaWithMetaFieldsStructType); + return new HoodieSparkRecord(getKey(), rewriteRow, getOperation()); + } + + @Override + public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields); + InternalRow rewriteRow = schemaOnReadEnabled ? 
HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName) + : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName); + return new HoodieSparkRecord(getKey(), rewriteRow, getOperation()); + } + + @Override + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map renameCols) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema); + InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols); + return new HoodieSparkRecord(getKey(), rewriteRow, getOperation()); + } + + @Override + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map renameCols, Mapper mapper) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema); + InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols); + // TODO change mapper type + return mapper.apply((IndexedRecord) rewriteRow); + } + + @Override + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException { + StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema); + InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, newStructType); + return new HoodieSparkRecord(getKey(), rewriteRow, getOperation()); + } + + @Override + public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException { + data.update(pos, newValue); + return this; + } + + @Override + public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map metadataValues) throws IOException { + Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> { + String value = metadataValues.get(metadataField); + if (value != null) { + data.update(recordSchema.getField(metadataField.getFieldName()).pos(), value); + } + }); + return this; + } + + @Override + public Option> getMetadata() { + return Option.empty(); + } + + @Override + public boolean isPresent(Schema schema, Properties prop) throws IOException { + if (null == data) { + return false; + } + Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType); + return !(deleteMarker instanceof Boolean && (boolean) deleteMarker); + } + + @Override + public boolean shouldIgnore(Schema schema, Properties prop) throws IOException { + // TODO SENTINEL should refactor without Avro(GenericRecord) + if (null != data && data.equals(SENTINEL)) { + return true; + } else { + return false; + } + } + + @Override + public Option toIndexedRecord(Schema schema, Properties prop) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java new file mode 100644 index 0000000000000..88ae7c13df168 
--- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.util.Properties; + +public class HoodieSparkRecordMerge implements HoodieMerge { + + @Override + public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) { + if (older.getData() == null) { + // use natural order for delete record + return older; + } + if (older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0) { + return older; + } else { + return newer; + } + } + + @Override + public Option combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException { + return Option.of(newer); + } +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala new file mode 100644 index 0000000000000..7a08ee64bf6ef --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.testutils.HoodieClientTestUtils +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.hudi.HoodieInternalRowUtils +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SparkSession} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll { + + private var sparkSession: SparkSession = _ + + private val schema1 = StructType( + Array( + StructField("name", StringType), + StructField("age", IntegerType) + ) + ) + private val schema2 = StructType( + Array( + StructField("name1", StringType), + StructField("age1", IntegerType) + ) + ) + private val schemaMerge = StructType(schema1.fields ++ schema2.fields) + private val schema1WithMetaData = StructType(Array( + StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType), + StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType), + StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType), + StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType), + StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType), + StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType), + StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType) + ) ++ schema1.fields) + + override protected def beforeAll(): Unit = { + // Initialize a local spark env + val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName)) + jsc.setLogLevel("ERROR") + sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate + } + + override protected def afterAll(): Unit = { + sparkSession.close() + } + + test("test merge") { + val data1 = sparkSession.sparkContext.parallelize(Seq(Row("like", 18))) + val data2 = sparkSession.sparkContext.parallelize(Seq(Row("like1", 181))) + val row1 = sparkSession.createDataFrame(data1, schema1).queryExecution.toRdd.first() + val row2 = sparkSession.createDataFrame(data2, schema2).queryExecution.toRdd.first() + val rowMerge = HoodieInternalRowUtils.stitchRecords(row1, schema1, row2, schema2, schemaMerge) + assert(rowMerge.get(0, StringType).toString.equals("like")) + assert(rowMerge.get(1, IntegerType) == 18) + assert(rowMerge.get(2, StringType).toString.equals("like1")) + assert(rowMerge.get(3, IntegerType) == 181) + } + + test("test rewrite") { + val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181))) + val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first() + val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1) + val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2) + assert(newRow1.get(0, StringType).toString.equals("like")) + assert(newRow1.get(1, IntegerType) == 18) + assert(newRow2.get(0, StringType).toString.equals("like1")) + assert(newRow2.get(1, IntegerType) == 181) + } + + test("test rewrite with nullable value") { + val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18))) + val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first() + val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge) + assert(newRow.get(0, StringType).toString.equals("like")) + assert(newRow.get(1, IntegerType) == 18) + assert(newRow.get(2, StringType) == null) + assert(newRow.get(3, IntegerType) == null) + } + + test("test rewrite with 
metaDataFiled value") { + val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18))) + val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first() + val newRow = HoodieInternalRowUtils.rewriteRecordWithMetadata(oldRow, schema1, schema1WithMetaData, "file1") + assert(newRow.get(0, StringType) == null) + assert(newRow.get(1, StringType) == null) + assert(newRow.get(2, StringType) == null) + assert(newRow.get(3, StringType) == null) + assert(newRow.get(4, StringType).toString.equals("file1")) + assert(newRow.get(5, StringType) == null) + assert(newRow.get(6, BooleanType) == null) + assert(newRow.get(7, StringType).toString.equals("like")) + assert(newRow.get(8, IntegerType) == 18) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala new file mode 100644 index 0000000000000..cb5529721cf67 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import java.nio.ByteBuffer +import java.util.{ArrayList, HashMap, Objects} +import org.apache.avro.generic.GenericData +import org.apache.avro.{LogicalTypes, Schema} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.internal.schema.Types +import org.apache.hudi.internal.schema.action.TableChanges +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.utils.SchemaChangeUtils +import org.apache.hudi.testutils.HoodieClientTestUtils +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.hudi.HoodieInternalRowUtils +import org.apache.spark.sql.types._ +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll { + private var sparkSession: SparkSession = _ + + override protected def beforeAll(): Unit = { + // Initialize a local spark env + val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName)) + jsc.setLogLevel("ERROR") + sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate + } + + override protected def afterAll(): Unit = { + sparkSession.close() + } + + /** + * test record data type changes. 
+ * int => long/float/double/string + * long => float/double/string + * float => double/String + * double => String/Decimal + * Decimal => Decimal/String + * String => date/decimal + * date => String + */ + test("test rewrite record with type changed") { + val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" + + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"com1\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"col0\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col11\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col12\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null}," + + "{\"name\":\"col21\",\"type\":[\"null\",\"float\"],\"default\":null}," + + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null}," + + "{\"name\":\"col31\",\"type\":[\"null\",\"double\"],\"default\":null}," + + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\"," + + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null}," + + "{\"name\":\"col41\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col41\"," + + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null}," + + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"col51\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}," + + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null}," + + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null}," + + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}") + // create a test record with avroSchema + val avroRecord = new GenericData.Record(avroSchema) + avroRecord.put("id", 1) + avroRecord.put("comb", 100) + avroRecord.put("com1", -100) + avroRecord.put("col0", 256) + avroRecord.put("col1", 1000L) + avroRecord.put("col11", -100L) + avroRecord.put("col12", 2000L) + avroRecord.put("col2", -5.001f) + avroRecord.put("col21", 5.001f) + avroRecord.put("col3", 12.999d) + avroRecord.put("col31", 9999.999d) + val currentDecimalType = avroSchema.getField("col4").schema.getTypes.get(1) + val bd = new java.math.BigDecimal("123.456").setScale(currentDecimalType.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale) + avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType)) + val currentDecimalType1 = avroSchema.getField("col41").schema.getTypes.get(1) + val bd1 = new java.math.BigDecimal("7890.456").setScale(currentDecimalType1.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale) + avroRecord.put("col41", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd1, currentDecimalType1, currentDecimalType1.getLogicalType)) + avroRecord.put("col5", "2011-01-01") + avroRecord.put("col51", "199.342") + avroRecord.put("col6", 18987) + avroRecord.put("col7", 
1640491505000000L) + avroRecord.put("col8", false) + val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53)) + avroRecord.put("col9", bb) + assert(GenericData.get.validate(avroSchema, avroRecord)) + val internalSchema = AvroInternalSchemaConverter.convert(avroSchema) + // do change type operation + val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema) + updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateColumnType("col31", Types.DecimalType.get(18, 9)).updateColumnType("col4", Types.DecimalType.get(18, 9)).updateColumnType("col41", Types.StringType.get).updateColumnType("col5", Types.DateType.get).updateColumnType("col51", Types.DecimalType.get(18, 9)).updateColumnType("col6", Types.StringType.get) + val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange) + val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName) + val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String]) + assert(GenericData.get.validate(newAvroSchema, newRecord)) + // Convert avro to internalRow + val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema) + val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema) + val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get + val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema) + .apply(newRecord).get + val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + } + + test("test rewrite nest record") { + val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()), + Types.Field.get(1, true, "data", Types.StringType.get()), + Types.Field.get(2, true, "preferences", + Types.RecordType.get(Types.Field.get(5, false, "feature1", + Types.BooleanType.get()), Types.Field.get(6, true, "feature2", Types.BooleanType.get()))), + Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get())), + Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, Types.StringType.get(), + Types.RecordType.get(Types.Field.get(10, false, "lat", Types.FloatType.get()), Types.Field.get(11, false, "long", Types.FloatType.get())), false)) + ) + val schema = AvroInternalSchemaConverter.convert(record, "test1") + val avroRecord = new GenericData.Record(schema) + GenericData.get.validate(schema, avroRecord) + avroRecord.put("id", 2) + avroRecord.put("data", "xs") + // fill record type + val preferencesRecord = new GenericData.Record(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences")) + preferencesRecord.put("feature1", false) + preferencesRecord.put("feature2", true) + assert(GenericData.get.validate(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"), preferencesRecord)) + avroRecord.put("preferences", 
preferencesRecord) + // fill mapType + val locations = new HashMap[String, GenericData.Record] + val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations") + val locationsValue: GenericData.Record = new GenericData.Record(mapSchema) + locationsValue.put("lat", 1.2f) + locationsValue.put("long", 1.4f) + val locationsValue1: GenericData.Record = new GenericData.Record(mapSchema) + locationsValue1.put("lat", 2.2f) + locationsValue1.put("long", 2.4f) + locations.put("key1", locationsValue) + locations.put("key2", locationsValue1) + avroRecord.put("locations", locations) + val doubles = new ArrayList[Double] + doubles.add(2.0d) + doubles.add(3.0d) + avroRecord.put("doubles", doubles) + // do check + assert(GenericData.get.validate(schema, avroRecord)) + // create newSchema + val newRecord = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get), Types.Field.get(1, true, "data", Types.StringType.get), Types.Field.get(2, true, "preferences", Types.RecordType.get(Types.Field.get(5, false, "feature1", Types.BooleanType.get), Types.Field.get(5, true, "featurex", Types.BooleanType.get), Types.Field.get(6, true, "feature2", Types.BooleanType.get))), Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get)), Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, Types.StringType.get, Types.RecordType.get(Types.Field.get(10, true, "laty", Types.FloatType.get), Types.Field.get(11, false, "long", Types.FloatType.get)), false))) + val newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName) + val newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String]) + // test the correctly of rewrite + assert(GenericData.get.validate(newAvroSchema, newAvroRecord)) + // Convert avro to internalRow + val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema) + val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema) + val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get + val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get + val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + } + + private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = { + schema match { + case StructType(fields) => + val expectedRow = expected.asInstanceOf[InternalRow] + val actualRow = actual.asInstanceOf[InternalRow] + fields.zipWithIndex.foreach { case (field, i) => internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType) } + case ArrayType(elementType, _) => + val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType) + val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType) + if (expectedArray.size != actualArray.size) { + throw new AssertionError() + } else { + expectedArray.zip(actualArray).foreach { case (e1, e2) => internalRowCompare(e1, e2, elementType) } + } + case MapType(keyType, valueType, _) => + val expectedKeyArray = expected.asInstanceOf[MapData].keyArray() + val expectedValueArray = expected.asInstanceOf[MapData].valueArray() + val actualKeyArray = 
actual.asInstanceOf[MapData].keyArray() + val actualValueArray = actual.asInstanceOf[MapData].valueArray() + internalRowCompare(expectedKeyArray, actualKeyArray, ArrayType(keyType)) + internalRowCompare(expectedValueArray, actualValueArray, ArrayType(valueType)) + case StringType => if (checkNull(expected, actual) || !expected.toString.equals(actual.toString)) { + throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + // TODO Verify after 'https://github.com/apache/hudi/pull/5907' merge + case BinaryType => if (checkNull(expected, actual) || !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]])) { + // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + case _ => if (!Objects.equals(expected, actual)) { + // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + } + } + + private def checkNull(left: Any, right: Any): Boolean = { + (left == null && right != null) || (left != null && right == null) + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 736e416162d21..2f38a22768a4e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -273,6 +273,7 @@ public void refreshTimeline() throws IOException { .setTableName(cfg.targetTableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(cfg.payloadClassName) + .setMergeClassName(cfg.mergeClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) @@ -370,6 +371,7 @@ public Pair>> readFromSource( .setTableName(cfg.targetTableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(cfg.payloadClassName) + .setMergeClassName(cfg.mergeClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a22a3581ae94a..dcf1581216d74 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -270,6 +271,10 @@ public static class Config implements Serializable { + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + @Parameter(names = {"--merge-class"}, description = "Implementation of HoodieMerge, which defines how to merge two records."
+ + "Implement your own, if you want to implement specific record merge logic.") + public String mergeClassName = HoodieAvroRecordMerge.class.getName(); + @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema" + ".SchemaProvider to attach schemas to input & target table data, built in options: " + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."