Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,13 @@ public boolean commit(String commitTime,
// Save was a success
// We cannot have unbounded commit files. Archive commits if we have to archive
archiveLog.archiveIfRequired();
// Call clean to cleanup if there is anything to cleanup after the commit,
clean(commitTime);
if(config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the commit,
logger.info("Auto cleaning is enabled. Running cleaner now");
clean(commitTime);
} else {
logger.info("Auto cleaning is not enabled. Not running cleaner now");
}
if (writeContext != null) {
long durationInMs = metrics.getDurationInMs(writeContext.stop());
metrics.updateCommitMetrics(
Expand Down Expand Up @@ -713,6 +718,18 @@ public void close() {

/**
 * Cleans up stale/old files and data lying around (on file storage or index storage),
 * following the configured settings and CleaningPolicy. Typically these are files that
 * can no longer be used by a running query.
 *
 * <p>The clean time is obtained from {@code startCommit()} and then handed to the
 * private {@code clean(String)} overload, which does the actual work.
 *
 * @throws HoodieIOException as declared by the underlying clean operation
 */
public void clean() throws HoodieIOException {
    clean(startCommit());
}

/**
* Clean up any stale/old files/data lying around (either on file storage or index storage)
* based on the configurations and CleaningPolicy used. (typically files that no longer can be used
* by a running query can be cleaned)
*/
private void clean(String startCleanTime) throws HoodieIOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
private static final String DEFAULT_CLEANER_POLICY =
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();

public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
private static final String DEFAULT_AUTO_CLEAN = "true";

public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
"hoodie.cleaner.fileversions.retained";
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
Expand Down Expand Up @@ -94,6 +97,11 @@ public Builder fromFile(File propertiesFile) throws IOException {
}
}

/**
 * Sets whether the cleaner should run automatically (stored under
 * {@link #AUTO_CLEAN_PROP}). When the property is never set, {@code build()} falls
 * back to the default of {@code "true"}.
 *
 * @param autoClean {@code true} to enable automatic cleaning, {@code false} to disable it;
 *                  a {@code null} value is stored as the string {@code "null"}, which
 *                  {@code Boolean.parseBoolean} reads back as {@code false}
 * @return this builder, for chaining
 */
public Builder withAutoClean(Boolean autoClean) {
    // Persist the caller's choice; previously this always wrote TRUE and ignored the argument.
    props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
    return this;
}

public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
props.setProperty(CLEANER_POLICY_PROP, policy.name());
return this;
Expand Down Expand Up @@ -143,6 +151,8 @@ public Builder withCleanerParallelism(int cleanerParallelism) {

public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP),
AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP),
CLEANER_POLICY_PROP, DEFAULT_CLEANER_POLICY);
setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
// Disable explicit bloom index parallelism setting by default - hoodie auto computes
public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";

private HoodieIndexConfig(Properties props) {
super(props);
Expand Down Expand Up @@ -91,6 +94,11 @@ public Builder hbaseTableName(String tableName) {
return this;
}

/**
 * Records an explicit bloom index parallelism under
 * {@link #BLOOM_INDEX_PARALLELISM_PROP}.
 *
 * @param parallelism the parallelism value to store (written as its decimal string form)
 * @return this builder, for chaining
 */
public Builder bloomIndexParallelism(int parallelism) {
    final String parallelismValue = Integer.toString(parallelism);
    props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, parallelismValue);
    return this;
}

public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
Expand All @@ -99,6 +107,8 @@ public HoodieIndexConfig build() {
BLOOM_FILTER_NUM_ENTRIES, DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP),
BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PARALLELISM_PROP),
BLOOM_INDEX_PARALLELISM_PROP, DEFAULT_BLOOM_INDEX_PARALLELISM);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public int getCleanerParallelism() {
return Integer.parseInt(props.getProperty(HoodieCompactionConfig.CLEANER_PARALLELISM));
}

/**
 * Reports whether automatic cleaning is switched on.
 *
 * @return the value of {@code HoodieCompactionConfig.AUTO_CLEAN_PROP} parsed with
 *         {@link Boolean#parseBoolean(String)}; anything other than a case-insensitive
 *         {@code "true"} (including a missing property) yields {@code false}
 */
public boolean isAutoClean() {
    final String autoCleanSetting = props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP);
    return Boolean.parseBoolean(autoCleanSetting);
}

/**
* index properties
**/
Expand Down Expand Up @@ -171,6 +175,10 @@ public String getHbaseTableName() {
return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
}

/**
 * Reads the configured bloom index parallelism.
 *
 * @return the value of {@code HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP} parsed as an int
 * @throws NumberFormatException if the property is absent or is not a valid integer
 */
public int getBloomIndexParallelism() {
    final String configuredParallelism =
        props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP);
    return Integer.parseInt(configuredParallelism);
}

/**
* storage properties
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,11 @@ private int determineParallelism(int inputParallelism, final Map<String, Long> s
for (long subparts : subpartitionCountMap.values()) {
totalSubparts += (int) subparts;
}
int joinParallelism = Math.max(totalSubparts, inputParallelism);
// If bloom index parallelism is set, use it to check against the input parallelism and take the max
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
int joinParallelism = Math.max(totalSubparts, indexParallelism);
logger.info("InputParallelism: ${" + inputParallelism + "}, " +
"IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " +
"TotalSubParts: ${" + totalSubparts + "}, " +
"Join Parallelism set to : " + joinParallelism);
return joinParallelism;
Expand Down