@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -234,6 +235,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "the record payload class to merge records in the log against each other, merge again with the base file and "
+ "produce the final record to be written after compaction.");

public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
.key("hoodie.compaction.merge.class")
.defaultValue(HoodieAvroRecordMerge.class.getName())
.withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ "types, such as Spark records or Flink records.");

public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("true")
@@ -691,6 +698,11 @@ public Builder withPayloadClass(String payloadClassName) {
return this;
}

public Builder withMergeClass(String mergeClass) {
compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
return this;
}

public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
return this;
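For reference, wiring the new option into a write configuration could look roughly like the sketch below; only withMergeClass(...) comes from this change, while the surrounding newBuilder()/withPath()/withCompactionConfig() chain is the usual Hudi config-builder pattern and is assumed here.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MergeClassConfigExample {
  public static HoodieWriteConfig buildConfig(String basePath) {
    // Assumed builder chain; the only call actually introduced by this change is withMergeClass(...).
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder()
                // Falls back to HoodieAvroRecordMerge.class.getName() when unset.
                .withMergeClass("org.apache.hudi.common.model.HoodieAvroRecordMerge")
                .build())
        .build();
  }
}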
@@ -33,6 +33,7 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -123,6 +124,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");

public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
Member:
We can make this just hoodie.merge.class. The datasource naming below is probably not a good one if you are basing off of that.

Contributor Author:
We are mainly staying consistent with the payload config, which is hoodie.datasource.write.payload.class.

Contributor:
Would suggest using hoodie.merge.class if the option is also used in the reading code path.
.key("hoodie.datasource.write.merge.class")
.defaultValue(HoodieAvroRecordMerge.class.getName())
.withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ "types, such as Spark records or Flink records.");

public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.noDefaultValue()
@@ -1324,6 +1331,10 @@ public String getPayloadClass() {
return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME);
}

public String getMergeClass() {
return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
}

public int getTargetPartitionsPerDayBasedCompaction() {
return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
}
@@ -338,10 +338,7 @@ public void write(HoodieRecord<T> oldRecord) {
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
try {
Option<HoodieRecord> combinedRecord =
hoodieRecord.combineAndGetUpdateValue(oldRecord,
schema,
props);
Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);

if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
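Taken together with the preCombine(...) calls in the write helpers further down, the call above implies a merge contract roughly like the following sketch; the name, generics, and exact signatures are inferred from these call sites rather than copied from the PR.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

// Inferred shape only: a stateless, engine-agnostic merge component.
public interface HoodieMergeSketch {

  // Used by deduplication/precombine: collapse two records with the same key into one.
  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);

  // Used by the merge handle: merge an incoming record against the stored one;
  // Option.empty() signals a delete, and shouldIgnore(...) can still discard the result.
  Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
                                                Schema schema, Properties props) throws IOException;
}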
@@ -27,7 +27,9 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
*/
protected final Schema tableSchema;
protected final Schema tableSchemaWithMetaFields;
protected final HoodieMerge merge;

/**
* The write schema. In most case the write schema is the same to the
@@ -103,6 +106,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass());
Contributor:
Merge is a verb; I would suggest HoodiePayloadMerger for better readability.

Contributor Author (@wulei0302, Jul 11, 2022):
@xushiyan @vinothchandar @alexeykudinkin @danny0405 I think there are differing opinions on the name of the Hoodie merge API. I created a JIRA, HUDI-4380, where we can make it clear. cc @wzx140 @minihippo
}

/**
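The handle resolves the configured class through HoodieRecordUtils.loadMerge(config.getMergeClass()). A plausible sketch of such a loader is shown below; the caching behavior and error handling are assumptions, and only ReflectionUtils.loadClass(...) is an existing Hudi utility.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;

// Hypothetical stand-in for HoodieRecordUtils.loadMerge: instantiate the configured
// HoodieMerge implementation once per class name and reuse it, since it is stateless.
public final class MergeLoaderSketch {

  private static final Map<String, HoodieMerge> INSTANCES = new ConcurrentHashMap<>();

  public static HoodieMerge loadMerge(String mergeClassName) {
    return INSTANCES.computeIfAbsent(mergeClassName, clazz -> {
      try {
        // ReflectionUtils.loadClass(String) creates an instance via the no-arg constructor.
        return (HoodieMerge) ReflectionUtils.loadClass(clazz);
      } catch (Throwable e) {
        throw new HoodieException("Unable to instantiate merge class " + clazz, e);
      }
    });
  }
}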
@@ -19,7 +19,9 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -81,9 +83,10 @@ public I combineOnCondition(
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass());
return deduplicateRecords(records, table.getIndex(), parallelism, merge);
}

public abstract I deduplicateRecords(
I records, HoodieIndex<?, ?> index, int parallelism);
I records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge);
}
@@ -23,13 +23,13 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

private HoodieWriteHelper() {
}

@@ -49,7 +49,7 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec

@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -58,10 +58,9 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
HoodieRecord reducedRec = rec2.preCombine(rec1);
HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey();

return (HoodieRecord<T>) reducedRec.newInstance(reducedKey);
HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2);
HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
return reducedRecord.newInstance(reducedKey);
}, parallelism).map(Pair::getRight);
}

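Because deduplication now goes through the pluggable merge, a custom strategy only needs to supply the two operations used above. The class below is a purely hypothetical illustration; a real plug-in would implement org.apache.hudi.common.model.HoodieMerge (whose full method set is not visible in this diff) and be configured via hoodie.datasource.write.merge.class or hoodie.compaction.merge.class.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

// Hypothetical merge strategy: always prefer the newer record during precombine and
// treat the incoming record as the full new state during upsert/compaction.
public class KeepNewestMergeSketch {

  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
    // Mirrors the reduce step in deduplicateRecords: collapse duplicates of one key.
    return newer;
  }

  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
                                                       Schema schema, Properties props) {
    // Mirrors the call in the merge handle: returning Option.empty() would signal a delete.
    return Option.of(newer);
  }
}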
@@ -19,6 +19,12 @@

package org.apache.hudi.testutils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -44,13 +50,6 @@
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
@@ -87,13 +88,14 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));

return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
@SuppressWarnings("unchecked")
final HoodieRecord reducedRec = merge.preCombine(rec1, rec2);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -54,7 +55,7 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -65,11 +66,11 @@ public List<HoodieRecord<T>> deduplicateRecords(

return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
HoodieRecord reducedRec = rec2.preCombine(rec1);
HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
return reducedRecord.newInstance(rec1.getKey());
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
}
@@ -18,8 +18,8 @@
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType

@@ -42,6 +42,23 @@ trait HoodieCatalystExpressionUtils {
GenerateUnsafeProjection.generate(targetExprs, attrs)
}

/**
* Generates an instance of [[MutableProjection]] projecting a row of one [[StructType]] into another [[StructType]]
*
* NOTE: No safety checks are executed to validate that this projection is actually feasible,
* it's up to the caller to make sure that such projection is possible.
*
* NOTE: Projection of a row from [[StructType]] A to [[StructType]] B is only possible if
* B is a subset of A
*/
def generateMutableProjection(from: StructType, to: StructType): MutableProjection = {
val attrs = from.toAttributes
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))

GenerateMutableProjection.generate(targetExprs, attrs)
}

/**
* Parses and resolves expression against the attributes of the given table schema.
*
@@ -38,12 +38,14 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
@@ -461,15 +463,16 @@ private void testDeduplication(
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
HoodieMerge merge = new HoodieAvroRecordMerge();
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -765,9 +766,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
switch (newSchema.getType()) {
case RECORD:
if (!(oldRecord instanceof IndexedRecord)) {
throw new IllegalArgumentException("cannot rewrite record with different type");
}
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> helper = new HashMap<>();
@@ -806,9 +805,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
return newRecord;
case ARRAY:
if (!(oldRecord instanceof Collection)) {
throw new IllegalArgumentException("cannot rewrite record with different type");
}
ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
Collection array = (Collection)oldRecord;
List<Object> newArray = new ArrayList();
fieldNames.push("element");
@@ -818,9 +815,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
fieldNames.pop();
return newArray;
case MAP:
if (!(oldRecord instanceof Map)) {
throw new IllegalArgumentException("cannot rewrite record with different type");
}
ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
@@ -836,7 +831,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche
}
}

private static String createFullName(Deque<String> fieldNames) {
public static String createFullName(Deque<String> fieldNames) {
String result = "";
if (!fieldNames.isEmpty()) {
List<String> parentNames = new ArrayList<>();
@@ -971,7 +966,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche
}

// convert days to Date
private static java.sql.Date toJavaDate(int days) {
public static java.sql.Date toJavaDate(int days) {
long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
int timeZoneOffset;
TimeZone defaultTimeZone = TimeZone.getDefault();
@@ -984,7 +979,7 @@ private static java.sql.Date toJavaDate(int days) {
}

// convert Date to days
private static int fromJavaDate(Date date) {
public static int fromJavaDate(Date date) {
long millisUtc = date.getTime();
long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, MILLIS_PER_DAY));
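With toJavaDate and fromJavaDate now public, callers outside HoodieAvroUtils can reuse the epoch-day conversion directly. A small round-trip illustration, assuming the Date parameter is java.sql.Date as the return type of toJavaDate suggests:

import java.sql.Date;

import org.apache.hudi.avro.HoodieAvroUtils;

public class AvroDateRoundTrip {
  public static void main(String[] args) {
    // 19000 days after the Unix epoch, i.e. a day in early 2022 (arbitrary sample value).
    int epochDays = 19000;

    // Convert the Avro-style day count to a java.sql.Date and back.
    Date date = HoodieAvroUtils.toJavaDate(epochDays);
    int roundTripped = HoodieAvroUtils.fromJavaDate(date);

    System.out.println(date + " -> " + roundTripped); // expected to print the original day count
  }
}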