@@ -219,10 +219,9 @@ public List<RenameOpResult> repairCompaction(String compactionInstant, int paral
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan;
return AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
}

/**
45 changes: 21 additions & 24 deletions hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -77,7 +77,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -347,7 +347,7 @@ public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitT
// perform index loop up to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
// filter out non existant keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown());
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
if (!taggedValidRecords.isEmpty()) {
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
@@ -392,7 +392,7 @@ private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> deduped

JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator());
.flatMap(List::iterator);

return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
}
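Reviewer note: the two hunks above replace single-method lambdas (record -> record.isCurrentLocationKnown(), writeStatuses -> writeStatuses.iterator()) with method references. If I recall the Spark 2.x Java API correctly, JavaRDD.flatMap takes a function returning an Iterator, which is why List::iterator fits there; the self-contained sketch below shows the same lambda-to-method-reference swap on plain java.util streams (using List::stream), with all names hypothetical.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MethodRefSketch {
  public static void main(String[] args) {
    List<List<String>> batches = Arrays.asList(
        Arrays.asList("a", "b"), Arrays.asList("c"));

    // Lambda form: one argument, immediately forwarded to a single call.
    List<String> viaLambda = batches.stream()
        .flatMap(batch -> batch.stream())
        .collect(Collectors.toList());

    // Method-reference form: same behavior, less noise.
    List<String> viaMethodRef = batches.stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());

    System.out.println(viaLambda.equals(viaMethodRef)); // true
  }
}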
@@ -424,14 +424,14 @@ private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, Hood
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
profile.getPartitionPaths().stream().forEach(path -> {
profile.getPartitionPaths().forEach(path -> {
WorkloadStat partitionStat = profile.getWorkloadStat(path.toString());
partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> {
partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(entry.getKey());
writeStat.setFileId(key);
// TODO : Write baseCommitTime is possible here ?
writeStat.setPrevCommit(entry.getValue().getKey());
writeStat.setNumUpdateWrites(entry.getValue().getValue());
writeStat.setPrevCommit(value.getKey());
writeStat.setNumUpdateWrites(value.getValue());
metadata.addWriteStat(path.toString(), writeStat);
});
});
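Reviewer note: the hunk above swaps entrySet().stream().forEach(entry -> ...) for Map.forEach((key, value) -> ...), which drops the intermediate stream and the entry.getKey()/entry.getValue() boilerplate. A minimal sketch with hypothetical data, not the real WorkloadStat types:

import java.util.HashMap;
import java.util.Map;

public class MapForEachSketch {
  public static void main(String[] args) {
    Map<String, Integer> updateCounts = new HashMap<>();
    updateCounts.put("file-001", 3);
    updateCounts.put("file-002", 7);

    // Before: stream over the entry set, unpack each entry by hand.
    updateCounts.entrySet().stream()
        .forEach(entry -> System.out.println(entry.getKey() + " -> " + entry.getValue()));

    // After: Map.forEach hands key and value to a BiConsumer directly.
    updateCounts.forEach((fileId, count) -> System.out.println(fileId + " -> " + count));
  }
}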
@@ -804,7 +804,7 @@ public void restoreToInstant(final String instantTime) throws HoodieRollbackExce
ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
instantsToRollback.stream().forEach(instant -> {
instantsToRollback.forEach(instant -> {
try {
switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
@@ -850,7 +850,7 @@ private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant insta
// Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
savepoints.stream().forEach(s -> {
savepoints.forEach(s -> {
if (s.contains(commitToRollback)) {
throw new HoodieRollbackException(
"Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
@@ -864,19 +864,18 @@ private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant insta

// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
String lastCommit = commitToRollback;

if ((lastCommit != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
if ((commitToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException(
"Found commits after time :" + lastCommit + ", please rollback greater commits first");
"Found commits after time :" + commitToRollback + ", please rollback greater commits first");
}

List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
if ((commitToRollback != null) && !inflights.isEmpty() && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
"Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first");
}

List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
Expand All @@ -895,7 +894,7 @@ private void finishRollback(final Timer.Context context, List<HoodieRollbackStat
List<String> commitsToRollback, final String startRollbackTime) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
Option<Long> durationInMs = Option.empty();
Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
@@ -923,7 +922,7 @@ private void finishRestore(final Timer.Context context, Map<String, List<HoodieR
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
Option<Long> durationInMs = Option.empty();
Long numFilesDeleted = 0L;
long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
List<HoodieRollbackStat> stats = commitToStat.getValue();
numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
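Reviewer note: in the two hunks above, mapToLong(...).sum() already yields a primitive long, so declaring the target as boxed Long only forces an extra autoboxing step; switching to long keeps it primitive. A small sketch, with a hypothetical Stat class standing in for HoodieRollbackStat:

import java.util.Arrays;
import java.util.List;

public class PrimitiveSumSketch {
  // Hypothetical stand-in for a rollback stat carrying the deleted files.
  static class Stat {
    private final List<String> successDeleteFiles;
    Stat(List<String> files) { this.successDeleteFiles = files; }
    List<String> getSuccessDeleteFiles() { return successDeleteFiles; }
  }

  public static void main(String[] args) {
    List<Stat> stats = Arrays.asList(
        new Stat(Arrays.asList("a.parquet", "b.parquet")),
        new Stat(Arrays.asList("c.parquet")));

    // sum() returns a primitive long; assigning it to Long would box it for no benefit.
    long numFilesDeleted = stats.stream()
        .mapToLong(stat -> stat.getSuccessDeleteFiles().size())
        .sum();

    System.out.println(numFilesDeleted); // 3
  }
}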
@@ -962,7 +961,7 @@ private void rollbackInternal(String commitToRollback) {

if (rollbackInstantOpt.isPresent()) {
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime);
finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
}
} catch (IOException e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
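Reviewer note: for a single known element, Collections.singletonList(x) is slightly leaner than Arrays.asList(x) (no wrapped backing array) and states the intent; both are fixed-size, so nothing downstream may add to the list. A quick sketch with a hypothetical commit time:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListSketch {
  public static void main(String[] args) {
    String commitToRollback = "20190101010101"; // hypothetical commit time

    List<String> viaArrays = Arrays.asList(commitToRollback);
    List<String> viaSingleton = Collections.singletonList(commitToRollback);

    // Equivalent contents; both throw UnsupportedOperationException on add().
    System.out.println(viaArrays.equals(viaSingleton)); // true
  }
}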
@@ -1124,7 +1123,7 @@ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus>
}

/**
* Deduplicate Hoodie records, using the given deduplication funciton.
* Deduplicate Hoodie records, using the given deduplication function.
*/
JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
@@ -1144,7 +1143,7 @@ JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, in
}

/**
* Deduplicate Hoodie records, using the given deduplication funciton.
* Deduplicate Hoodie records, using the given deduplication function.
*/
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
@@ -1342,9 +1341,7 @@ private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<Wr

// Copy extraMetadata
extraMetadata.ifPresent(m -> {
m.entrySet().stream().forEach(e -> {
metadata.addMetadata(e.getKey(), e.getValue());
});
m.forEach(metadata::addMetadata);
});

LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata);
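Reviewer note: m.forEach(metadata::addMetadata) works because a bound instance method taking (key, value) matches the BiConsumer shape Map.forEach expects. A sketch with a hypothetical two-argument sink standing in for HoodieCommitMetadata.addMetadata:

import java.util.HashMap;
import java.util.Map;

public class BiConsumerRefSketch {
  // Hypothetical stand-in for a metadata object with addMetadata(String, String).
  static class Metadata {
    private final Map<String, String> extra = new HashMap<>();
    void addMetadata(String key, String value) { extra.put(key, value); }
    Map<String, String> getExtra() { return extra; }
  }

  public static void main(String[] args) {
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put("some.key", "some-value");
    extraMetadata.put("another.key", "another-value");

    Metadata metadata = new Metadata();
    // The bound reference metadata::addMetadata is used as a BiConsumer<String, String>.
    extraMetadata.forEach(metadata::addMetadata);

    System.out.println(metadata.getExtra().size()); // 2
  }
}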
@@ -83,10 +83,9 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
* Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads.
*/
public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction";
public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002";

public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction";
public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06";

/**
* Hoodie index desired puts operation time in seconds.
*/
@@ -115,12 +114,9 @@ public static class Builder {
private final Properties props = new Properties();

public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

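Reviewer note: the hand-rolled try/finally around FileReader (repeated in several Builder.fromFile methods in this PR) becomes try-with-resources, which closes the reader whether props.load returns or throws, and records any close-time exception as suppressed instead of masking the original. A minimal sketch under a hypothetical file path:

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class TryWithResourcesSketch {
  static Properties load(File propertiesFile) throws IOException {
    Properties props = new Properties();
    // FileReader is AutoCloseable, so it is closed automatically on exit from the block.
    try (FileReader reader = new FileReader(propertiesFile)) {
      props.load(reader);
    }
    return props;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical path, purely for illustration.
    System.out.println(load(new File("hoodie.properties")).size());
  }
}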
@@ -194,6 +190,11 @@ public Builder hbaseIndexSleepMsBetweenPutBatch(int sleepMsBetweenPutBatch) {
return this;
}

public Builder hbaseIndexSleepMsBetweenGetBatch(int sleepMsBetweenGetBatch) {
props.setProperty(HBASE_SLEEP_MS_GET_BATCH_PROP, String.valueOf(sleepMsBetweenGetBatch));
return this;
}

public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
return this;
@@ -217,7 +218,7 @@ public Builder hbaseZkZnodeParent(String zkZnodeParent) {
/**
* <p>
* Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to
* limit the aggregate QPS generated across various jobs to an Hbase Region Server.
* limit the aggregate QPS generated across various jobs to an HBase Region Server.
* </p>
* <p>
* It is recommended to set this value based on your global indexing throughput needs and most importantly, how much
@@ -238,7 +239,7 @@ public HoodieHBaseIndexConfig build() {
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), HBASE_PUT_BATCH_SIZE_PROP,
String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE);
setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), HBASE_QPS_FRACTION_PROP,
String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
@@ -250,7 +251,7 @@ public HoodieHBaseIndexConfig build() {
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS),
HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT), HBASE_ZK_PATH_QPS_ROOT,
String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT));
DEFAULT_HBASE_ZK_PATH_QPS_ROOT);
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS),
HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS));
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS),
@@ -90,12 +90,9 @@ public static class Builder {
private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

@@ -76,12 +76,9 @@ public static class Builder {
private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

@@ -141,9 +138,9 @@ private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {
// 0.6 is the default value used by Spark,
// look at {@link
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
double memoryFraction = Double.valueOf(
double memoryFraction = Double.parseDouble(
SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
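Reviewer note: Double.parseDouble returns a primitive double, whereas Double.valueOf hands back a boxed Double that is then unboxed, and the surrounding arithmetic deserves a worked example: the merge buffer gets maxMemoryFraction of whatever the executor does not reserve for Spark's own memory fraction, floored and clamped to a minimum. A sketch with hypothetical numbers and a hypothetical minimum constant:

public class MergeMemorySketch {
  // Hypothetical floor, standing in for DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES.
  static final long MIN_MEMORY_FOR_MERGE = 100L * 1024 * 1024;

  static long maxMemoryForMerge(long executorMemoryInBytes, String sparkMemoryFraction,
      String maxMemoryFractionForMerge) {
    // parseDouble returns a primitive double; valueOf would return a boxed Double.
    double memoryFraction = Double.parseDouble(sparkMemoryFraction);
    double mergeFraction = Double.parseDouble(maxMemoryFractionForMerge);
    // Memory not claimed by Spark's own fraction is user-available; merge gets a slice of it.
    double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
    long maxMemory = (long) Math.floor(userAvailableMemory * mergeFraction);
    return Math.max(MIN_MEMORY_FOR_MERGE, maxMemory);
  }

  public static void main(String[] args) {
    // 4 GiB executor, Spark keeps 0.6, merge may take 0.6 of the remaining 0.4:
    // 4 GiB * 0.4 = 1.6 GiB of user memory, times 0.6 is roughly 0.96 GiB for the merge buffer.
    System.out.println(maxMemoryForMerge(4L * 1024 * 1024 * 1024, "0.6", "0.6"));
  }
}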
@@ -70,12 +70,9 @@ public static class Builder {
private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

@@ -66,12 +66,9 @@ public static class Builder {
private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

@@ -335,11 +335,11 @@ public String getHbaseTableName() {
}

public int getHbaseIndexGetBatchSize() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
}

public int getHbaseIndexPutBatchSize() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
}

public Boolean getHbaseIndexPutBatchSizeAutoCompute() {
@@ -363,11 +363,11 @@ public String getHBaseZkZnodeConnectionTimeout() {
}

public boolean getHBaseIndexShouldComputeQPSDynamically() {
return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
}

public int getHBaseIndexDesiredPutsTime() {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
}

public String getBloomFilterType() {
@@ -455,15 +455,15 @@ public int getLogFileMaxSize() {
}

public double getParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
return Double.parseDouble(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
}

public CompressionCodecName getParquetCompressionCodec() {
return CompressionCodecName.fromConf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC));
}

public double getLogFileToParquetCompressionRatio() {
return Double.valueOf(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
return Double.parseDouble(props.getProperty(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO));
}

/**
@@ -517,15 +517,15 @@ public Long getMaxMemoryPerCompaction() {
}

public int getMaxDFSStreamBufferSize() {
return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
return Integer.parseInt(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
}

public String getSpillableMapBasePath() {
return props.getProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP);
}

public double getWriteStatusFailureFraction() {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
return Double.parseDouble(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
}

public ConsistencyGuardConfig getConsistencyGuardConfig() {
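Reviewer note: the getter changes in this file swap Integer.valueOf, Double.valueOf, and Boolean.valueOf for their parseXxx counterparts. Both parse the property string the same way; the parse variants just return primitives, so getters declared to return int, double, or boolean skip a box-then-unbox round trip. A small sketch with a hypothetical property key:

import java.util.Properties;

public class ParsePrimitiveSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.example.batch.size", "100"); // hypothetical key

    // Integer.valueOf returns a boxed Integer that is immediately unboxed to int.
    int boxedThenUnboxed = Integer.valueOf(props.getProperty("hoodie.example.batch.size"));

    // Integer.parseInt returns the primitive directly.
    int primitive = Integer.parseInt(props.getProperty("hoodie.example.batch.size"));

    System.out.println(boxedThenUnboxed == primitive); // true
  }
}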
@@ -564,12 +564,9 @@ public static class Builder {
private boolean isConsistencyGuardSet = false;

public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}

@@ -65,6 +65,8 @@ public boolean hasKeyRanges() {
* Does the given key fall within the range (inclusive).
*/
public boolean isKeyInRange(String recordKey) {
assert minRecordKey != null;
assert maxRecordKey != null;
return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0;
}

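Reviewer note: the added asserts document that isKeyInRange only makes sense when both bounds are set (presumably what hasKeyRanges guards), but assertions are disabled at runtime unless the JVM runs with -ea, so callers still need the hasKeyRanges check. A self-contained sketch of the inclusive range test, with a hypothetical holder in place of the real class:

public class KeyRangeSketch {
  // Hypothetical holder mirroring the min/max record keys kept per file.
  static final class KeyRange {
    private final String minRecordKey;
    private final String maxRecordKey;

    KeyRange(String minRecordKey, String maxRecordKey) {
      this.minRecordKey = minRecordKey;
      this.maxRecordKey = maxRecordKey;
    }

    boolean isKeyInRange(String recordKey) {
      assert minRecordKey != null;
      assert maxRecordKey != null;
      // Inclusive on both ends: min <= recordKey <= max, by String ordering.
      return minRecordKey.compareTo(recordKey) <= 0 && maxRecordKey.compareTo(recordKey) >= 0;
    }
  }

  public static void main(String[] args) {
    KeyRange range = new KeyRange("key_005", "key_250");
    System.out.println(range.isKeyInRange("key_100")); // true
    System.out.println(range.isKeyInRange("key_300")); // false
  }
}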
@@ -180,10 +180,10 @@ private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long>
.mapToPair(t -> t).countByKey();
} else {
fileToComparisons = new HashMap<>();
partitionToFileInfo.entrySet().stream().forEach(e -> {
for (BloomIndexFileInfo fileInfo : e.getValue()) {
partitionToFileInfo.forEach((key, value) -> {
for (BloomIndexFileInfo fileInfo : value) {
// each file needs to be compared against all the records coming into the partition
fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(e.getKey()));
fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key));
}
});
}