80 changes: 63 additions & 17 deletions hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -101,14 +101,21 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo


/**
* Create a write client, without cleaning up failed/inflight commits.
*
* @param jsc Java Spark Context
Review comment (Contributor): if possible add a line about instantiation and what this constructor is used for compared to other constructors.

* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) throws Exception {
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
this(jsc, clientConfig, false);
}

/**
* Create a write client, with a new Hudi index.
*
* @param jsc Java Spark Context
* @param clientConfig instance of HoodieWriteConfig
* @param rollbackPending whether to clean up pending commits
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
@@ -119,6 +126,14 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, b
this(jsc, clientConfig, rollbackPending, index, Option.empty());
}

/**
* Create a write client, allowing all parameters to be specified.
*
* @param jsc Java Spark Context
* @param clientConfig instance of HoodieWriteConfig
* @param rollbackPending whether to clean up pending commits
* @param index HoodieIndex implementation to be used with this client
* @param timelineService Timeline Service that runs as part of write client.
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending,
HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
super(jsc, index, clientConfig, timelineService);
@@ -127,6 +142,12 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, b
this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService);
}
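
The review comment above asks how the constructors relate; a minimal instantiation sketch follows. It assumes an existing JavaSparkContext jsc and a placeholder base path, and uses HoodieAvroPayload to stand in for any HoodieRecordPayload implementation:

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")   // placeholder base path
    .build();

// Simplest form: no rollback of pending (failed/inflight) commits on startup.
HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, cfg);

// Opt in to rolling back pending commits before new writes begin.
HoodieWriteClient<HoodieAvroPayload> rollbackClient = new HoodieWriteClient<>(jsc, cfg, true);

// Full control: supply the index explicitly (an embedded timeline service can also be passed).
HoodieWriteClient<HoodieAvroPayload> customClient =
    new HoodieWriteClient<>(jsc, cfg, true, HoodieIndex.createIndex(cfg, jsc));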

/**
* Register Hudi classes for Kryo serialization.
*
* @param conf instance of SparkConf
* @return SparkConf
*/
public static SparkConf registerClasses(SparkConf conf) {
conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
return conf;
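
Since registerClasses mutates and returns the SparkConf, it is meant to run before the JavaSparkContext is created. A short sketch, with a local master used purely for illustration:

SparkConf conf = new SparkConf().setAppName("hoodie-writer").setMaster("local[2]");
conf = HoodieWriteClient.registerClasses(conf);  // registers Hudi classes with Kryo
JavaSparkContext jsc = new JavaSparkContext(conf);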
@@ -148,7 +169,11 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
}

/**
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime.
* Upsert a batch of new records into the Hoodie table at the supplied commitTime.
*
* @param records JavaRDD of HoodieRecords to upsert
* @param commitTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
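
For context, the typical upsert flow pairs startCommit() with upsert() and then inspects the returned statuses. A minimal sketch, assuming records is a pre-built JavaRDD<HoodieRecord<HoodieAvroPayload>> and client is constructed as above:

String commitTime = client.startCommit();                       // obtain a new instant time
JavaRDD<WriteStatus> statuses = client.upsert(records, commitTime);
long errors = statuses.filter(WriteStatus::hasErrors).count();  // inspect errors per the @return contract
client.commit(commitTime, statuses);                            // explicit commit, needed when auto-commit is disabled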
@@ -176,7 +201,7 @@ public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final Strin
* This implementation requires that the input records are already tagged, and de-duped if needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
@@ -198,7 +223,7 @@ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
* alignment, as with upsert(), by profiling the workload
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
@@ -225,7 +250,7 @@ public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final Strin
* de-duped if needed.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
Expand All @@ -248,7 +273,7 @@ public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
* the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
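
The bulk-insert path is typically used for initial loads, where sorting the input controls file sizing without upsert-style workload profiling. A sketch using the same assumed client and records as above (Option here is Hudi's own util class):

String commitTime = client.startCommit();
JavaRDD<WriteStatus> statuses = client.bulkInsert(records, commitTime);
// Alternatively, the partitioner-aware overload with no custom partitioner:
//   client.bulkInsert(records, commitTime, Option.empty());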
@@ -265,7 +290,7 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final S
* {@link UserDefinedBulkInsertPartitioner}.
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted
* into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
@@ -298,7 +323,7 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final S
* {@link UserDefinedBulkInsertPartitioner}.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
* @param commitTime Instant time of the commit
* @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted
* into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
@@ -329,7 +354,7 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitT
try {
// De-dupe/merge if needed
JavaRDD<HoodieKey> dedupedKeys =
config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys;
config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys;

JavaRDD<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
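
Deletion works on keys rather than full records; as the hunk above shows, each key is wrapped in an EmptyHoodieRecordPayload internally. A sketch, where the record key and partition path are placeholders:

JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(Arrays.asList(
    new HoodieKey("uuid-0001", "2019/11/30")));   // placeholder key and partition path
String commitTime = client.startCommit();
JavaRDD<WriteStatus> statuses = client.delete(deleteKeys, commitTime);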
@@ -683,7 +708,10 @@ public boolean rollbackToSavepoint(String savepointTime) {
/**
* Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
* this commit, (2) clean indexing data, (3) clean newly generated parquet files, (4) finally delete the .commit or .inflight
* file,
* file.
*
* @param commitTime Instant time of the commit
* @return {@code true} if the record changes were rolled back successfully, {@code false} otherwise
*/
public boolean rollback(final String commitTime) throws HoodieRollbackException {
rollbackInternal(commitTime);
@@ -692,9 +720,11 @@ public boolean rollback(final String commitTime) throws HoodieRollbackException

/**
* NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert
* the (inflight/committed) record changes for all commits after the provided @param. Three steps: (1) Atomically
* the (inflight/committed) record changes for all commits after the provided instant time. Four steps: (1) Atomically
* unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log files and/or append rollback to
* existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight or .compaction.requested file
*
* @param instantTime Instant time to which restoration is requested
*/
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {

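To make the distinction concrete: rollback targets a single instant, while restoreToInstant reverts everything after the given instant and requires all writers to be stopped first. A sketch with placeholder instant times:

boolean ok = client.rollback("20191201123045");   // roll back one inflight/committed instant
client.restoreToInstant("20191201120000");        // revert all commits after this instant
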
@@ -808,7 +838,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept
}

/**
* Provides a new commit time for a write operation (insert/update).
* Provides a new commit time for a write operation (insert/update/delete).
*/
public String startCommit() {
// NOTE : Need to ensure that rollback is done before a new commit is started
@@ -821,6 +851,11 @@ public String startCommit() {
return commitTime;
}
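
startCommit() above and the startCommitWithTime variant below differ only in who chooses the instant time; a two-line sketch (the explicit time is a placeholder):

String generated = client.startCommit();        // client generates the instant time
client.startCommitWithTime("20191201130000");   // caller supplies the instant time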

/**
* Starts a new commit for a write operation (insert/update/delete) with the supplied instant time.
*
* @param instantTime Instant time to be used as the new commit time
*/
public void startCommitWithTime(String instantTime) {
// NOTE : Need to ensure that rollback is done before a new commit is started
if (rollbackPending) {
@@ -848,6 +883,8 @@ private void startCommit(String instantTime) {

/**
* Schedules a new compaction instant.
*
* @param extraMetadata Extra Metadata to be stored
*/
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -897,13 +934,18 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
* Performs Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
* @return RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
return compact(compactionInstantTime, config.shouldAutoCommit());
}
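
Compaction is a two-step schedule-then-execute flow: scheduleCompaction creates a new instant (see the createNewInstantTime call above) and, assuming it returns that instant time, the result is passed to compact. A sketch for a merge-on-read table, error handling elided:

Option<String> compactionInstant = client.scheduleCompaction(Option.empty());  // no extra metadata
if (compactionInstant.isPresent()) {
  JavaRDD<WriteStatus> statuses = client.compact(compactionInstant.get());     // auto-commits per config
}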

/**
* Commit a compaction operation. Allows passing additional metadata to be stored in the commit instant file.
*
* @param compactionInstantTime Compaction Instant Time
* @param writeStatuses RDD of WriteStatus to inspect errors and counts
* @param extraMetadata Extra Metadata to be stored
*/
public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) throws IOException {
@@ -928,6 +970,10 @@ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus>

/**
* Deduplicate Hoodie records, using the given deduplication function.
*
* @param records HoodieRecords to deduplicate
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return RDD of deduplicated HoodieRecords
*/
JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
boolean isIndexingGlobal = getIndex().isGlobal();
@@ -948,8 +994,11 @@ JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, in

/**
* Deduplicate Hoodie records, using the given deduplication function.
*
* @param keys RDD of HoodieKeys to deduplicate
* @return RDD of deduplicated HoodieKeys
*/
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys) {
boolean isIndexingGlobal = getIndex().isGlobal();
if (isIndexingGlobal) {
return keys.keyBy(HoodieKey::getRecordKey)
@@ -973,14 +1022,11 @@ private void rollbackPendingCommits() {
}
}

/**
* Compaction specific private methods
*/

/**
* Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
* @return RDD of WriteStatus
*/
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible