@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
@@ -141,6 +143,11 @@ public int getUpsertShuffleParallelism() {
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
}

public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}


public int getWriteBufferLimitBytes() {
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
}
@@ -562,6 +569,11 @@ public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
return this;
}

public Builder withRollbackParallelism(int rollbackParallelism) {
props.setProperty(ROLLBACK_PARALLELISM, String.valueOf(rollbackParallelism));
return this;
}

public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
return this;
@@ -651,6 +663,7 @@ public HoodieWriteConfig build() {
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
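For orientation, here is a minimal usage sketch of the new knob. Only `withRollbackParallelism` and `getRollbackParallelism` come from this diff; `newBuilder()` and `withPath(...)` are assumed from the rest of `HoodieWriteConfig`, and the path is a placeholder:

```java
// Sketch only: newBuilder() and withPath(...) are assumed from the
// surrounding HoodieWriteConfig API; the rollback calls are from this diff.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/example_table")   // placeholder base path
    .withRollbackParallelism(200)            // sets hoodie.rollback.parallelism
    .build();

// Parses the property set above; if withRollbackParallelism(...) is omitted,
// build() seeds the key with DEFAULT_PARALLELISM before this getter runs.
int rollbackParallelism = config.getRollbackParallelism();
```

Note that `getRollbackParallelism()` parses the property without a fallback argument, so it depends on `build()` seeding `ROLLBACK_PARALLELISM` with `DEFAULT_PARALLELISM`; the declared `DEFAULT_ROLLBACK_PARALLELISM` of "100" is not referenced anywhere in the lines captured here.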
@@ -32,10 +32,8 @@
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCleanStat;
@@ -74,7 +72,6 @@
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -294,45 +291,6 @@ public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
}
}

/**
* Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
*/
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
PathFilter filter) throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}

/**
* Common method used for cleaning out parquet files under a partition path during rollback of a set of commits
*/
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit,
String partitionPath) throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
PathFilter filter = (path) -> {
if (path.toString().contains(".parquet")) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
}
return false;
};
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}

@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
throws IOException {
@@ -342,30 +300,38 @@ public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
// Atomically unpublish the commits
if (!inflights.contains(commit)) {
logger.info("Unpublishing " + commit);
activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
}
logger.info("Unpublished " + commit);

HoodieInstant instantToRollback = new HoodieInstant(false, actionType, commit);
Long startTime = System.currentTimeMillis();

// delete all the data files for this commit
logger.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instantToRollback);

//TODO: We need to persist this as rollback workload and use it in case of partial failures
List<HoodieRollbackStat> stats =
jsc.parallelize(FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning())).map((Function<String, HoodieRollbackStat>) partitionPath -> {
// Scan all partitions files with this commit time
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus).build();
}).collect();
new RollbackExecutor(metaClient, config).performRollback(jsc, instantToRollback, rollbackRequests);

// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline, new HoodieInstant(true, actionType, commit));
logger.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
}

private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
throws IOException {
return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> {
return RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback);
}).collect(Collectors.toList());
}

/**
* Delete Inflight instant if enabled
*
*
* @param deleteInstant Enable Deletion of Inflight instant
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
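`RollbackRequest` and `RollbackExecutor` are introduced elsewhere in this PR and are not part of this capture. As a rough sketch of the intended shape only (the method body and the `deleteMatchingFiles` helper below are illustrative assumptions, not the PR's actual code), the executor presumably fans the per-partition requests generated above out over Spark, bounded by the new `hoodie.rollback.parallelism` setting:

```java
// Illustrative only: distribute rollback requests across executors, capping
// the number of Spark partitions at the configured rollback parallelism.
public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc,
    HoodieInstant instantToRollback, List<RollbackRequest> rollbackRequests) {
  int parallelism = Math.max(1,
      Math.min(rollbackRequests.size(), config.getRollbackParallelism()));
  return jsc.parallelize(rollbackRequests, parallelism)
      // deleteMatchingFiles is a hypothetical helper standing in for the real
      // per-request delete of data and log files for the rolled-back instant.
      .map(request -> deleteMatchingFiles(request, instantToRollback))
      .collect();
}
```

Under this reading, the old serial pattern of listing and deleting files per partition inside a single `map` over partition paths is replaced by one request object per partition, so the delete work can be spread independently of the number of partitions, and the default wiring in `HoodieWriteConfig.build()` guarantees the parallelism key is always populated by the time the executor reads it.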