@@ -61,7 +61,7 @@
import static org.apache.hudi.common.table.HoodieTimeline.COMPACTION_ACTION;

/**
* Client to perform admin operations related to compaction
* Client to perform admin operations related to compaction.
*/
public class CompactionAdminClient extends AbstractHoodieClient {

@@ -214,7 +214,7 @@ public List<RenameOpResult> repairCompaction(String compactionInstant, int paral
}

/**
* Construction Compaction Plan from compaction instant
* Construction Compaction Plan from compaction instant.
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException {
@@ -273,7 +273,7 @@ protected static void renameLogFile(HoodieTableMetaClient metaClient, HoodieLogF
}

/**
* Check if a compaction operation is valid
* Check if a compaction operation is valid.
*
* @param metaClient Hoodie Table Meta client
* @param compactionInstant Compaction Instant
@@ -342,7 +342,7 @@ private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient met
}

/**
* Execute Renaming operation
* Execute Renaming operation.
*
* @param metaClient HoodieTable MetaClient
* @param renameActions List of rename operations
@@ -484,7 +484,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
}

/**
* Holds Operation result for Renaming
* Holds Operation result for Renaming.
*/
public static class RenameOpResult extends OperationResult<RenameInfo> {

@@ -505,7 +505,7 @@ public RenameOpResult(Pair<HoodieLogFile, HoodieLogFile> op, boolean executed, b
}

/**
* Holds Operation result for Renaming
* Holds Operation result for Renaming.
*/
public static class ValidationOpResult extends OperationResult<CompactionOperation> {

@@ -103,7 +103,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept
}

/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
*
* @param startCleanTime Cleaner Instant Time
* @return Cleaner Plan if generated
@@ -133,7 +133,7 @@ protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
}

/**
* Executes the Cleaner plan stored in the instant metadata
* Executes the Cleaner plan stored in the instant metadata.
*
* @param table Hoodie Table
* @param cleanInstantTs Cleaner Instant Timestamp
@@ -145,7 +145,7 @@ private Option<String> convertToDataFilePath(Option<Pair<String, String>> partit
}

/**
* Given a bunch of hoodie keys, fetches all the individual records out as a data frame
* Given a bunch of hoodie keys, fetches all the individual records out as a data frame.
*
* @return a dataframe
*/
24 changes: 12 additions & 12 deletions hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -159,7 +159,7 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
}

/**
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime.
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
@@ -505,14 +505,14 @@ record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.ge
}

/**
* Commit changes performed at the given commitTime marker
* Commit changes performed at the given commitTime marker.
*/
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
return commit(commitTime, writeStatuses, Option.empty());
}

/**
* Commit changes performed at the given commitTime marker
* Commit changes performed at the given commitTime marker.
*/
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) {
@@ -988,7 +988,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept
}

/**
* Provides a new commit time for a write operation (insert/update)
* Provides a new commit time for a write operation (insert/update).
*/
public String startCommit() {
// NOTE : Need to ensure that rollback is done before a new commit is started
@@ -1027,7 +1027,7 @@ private void startCommit(String instantTime) {
}

/**
* Schedules a new compaction instant
* Schedules a new compaction instant.
*/
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
String instantTime = HoodieActiveTimeline.createNewCommitTime();
@@ -1037,7 +1037,7 @@ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetada
}

/**
* Schedules a new compaction instant with passed-in instant time
* Schedules a new compaction instant with passed-in instant time.
*
* @param instantTime Compaction Instant Time
* @param extraMetadata Extra Metadata to be stored
@@ -1074,7 +1074,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
}

/**
* Performs Compaction for the workload stored in instant-time
* Performs Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
*/
@@ -1141,7 +1141,7 @@ JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
}

/**
* Cleanup all inflight commits
* Cleanup all inflight commits.
*/
private void rollbackInflightCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
@@ -1197,7 +1197,7 @@ private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
*/

/**
* Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time
* Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
*/
@@ -1226,7 +1226,7 @@ private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoC
}

/**
* Perform compaction operations as specified in the compaction commit file
* Perform compaction operations as specified in the compaction commit file.
*
* @param compactionInstant Compacton Instant time
* @param activeTimeline Active Timeline
@@ -1254,7 +1254,7 @@ private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, Hood
}

/**
* Commit Compaction and track metrics
* Commit Compaction and track metrics.
*
* @param compactedStatuses Compaction Write status
* @param table Hoodie Table
@@ -1404,7 +1404,7 @@ private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetada
}

/**
* Refers to different operation types
* Refers to different operation types.
*/
enum OperationType {
INSERT,
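For orientation, the Javadoc edits above touch the core write path of HoodieWriteClient (startCommit, upsert, commit, scheduleCompaction). The sketch below strings those calls together; the method signatures are the ones visible in this diff, while the package layout, the config builder call, the client constructor, and close() are assumptions based on the 0.5.x codebase and are not part of this change.

```java
// Minimal driver sketch, not part of this PR. Only startCommit/upsert/commit/
// scheduleCompaction mirror methods documented in this diff; the constructor,
// builder call and package names are assumptions based on the 0.5.x layout.
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class UpsertFlowSketch {
  public static <T extends HoodieRecordPayload> void run(
      JavaSparkContext jsc, JavaRDD<HoodieRecord<T>> records, String basePath) throws Exception {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
    HoodieWriteClient<T> client = new HoodieWriteClient<>(jsc, config);
    String commitTime = client.startCommit();                            // new commit time for this write
    JavaRDD<WriteStatus> statuses = client.upsert(records, commitTime);  // upsert at the supplied commitTime
    boolean committed = client.commit(commitTime, statuses);             // commit changes at that marker
    if (committed) {
      client.scheduleCompaction(Option.empty());                         // optionally schedule a compaction instant
    }
    client.close();
  }
}
```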
@@ -33,7 +33,7 @@
import java.io.IOException;

/**
* Timeline Service that runs as part of write client
* Timeline Service that runs as part of write client.
*/
public class EmbeddedTimelineService {

@@ -86,7 +86,7 @@ private void setHostAddrFromSparkConf(SparkConf sparkConf) {
}

/**
* Retrieves proper view storage configs for remote clients to access this service
* Retrieves proper view storage configs for remote clients to access this service.
*/
public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
@@ -26,7 +26,7 @@
public class ClientUtils {

/**
* Create Consistency Aware MetaClient
* Create Consistency Aware MetaClient.
*
* @param jsc JavaSparkContext
* @param config HoodieWriteConfig
@@ -33,7 +33,7 @@
import java.util.Properties;

/**
* Compaction related config
* Compaction related config.
*/
@Immutable
public class HoodieCompactionConfig extends DefaultHoodieConfig {
@@ -55,8 +55,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// By default, treat any file <= 100MB as a small file.
public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
/**
* Configs related to specific table types
**/
* Configs related to specific table types.
*/
// Number of inserts, that will be put each partition/bucket for writing
public static final String COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = "hoodie.copyonwrite.insert" + ".split.size";
// The rationale to pick the insert parallelism is the following. Writing out 100MB files,
@@ -34,17 +34,17 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path";
/**
* Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not be honored for HBase
* Puts
* Puts.
*/
public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";

/**
* Property to set which implementation of HBase QPS resource allocator to be used
* Property to set which implementation of HBase QPS resource allocator to be used.
*/
public static final String HBASE_INDEX_QPS_ALLOCATOR_CLASS = "hoodie.index.hbase.qps.allocator.class";
public static final String DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS = DefaultHBaseQPSResourceAllocator.class.getName();
/**
* Property to set to enable auto computation of put batch size
* Property to set to enable auto computation of put batch size.
*/
public static final String HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = "hoodie.index.hbase.put.batch.size.autocompute";
public static final String DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE = "false";
@@ -62,33 +62,33 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
*/
public static String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server";
/**
* Default batch size, used only for Get, but computed for Put
* Default batch size, used only for Get, but computed for Put.
*/
public static final int DEFAULT_HBASE_BATCH_SIZE = 100;
/**
* A low default value.
*/
public static final int DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER = 1000;
/**
* Default is 50%, which means a total of 2 jobs can run using HbaseIndex without overwhelming Region Servers
* Default is 50%, which means a total of 2 jobs can run using HbaseIndex without overwhelming Region Servers.
*/
public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f;

/**
* Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume
* Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume.
*/
public static final String HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = "hoodie.index.hbase.dynamic_qps";
public static final boolean DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = false;
/**
* Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads
* Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads.
*/
public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction";
public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002";

public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction";
public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06";
/**
* Hoodie index desired puts operation time in seconds
* Hoodie index desired puts operation time in seconds.
*/
public static final String HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = "hoodie.index.hbase.desired_puts_time_in_secs";
public static final int DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = 600;
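The HBase index knobs documented above interact: the put batch size is only honored while auto-compute is disabled, and a job's share of region-server QPS can either stay at a fixed fraction or be derived dynamically within the min/max fraction bounds, targeting a desired puts duration. A small sketch of these settings as plain properties follows; the key strings and default values are taken from this diff, the chosen values are illustrative, and how the Properties object is handed to Hudi is out of scope here.

```java
import java.util.Properties;

public class HBaseIndexPropsSketch {
  // Property keys are copied verbatim from HoodieHBaseIndexConfig in this diff;
  // the values are illustrative and wiring the Properties into Hudi is not shown.
  static Properties hbaseIndexProps() {
    Properties props = new Properties();
    // Fixed put batch size; only honored while auto-compute stays disabled (default "false").
    props.setProperty("hoodie.index.hbase.put.batch.size", "100");
    props.setProperty("hoodie.index.hbase.put.batch.size.autocompute", "false");
    // Per-region-server QPS ceiling; min/max fraction bounds stabilize skewed workloads.
    props.setProperty("hoodie.index.hbase.max.qps.per.region.server", "1000");
    props.setProperty("hoodie.index.hbase.min.qps.fraction", "0.002");
    props.setProperty("hoodie.index.hbase.max.qps.fraction", "0.06");
    // Or derive the fraction dynamically from volume, aiming for ~600s of puts.
    props.setProperty("hoodie.index.hbase.dynamic_qps", "true");
    props.setProperty("hoodie.index.hbase.desired_puts_time_in_secs", "600");
    return props;
  }
}
```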
@@ -29,7 +29,7 @@


/**
* Indexing related config
* Indexing related config.
*/
@Immutable
public class HoodieIndexConfig extends DefaultHoodieConfig {
@@ -29,7 +29,7 @@
import java.util.Properties;

/**
* Memory related config
* Memory related config.
*/
@Immutable
public class HoodieMemoryConfig extends DefaultHoodieConfig {
@@ -26,7 +26,7 @@
import java.util.Properties;

/**
* Storage related config
* Storage related config.
*/
@Immutable
public class HoodieStorageConfig extends DefaultHoodieConfig {
@@ -42,7 +42,7 @@
import java.util.Properties;

/**
* Class storing configs for the {@link HoodieWriteClient}
* Class storing configs for the {@link HoodieWriteClient}.
*/
@Immutable
public class HoodieWriteConfig extends DefaultHoodieConfig {
@@ -115,8 +115,8 @@ public static HoodieWriteConfig.Builder newBuilder() {
}

/**
* base properties
**/
* base properties.
*/
public String getBasePath() {
return props.getProperty(BASE_PATH_PROP);
}
@@ -210,8 +210,8 @@ public int getMaxConsistencyCheckIntervalMs() {
}

/**
* compaction properties
**/
* compaction properties.
*/
public HoodieCleaningPolicy getCleanerPolicy() {
return HoodieCleaningPolicy.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_POLICY_PROP));
}
@@ -297,8 +297,8 @@ public int getCommitArchivalBatchSize() {
}

/**
* index properties
**/
* index properties.
*/
public HoodieIndex.IndexType getIndexType() {
return HoodieIndex.IndexType.valueOf(props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP));
}
@@ -417,8 +417,8 @@ public StorageLevel getBloomIndexInputStorageLevel() {
}

/**
* storage properties
**/
* storage properties.
*/
public long getParquetMaxFileSize() {
return Long.parseLong(props.getProperty(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES));
}
@@ -452,8 +452,8 @@ public double getLogFileToParquetCompressionRatio() {
}

/**
* metrics properties
**/
* metrics properties.
*/
public boolean isMetricsOn() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.METRICS_ON));
}
@@ -483,7 +483,7 @@ public int getJmxPort() {
}

/**
* memory configs
* memory configs.
*/
public Double getMaxMemoryFractionPerPartitionMerge() {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP));
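The getters touched in this last file are grouped by the nested config objects that appear earlier in the diff (compaction, index, storage, memory, metrics). A hedged sketch of assembling a HoodieWriteConfig from those pieces follows; newBuilder() is shown in this diff, but the with*() builder method names and the nested newBuilder() calls are assumptions based on the 0.5.x config builders rather than part of this change.

```java
// Sketch only: newBuilder() is shown in this diff; the with*() names and nested
// newBuilder() calls below are assumptions based on the 0.5.x config builders.
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class WriteConfigSketch {
  static HoodieWriteConfig build(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)                                                 // base path for the table
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().build())  // compaction properties
        .withIndexConfig(HoodieIndexConfig.newBuilder().build())            // index properties
        .withStorageConfig(HoodieStorageConfig.newBuilder().build())        // storage properties
        .build();
  }
}
```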