Closed

32 commits (changes shown below are from 1 commit)
6ddb4fc
t
Karl-WangSK Sep 11, 2020
21f0d89
Description: feat: We met performance issue with foo module recently,…
Karl-WangSK Sep 11, 2020
c5f47c7
Merge remote-tracking branch 'update_stream/master'
Karl-WangSK Sep 15, 2020
835fce8
feat: update preCombine
Karl-WangSK Sep 20, 2020
b548678
add test
Karl-WangSK Sep 20, 2020
e6efb8c
update
Karl-WangSK Sep 22, 2020
f9e3df9
preCombine all HoodieRecords and update all fields according to order…
Karl-WangSK Sep 23, 2020
ec6e8a2
update
Karl-WangSK Oct 1, 2020
a9d4831
update
Karl-WangSK Oct 1, 2020
de06b54
Merge branch 'master' into HUDI-1284
Karl-WangSK Oct 1, 2020
bf5f22b
update
Karl-WangSK Oct 1, 2020
ac7940d
Merge branch 'HUDI-1284' of github.com:Karl-WangSK/hudi into HUDI-1284
Karl-WangSK Oct 1, 2020
54e2a08
update
Karl-WangSK Oct 1, 2020
11323b2
Update DataSourceUtils.java
Karl-WangSK Oct 1, 2020
94e9c5c
update
Karl-WangSK Oct 3, 2020
b574874
Merge branch 'master' into HUDI-1284
Karl-WangSK Oct 10, 2020
f678964
Update AbstractWriteHelper.java
Karl-WangSK Oct 10, 2020
cca47ce
update
Karl-WangSK Oct 13, 2020
2e3e799
update
Karl-WangSK Oct 13, 2020
a6464f6
update
Karl-WangSK Oct 13, 2020
02a2dfc
update
Karl-WangSK Oct 14, 2020
f142420
update
Karl-WangSK Oct 22, 2020
1382b04
update
Karl-WangSK Oct 22, 2020
cf1de28
update
Karl-WangSK Oct 23, 2020
a75f293
add serializableSchema
Karl-WangSK Nov 11, 2020
c855aef
Merge branch 'master' into HUDI-1284
Karl-WangSK Nov 11, 2020
d8959f6
update
Karl-WangSK Nov 11, 2020
10aa16c
Merge branch 'HUDI-1284' of github.com:Karl-WangSK/hudi into HUDI-1284
Karl-WangSK Nov 11, 2020
427d0f3
update
Karl-WangSK Nov 11, 2020
bf9446f
update
Karl-WangSK Nov 11, 2020
d44f9e6
update
Karl-WangSK Nov 11, 2020
2c8c08d
update
Karl-WangSK Nov 11, 2020
@@ -186,11 +186,15 @@ protected void rollBackInflightBootstrap() {
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
return upsert(records, instantTime, null);
}

public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String instantTime, String schema) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records);
HoodieWriteMetadata result = table.upsert(jsc, instantTime, records, schema);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
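For context, a minimal caller-side sketch of the new overload (not part of this diff): it assumes an existing HoodieWriteClient named client, plus hypothetical helpers buildRecords() and buildAvroSchemaJson() for the input RDD and the Avro schema string.

String instantTime = client.startCommit();
JavaRDD<HoodieRecord<OverwriteWithLatestAvroPayload>> records = buildRecords();   // assumed helper
String avroSchemaJson = buildAvroSchemaJson();                                    // assumed helper
// new overload: the schema lets preCombine merge non-null fields across duplicate keys
JavaRDD<WriteStatus> statuses = client.upsert(records, instantTime, avroSchemaJson);
client.commit(instantTime, statuses);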
@@ -25,6 +25,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

public class BootstrapRecordPayload implements HoodieRecordPayload<BootstrapRecordPayload> {

private final GenericRecord record;
@@ -38,6 +40,11 @@ public BootstrapRecordPayload preCombine(BootstrapRecordPayload another) {
return this;
}

@Override
public BootstrapRecordPayload preCombine(BootstrapRecordPayload another, Schema schema) throws IOException {
return this;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
return Option.ofNullable(record);
@@ -82,6 +82,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
public static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
public static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
public static final String COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP = "hoodie.combine.all.fields.before.upsert";
public static final String DEFAULT_COMBINE_ALL_FIELDS_BEFORE_UPSERT = "false";
public static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
@@ -236,6 +238,10 @@ public boolean shouldCombineBeforeDelete() {
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
}

public boolean shouldCombineAllFieldsBeforeUpsert() {
return Boolean.parseBoolean(props.getProperty(COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP));
}

public boolean shouldAllowMultiWriteOnSameInstant() {
return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT));
}
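A hedged sketch of enabling the new flag through raw properties, since this diff does not add a dedicated builder method for it; the base path and schema string below are placeholders, and the flag only takes effect when hoodie.combine.before.upsert is also true (see the executor change further down).

Properties props = new Properties();
props.setProperty(HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP, "true");
props.setProperty(HoodieWriteConfig.COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP, "true");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/example_table")   // placeholder base path
    .withSchema(avroSchemaJson)              // assumed Avro schema string
    .withProps(props)
    .build();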
@@ -987,6 +993,8 @@ protected void setDefaults() {
DEFAULT_COMBINE_BEFORE_UPSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
DEFAULT_COMBINE_BEFORE_DELETE);
setDefaultOnCondition(props, !props.containsKey(COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP),
COMBINE_ALL_FIELDS_BEFORE_UPSERT_PROP, DEFAULT_COMBINE_ALL_FIELDS_BEFORE_UPSERT);
setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT),
ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT);
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
@@ -83,8 +83,9 @@ public HoodieCopyOnWriteTable(HoodieWriteConfig config, Configuration hadoopConf
}

@Override
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new UpsertCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
String schema) {
return new UpsertCommitActionExecutor<>(jsc, config, this, instantTime, records, schema).execute();
}

@Override
@@ -76,8 +76,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}

@Override
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new UpsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
public HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records,
String schema) {
return new UpsertDeltaCommitActionExecutor<>(jsc, config, this, instantTime, records, schema).execute();
}

@Override
@@ -142,7 +142,7 @@ public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieTableM
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata upsert(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records);
JavaRDD<HoodieRecord<T>> records, String schema);

/**
* Insert a batch of new records into Hoodie table at the supplied instantTime.
@@ -50,7 +50,7 @@ public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata bulkInsert(

if (performDedupe) {
dedupedRecords = WriteHelper.combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
config.getBulkInsertShuffleParallelism(), ((HoodieTable<T>)table));
config.getBulkInsertShuffleParallelism(), ((HoodieTable<T>)table), false, null);
}

final JavaRDD<HoodieRecord<T>> repartitionedRecords;
@@ -32,17 +32,28 @@ public class UpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends CommitActionExecutor<T> {

private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private String schema;

public UpsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
String schema) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.schema = schema;
}

public UpsertCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}

@Override
public HoodieWriteMetadata execute() {
return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>)table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
config.shouldCombineAllFieldsBeforeUpsert(), schema, this, true);
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,11 +39,23 @@ public class WriteHelper<T extends HoodieRecordPayload<T>> {
public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
HoodieTable<T> table, boolean shouldCombine,
int shuffleParallelism, CommitActionExecutor<T> executor, boolean performTagging) {
int shuffleParallelism, CommitActionExecutor<T> executor,
boolean performTagging) {
return write(instantTime, inputRecordsRDD, jsc, table, shouldCombine, shuffleParallelism, false,
null, executor, performTagging);
}

public static <T extends HoodieRecordPayload<T>> HoodieWriteMetadata write(String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD, JavaSparkContext jsc,
HoodieTable<T> table, boolean shouldCombine,
int shuffleParallelism, boolean precombineAgg,
String schema, CommitActionExecutor<T> executor,
boolean performTagging) {
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table);
combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table, precombineAgg, schema);


Instant lookupBegin = Instant.now();
JavaRDD<HoodieRecord<T>> taggedRecords = dedupedRecords;
@@ -70,8 +83,9 @@ private static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> tag(
}

public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> combineOnCondition(
boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism, HoodieTable<T> table) {
return condition ? deduplicateRecords(records, table, parallelism) : records;
boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism, HoodieTable<T> table,
boolean precombineAgg, String schema) {
return condition ? deduplicateRecords(records, table, parallelism, precombineAgg, schema) : records;
}

/**
@@ -82,12 +96,14 @@ public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> combin
* @return RDD of HoodieRecord already be deduplicated
*/
public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieTable<T> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
JavaRDD<HoodieRecord<T>> records, HoodieTable<T> table, int parallelism, boolean precombineAgg,
String schema) {
return deduplicateRecords(records, table.getIndex(), parallelism, precombineAgg, schema);
}

public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism) {
JavaRDD<HoodieRecord<T>> records, HoodieIndex<T> index, int parallelism, boolean precombineAgg,
String schema) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -96,7 +112,8 @@ public static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> dedupl
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
T reducedData = precombineAgg && schema != null ? (T) rec1.getData().preCombine(rec2.getData(), new Schema.Parser().parse(schema))
: (T) rec1.getData().preCombine(rec2.getData());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
@@ -33,17 +33,28 @@ public class UpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends DeltaCommitActionExecutor<T> {

private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private String schema;

public UpsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
String schema) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.schema = schema;
}

public UpsertDeltaCommitActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(jsc, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}

@Override
public HoodieWriteMetadata execute() {
return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, true);
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
config.shouldCombineAllFieldsBeforeUpsert(), schema, this, true);
}
}
@@ -231,14 +231,15 @@ private void testDeduplication(
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1,
false, null).collect();
assertEquals(1, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1).collect();
dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1, false, null).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
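A hedged sketch of how the new arguments might be exercised in the same test style, assuming RawTripTestPayload implements the two-argument preCombine and that the trip schema string (e.g. HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) is in scope; only the dedup count is asserted, since the field-merging behavior depends on the payload implementation.

// non-global index as above, but with the field-merging path switched on
dedupedRecs = WriteHelper.deduplicateRecords(records, index, 1, true,
    HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);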

@@ -54,6 +54,11 @@ public HoodieJsonPayload preCombine(HoodieJsonPayload another) {
return this;
}

@Override
public HoodieJsonPayload preCombine(HoodieJsonPayload another, Schema schema) throws IOException {
return this;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
return getInsertValue(schema);
@@ -32,7 +32,7 @@ public abstract class BaseAvroPayload implements Serializable {
/**
* Avro data extracted from the source converted to bytes.
*/
public final byte[] recordBytes;
public byte[] recordBytes;
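// no longer final: the schema-aware preCombine in OverwriteWithLatestAvroPayload rewrites recordBytes with the merged record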

/**
* For purposes of preCombining.
@@ -24,6 +24,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

/**
* Empty payload used for deletions.
*/
@@ -40,6 +42,11 @@ public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
return another;
}

@Override
public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another, Schema schema) throws IOException {
return another;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
return Option.empty();
@@ -50,6 +50,11 @@ public HoodieAvroPayload preCombine(HoodieAvroPayload another) {
return this;
}

@Override
public HoodieAvroPayload preCombine(HoodieAvroPayload another, Schema schema) throws IOException {
return this;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
return getInsertValue(schema);
@@ -44,6 +44,22 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
T preCombine(T another);

/**
* When more than one HoodieRecord share the same HoodieKey, this function combines all non-null fields
* of the duplicates before attempting to insert/upsert (if combining is turned on in HoodieWriteConfig).
* e.g.:
* Before:
* id name age ts
* 1 Karl null 0.0
* 1 null 18 0.0
* After:
* id name age ts
* 1 Karl 18 0.0
*
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
T preCombine(T another, Schema schema) throws IOException;

/**
* This methods lets you write custom merging/combining logic to produce new values as a function of current value on
* storage and whats contained in this object.
@@ -26,6 +26,7 @@
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.List;

/**
* Default payload used for delta streamer.
@@ -47,6 +48,26 @@ public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another, Schema schema) throws IOException {
// pick the payload with the greatest ordering value, then aggregate all fields,
// preferring whichever value is not null
GenericRecord thisValue = (GenericRecord) HoodieAvroUtils.bytesToAvro(this.recordBytes, schema);
GenericRecord anotherValue = (GenericRecord) HoodieAvroUtils.bytesToAvro(another.recordBytes, schema);
List<Schema.Field> fields = schema.getFields();

if (another.orderingVal.compareTo(orderingVal) > 0) {
GenericRecord anotherRec = combineAllFields(fields, anotherValue, thisValue);
another.recordBytes = HoodieAvroUtils.avroToBytes(anotherRec);
return another;
} else {
GenericRecord thisRec = combineAllFields(fields, thisValue, anotherValue);
this.recordBytes = HoodieAvroUtils.avroToBytes(thisRec);
return this;
}
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
// pick the payload with greatest ordering value
@@ -57,6 +78,18 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
}
}

public GenericRecord combineAllFields(List<Schema.Field> fields, GenericRecord priorRec, GenericRecord inferiorRec) {
// fill each field of the higher-priority record from the lower-priority one
// when the higher-priority value is still the field's default (e.g. null)
for (int i = 0; i < fields.size(); i++) {
Object priorValue = priorRec.get(fields.get(i).name());
Object inferiorValue = inferiorRec.get(fields.get(i).name());
Object defaultVal = fields.get(i).defaultVal();
if (overwriteField(priorValue, defaultVal) && !overwriteField(inferiorValue, defaultVal)) {
priorRec.put(fields.get(i).name(), inferiorValue);
}
}
return priorRec;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
return getInsertValue(schema);
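Finally, a worked sketch of the intended merge semantics, mirroring the id/name/age/ts example from the HoodieRecordPayload javadoc. The schema JSON, the nullable field types, and the equal ordering values are assumptions for illustration (usual Avro and Hudi imports assumed), and the calls below would need to run in a method that declares throws IOException.

// assumed schema: id:string, name:["null","string"], age:["null","int"], ts:long
Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA_JSON);   // hypothetical schema constant

GenericRecord first = new GenericData.Record(schema);
first.put("id", "1"); first.put("name", "Karl"); first.put("age", null); first.put("ts", 0L);
GenericRecord second = new GenericData.Record(schema);
second.put("id", "1"); second.put("name", null); second.put("age", 18); second.put("ts", 0L);

OverwriteWithLatestAvroPayload p1 = new OverwriteWithLatestAvroPayload(first, 0);
OverwriteWithLatestAvroPayload p2 = new OverwriteWithLatestAvroPayload(second, 0);

// equal ordering values: p1 wins and its null fields are filled in from p2
OverwriteWithLatestAvroPayload merged = p1.preCombine(p2, schema);
GenericRecord out = (GenericRecord) merged.getInsertValue(schema).get();
// out -> {"id": "1", "name": "Karl", "age": 18, "ts": 0}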