Closed

Commits (32)
6ddb4fc  t  (Karl-WangSK, Sep 11, 2020)
21f0d89  Description: feat: We met performance issue with foo module recently,…  (Karl-WangSK, Sep 11, 2020)
c5f47c7  Merge remote-tracking branch 'update_stream/master'  (Karl-WangSK, Sep 15, 2020)
835fce8  feat: update preCombine  (Karl-WangSK, Sep 20, 2020)
b548678  add test  (Karl-WangSK, Sep 20, 2020)
e6efb8c  update  (Karl-WangSK, Sep 22, 2020)
f9e3df9  preCombine all HoodieRecords and update all fields according to order…  (Karl-WangSK, Sep 23, 2020)
ec6e8a2  update  (Karl-WangSK, Oct 1, 2020)
a9d4831  update  (Karl-WangSK, Oct 1, 2020)
de06b54  Merge branch 'master' into HUDI-1284  (Karl-WangSK, Oct 1, 2020)
bf5f22b  update  (Karl-WangSK, Oct 1, 2020)
ac7940d  Merge branch 'HUDI-1284' of github.com:Karl-WangSK/hudi into HUDI-1284  (Karl-WangSK, Oct 1, 2020)
54e2a08  update  (Karl-WangSK, Oct 1, 2020)
11323b2  Update DataSourceUtils.java  (Karl-WangSK, Oct 1, 2020)
94e9c5c  update  (Karl-WangSK, Oct 3, 2020)
b574874  Merge branch 'master' into HUDI-1284  (Karl-WangSK, Oct 10, 2020)
f678964  Update AbstractWriteHelper.java  (Karl-WangSK, Oct 10, 2020)
cca47ce  update  (Karl-WangSK, Oct 13, 2020)
2e3e799  update  (Karl-WangSK, Oct 13, 2020)
a6464f6  update  (Karl-WangSK, Oct 13, 2020)
02a2dfc  update  (Karl-WangSK, Oct 14, 2020)
f142420  update  (Karl-WangSK, Oct 22, 2020)
1382b04  update  (Karl-WangSK, Oct 22, 2020)
cf1de28  update  (Karl-WangSK, Oct 23, 2020)
a75f293  add serializableSchema  (Karl-WangSK, Nov 11, 2020)
c855aef  Merge branch 'master' into HUDI-1284  (Karl-WangSK, Nov 11, 2020)
d8959f6  update  (Karl-WangSK, Nov 11, 2020)
10aa16c  Merge branch 'HUDI-1284' of github.com:Karl-WangSK/hudi into HUDI-1284  (Karl-WangSK, Nov 11, 2020)
427d0f3  update  (Karl-WangSK, Nov 11, 2020)
bf9446f  update  (Karl-WangSK, Nov 11, 2020)
d44f9e6  update  (Karl-WangSK, Nov 11, 2020)
2c8c08d  update  (Karl-WangSK, Nov 11, 2020)
@@ -19,6 +19,7 @@
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -259,7 +260,7 @@ protected void rollBackInflightBootstrap() {
* @param instantTime Instant time of the commit
* @return WriteStatus to inspect errors and counts
*/
public abstract O upsert(I records, final String instantTime);
public abstract O upsert(I records, final String instantTime, Schema schema);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
@@ -123,7 +123,7 @@ private synchronized FileSystemViewManager getViewManager() {
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
I records);
I records, Schema schema);

/**
* Insert a batch of new records into Hoodie table at the supplied instantTime.
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
@@ -39,10 +40,23 @@ public HoodieWriteMetadata<O> write(String instantTime,
int shuffleParallelism,
BaseCommitActionExecutor<T, I, K, O, R> executor,
boolean performTagging) {
return write(instantTime, inputRecords, context, table, shouldCombine, shuffleParallelism,
null, executor, performTagging);
}

public HoodieWriteMetadata<O> write(String instantTime,
I inputRecordsRDD,
HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
boolean shouldCombine,
int shuffleParallelism,
SerializableSchema schema,
BaseCommitActionExecutor<T, I, K, O, R> executor,
boolean performTagging) {
try {
// De-dupe/merge if needed
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
combineOnCondition(shouldCombine, inputRecordsRDD, shuffleParallelism, table, schema);

Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
@@ -70,8 +84,8 @@ private I tag(
}

public I combineOnCondition(
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
return condition ? deduplicateRecords(records, table, parallelism) : records;
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table, SerializableSchema schema) {
return condition ? deduplicateRecords(records, table, parallelism, schema) : records;
}

/**
@@ -82,10 +96,10 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
I records, HoodieTable<T, I, K, O> table, int parallelism, SerializableSchema schema) {
return deduplicateRecords(records, table.getIndex(), parallelism, schema);
}

public abstract I deduplicateRecords(
I records, HoodieIndex<T, I, K, O> index, int parallelism);
I records, HoodieIndex<T, I, K, O> index, int parallelism, SerializableSchema schema);
}
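
Since Avro's Schema class is not Serializable, the helper now threads the schema through combineOnCondition and deduplicateRecords as a SerializableSchema, so it can be captured by the Spark closures doing the dedup. The following is a minimal sketch of what such a wrapper can look like, assuming the schema travels as its JSON string and is re-parsed lazily; the actual org.apache.hudi.common.config.SerializableSchema added here may be implemented differently.

import java.io.Serializable;
import org.apache.avro.Schema;

// Sketch of a Serializable wrapper around an Avro Schema: the JSON string is
// what gets serialized, the parsed Schema is rebuilt on demand on the executor.
public class SerializableSchemaSketch implements Serializable {

  private final String schemaString;   // JSON form, safe to serialize
  private transient Schema schema;     // parsed form, rebuilt lazily

  public SerializableSchemaSketch(Schema schema) {
    this.schema = schema;
    this.schemaString = (schema == null) ? null : schema.toString();
  }

  public Schema getSchema() {
    if (schema == null && schemaString != null) {
      schema = new Schema.Parser().parse(schemaString);
    }
    return schema;
  }
}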
@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.avro.Schema;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -128,14 +129,18 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata);
}

@Override
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
return upsert(records, instantTime, null);
}

@Override
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Schema schema) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
setOperationType(WriteOperationType.UPSERT);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records, schema);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
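
A hedged usage sketch of the new client-facing overload: the caller parses the write schema once and hands it to upsert, so the dedup path no longer has to derive it per record. The client class name (SparkRDDWriteClient) and the origin of schemaStr are assumptions for the example, not taken from this diff.

import org.apache.avro.Schema;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.spark.api.java.JavaRDD;

// Illustrative helper only: shows the intent of upsert(records, instantTime, schema).
public class UpsertWithSchemaExample {
  public static <T extends HoodieRecordPayload<T>> JavaRDD<WriteStatus> upsertOnce(
      SparkRDDWriteClient<T> writeClient, JavaRDD<HoodieRecord<T>> records,
      String instantTime, String schemaStr) {
    // Parse the Avro schema a single time up front ...
    Schema writeSchema = new Schema.Parser().parse(schemaStr);
    // ... and pass it down. The two-argument overload still works and simply
    // forwards a null schema, keeping the old preCombine(other) behaviour.
    return writeClient.upsert(records, instantTime, writeSchema);
  }
}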
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -84,8 +85,9 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime,
JavaRDD<HoodieRecord<T>> records, Schema schema) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records, schema).execute();
}

@Override
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -78,8 +79,10 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime,
JavaRDD<HoodieRecord<T>> records, Schema schema) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime,
records, schema).execute();
}

@Override
@@ -73,7 +73,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(JavaRDD<HoodieRecord

if (performDedupe) {
dedupedRecords = (JavaRDD<HoodieRecord<T>>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
config.getBulkInsertShuffleParallelism(), table);
config.getBulkInsertShuffleParallelism(), table, null);
}

final JavaRDD<HoodieRecord<T>> repartitionedRecords;
@@ -18,8 +18,10 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,17 +35,21 @@ public class SparkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {

private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private SerializableSchema schema;

public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Schema schema) {
super(context, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.schema = new SerializableSchema(schema);
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
schema, this, true);
}
}
@@ -19,9 +19,11 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.UpdatePrecombineAvroPayload;
import org.apache.hudi.index.HoodieIndex;

import org.apache.spark.api.java.JavaRDD;
@@ -33,8 +35,8 @@
*
* @param <T>
*/
public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
public class SparkWriteHelper<T extends HoodieRecordPayload, R> extends AbstractWriteHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
private SparkWriteHelper() {
}

@@ -49,7 +51,7 @@ public static SparkWriteHelper newInstance() {
@Override
public JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> index,
int parallelism) {
int parallelism, SerializableSchema schema) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -58,7 +60,14 @@ public JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> reco
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
T reducedData;
// To avoid every record having to parse the schema
if (rec2.getData() instanceof UpdatePrecombineAvroPayload) {
Review comment (Contributor Author): Can this prevent old payload implementations from calling preCombine(_, schema)?

reducedData = schema.getSchema() != null ? (T) rec1.getData().preCombine(rec2.getData(), schema.getSchema())
: (T) rec1.getData().preCombine(rec2.getData());
} else {
reducedData = (T) rec1.getData().preCombine(rec2.getData());
}
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
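
The instanceof guard above means only the new UpdatePrecombineAvroPayload is ever offered the schema-aware preCombine(other, schema); existing payload implementations keep hitting the single-argument path unchanged, which is what the review comment is asking about. As an illustration of the kind of field-level merge such a schema-aware preCombine can perform (a sketch of the idea, not the actual payload implementation; field semantics are assumed), a newer partial record can be completed from the older one:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Sketch only: merge two Avro records field by field, preferring the newer
// value and falling back to the older record where the incoming (partial)
// update left a field unset.
public class PreCombineMergeSketch {
  public static GenericRecord mergeFields(GenericRecord older, GenericRecord newer, Schema schema) {
    GenericRecord merged = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
      Object newerVal = newer.get(field.name());
      merged.put(field.name(), newerVal != null ? newerVal : older.get(field.name()));
    }
    return merged;
  }
}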
@@ -18,7 +18,9 @@

package org.apache.hudi.table.action.deltacommit;

import org.apache.avro.Schema;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,17 +35,21 @@ public class SparkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<
extends AbstractSparkDeltaCommitActionExecutor<T> {

private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private SerializableSchema schema;

public SparkUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Schema schema) {
super(context, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
this.schema = new SerializableSchema(schema);
}

@Override
public HoodieWriteMetadata execute() {
return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, true);
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
schema, this, true);
}
}
@@ -239,14 +239,15 @@ private void testDeduplication(
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1,
null).collect();
assertEquals(1, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1, null).collect();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

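
Note that the updated test passes null for the new schema argument, so it keeps exercising the legacy single-argument preCombine path. A complementary call with a real schema, wrapped in SerializableSchema, would be a no-op for these RawTripTestPayload records (only UpdatePrecombineAvroPayload takes the schema-aware branch), so the dedup result should be unchanged. Rough fragment mirroring the test above; avroSchema is assumed to be available in the test setup:

// Same records and non-global index mock as above, but with a non-null schema.
dedupedRecs = SparkWriteHelper.newInstance()
    .deduplicateRecords(records, index, 1, new SerializableSchema(avroSchema)).collect();
assertEquals(2, dedupedRecs.size());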
@@ -385,7 +385,7 @@ public void testFileSizeUpsertRecords() throws Exception {

// Insert new records
BaseSparkCommitActionExecutor actionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
instantTime, jsc.parallelize(records));
instantTime, jsc.parallelize(records), null);
jsc.parallelize(Arrays.asList(1))
.map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()))
.map(Transformations::flatten).collect();
Expand Down Expand Up @@ -426,7 +426,7 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
String partitionPath = writeStatus.getPartitionPath();
long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
instantTime, jsc.parallelize(updates));
instantTime, jsc.parallelize(updates), null);
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(Transformations::flatten).collect();