diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 40d90d0d0a151..0b8df71f286ca 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -101,14 +101,21 @@ public class HoodieWriteClient extends AbstractHo /** + * Create a write client, without cleaning up failed/inflight commits. * + * @param jsc Java Spark Context + * @param clientConfig instance of HoodieWriteConfig */ - public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) throws Exception { + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { this(jsc, clientConfig, false); } /** + * Create a write client, with new hudi index. * + * @param jsc Java Spark Context + * @param clientConfig instance of HoodieWriteConfig + * @param rollbackPending whether need to cleanup pending commits */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) { this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc)); @@ -119,6 +126,14 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, b this(jsc, clientConfig, rollbackPending, index, Option.empty()); } + /** + * Create a write client, allows to specify all parameters. + * + * @param jsc Java Spark Context + * @param clientConfig instance of HoodieWriteConfig + * @param rollbackPending whether need to cleanup pending commits + * @param timelineService Timeline Service that runs as part of write client. 
+ */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index, Option timelineService) { super(jsc, index, clientConfig, timelineService); @@ -127,6 +142,12 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, b this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService); } + /** + * Register hudi classes for Kryo serialization. + * + * @param conf instance of SparkConf + * @return SparkConf + */ public static SparkConf registerClasses(SparkConf conf) { conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); return conf; @@ -148,7 +169,11 @@ public JavaRDD> filterExists(JavaRDD> hoodieReco } /** - * Upserts a bunch of new records into the Hoodie table, at the supplied commitTime. + * Upsert a batch of new records into Hoodie table at the supplied commitTime. + * + * @param records JavaRDD of hoodieRecords to upsert + * @param commitTime Instant time of the commit + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { HoodieTable table = getTableAndInitCtx(OperationType.UPSERT); @@ -176,7 +201,7 @@ public JavaRDD upsert(JavaRDD> records, final Strin * This implementation requires that the input records are already tagged, and de-duped if needed. 
* * @param preppedRecords Prepared HoodieRecords to upsert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { @@ -198,7 +223,7 @@ public JavaRDD upsertPreppedRecords(JavaRDD> preppe * alignment, as with upsert(), by profiling the workload * * @param records HoodieRecords to insert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String commitTime) { @@ -225,7 +250,7 @@ public JavaRDD insert(JavaRDD> records, final Strin * de-duped if needed. * * @param preppedRecords HoodieRecords to insert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { @@ -248,7 +273,7 @@ public JavaRDD insertPreppedRecords(JavaRDD> preppe * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)} * * @param records HoodieRecords to insert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime) { @@ -265,7 +290,7 @@ public JavaRDD bulkInsert(JavaRDD> records, final S * {@link UserDefinedBulkInsertPartitioner}. * * @param records HoodieRecords to insert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted * into hoodie. 
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts @@ -298,7 +323,7 @@ public JavaRDD bulkInsert(JavaRDD> records, final S * {@link UserDefinedBulkInsertPartitioner}. * * @param preppedRecords HoodieRecords to insert - * @param commitTime Commit Time handle + * @param commitTime Instant time of the commit * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts @@ -329,7 +354,7 @@ public JavaRDD delete(JavaRDD keys, final String commitT try { // De-dupe/merge if needed JavaRDD dedupedKeys = - config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys; + config.shouldCombineBeforeDelete() ? deduplicateKeys(keys) : keys; JavaRDD> dedupedRecords = dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); @@ -683,7 +708,10 @@ public boolean rollbackToSavepoint(String savepointTime) { /** * Rollback the (inflight/committed) record changes with the given commit time. Three steps: (1) Atomically unpublish * this commit (2) clean indexing data, (3) clean new generated parquet files. (4) Finally delete .commit or .inflight - * file, + * file. + * + * @param commitTime Instant time of the commit + * @return {@code true} If rollback the record changes successfully. {@code false} otherwise */ public boolean rollback(final String commitTime) throws HoodieRollbackException { rollbackInternal(commitTime); @@ -692,9 +720,11 @@ public boolean rollback(final String commitTime) throws HoodieRollbackException /** * NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert - * the (inflight/committed) record changes for all commits after the provided @param. Three steps: (1) Atomically + * the (inflight/committed) record changes for all commits after the provided @param. 
Four steps: (1) Atomically * unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log files and/or append rollback to * existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight or .compaction.requested file + * + * @param instantTime Instant time to which restoration is requested */ public void restoreToInstant(final String instantTime) throws HoodieRollbackException { @@ -808,7 +838,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept } /** - * Provides a new commit time for a write operation (insert/update). + * Provides a new commit time for a write operation (insert/update/delete). */ public String startCommit() { // NOTE : Need to ensure that rollback is done before a new commit is started @@ -821,6 +851,11 @@ public String startCommit() { return commitTime; } + /** + * Provides a new commit time for a write operation (insert/update/delete). + * + * @param instantTime Instant time to be generated + */ public void startCommitWithTime(String instantTime) { // NOTE : Need to ensure that rollback is done before a new commit is started if (rollbackPending) { @@ -848,6 +883,8 @@ private void startCommit(String instantTime) { /** * Schedules a new compaction instant. + * + * @param extraMetadata Extra Metadata to be stored */ public Option scheduleCompaction(Option> extraMetadata) throws IOException { String instantTime = HoodieActiveTimeline.createNewInstantTime(); @@ -897,6 +934,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option compact(String compactionInstantTime) throws IOException { return compact(compactionInstantTime, config.shouldAutoCommit()); @@ -904,6 +942,10 @@ public JavaRDD compact(String compactionInstantTime) throws IOExcep /** * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file. 
+ * + * @param compactionInstantTime Compaction Instant Time + * @param writeStatuses RDD of WriteStatus to inspect errors and counts + * @param extraMetadata Extra Metadata to be stored */ public void commitCompaction(String compactionInstantTime, JavaRDD writeStatuses, Option> extraMetadata) throws IOException { @@ -928,6 +970,10 @@ public void commitCompaction(String compactionInstantTime, JavaRDD /** * Deduplicate Hoodie records, using the given deduplication function. + * + * @param records hoodieRecords to deduplicate + * @param parallelism parallelism or partitions to be used while reducing/deduplicating + * @return RDD of HoodieRecord already deduplicated */ JavaRDD> deduplicateRecords(JavaRDD> records, int parallelism) { boolean isIndexingGlobal = getIndex().isGlobal(); @@ -948,8 +994,11 @@ JavaRDD> deduplicateRecords(JavaRDD> records, in /** * Deduplicate Hoodie records, using the given deduplication function. + * + * @param keys RDD of HoodieKey to deduplicate + * @return RDD of HoodieKey already deduplicated */ - JavaRDD deduplicateKeys(JavaRDD keys, int parallelism) { + JavaRDD deduplicateKeys(JavaRDD keys) { boolean isIndexingGlobal = getIndex().isGlobal(); if (isIndexingGlobal) { return keys.keyBy(HoodieKey::getRecordKey) @@ -973,14 +1022,11 @@ private void rollbackPendingCommits() { } } - /** - * Compaction specific private methods - */ - /** * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time. * * @param compactionInstantTime Compaction Instant Time + * @return RDD of Write Status */ private JavaRDD compact(String compactionInstantTime, boolean autoCommit) throws IOException { // Create a Hoodie table which encapsulated the commits and files visible