diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 2e04d01af90c8..867e2f99e2e33 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -639,8 +639,8 @@ public boolean rollback(final String commitInstantTime, Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElse(table.scheduleRollback(context, rollbackInstantTime, - commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); + Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) + .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); if (rollbackPlanOption.isPresent()) { // execute rollback HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 078d9ac27d389..189de373d92d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,11 +36,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -122,19 +120,11 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry))); return partitionToRollbackStats.stream(); } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) { - Map<String, Long> logFilesToBeDeleted = rollbackRequest.getLogBlocksToBeDeleted(); - String fileId = rollbackRequest.getFileId(); - String latestBaseInstant = rollbackRequest.getLatestBaseInstant(); - FileSystem fs = metaClient.getFs(); - // collect all log files that is supposed to be deleted with this rollback - // what happens if file was deleted when invoking fs.getFileStatus(?) below. - // I understand we don't delete log files. but just curious if we need to handle this case. 
- Map writtenLogFileSizeMap = new HashMap<>(); - for (Map.Entry entry : logFilesToBeDeleted.entrySet()) { - writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue()); - } HoodieLogFormat.Writer writer = null; try { + String fileId = rollbackRequest.getFileId(); + String latestBaseInstant = rollbackRequest.getLatestBaseInstant(); + writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) .withFileId(fileId) @@ -156,7 +146,7 @@ List> maybeDeleteAndCollectStats(HoodieEngineCo writer.close(); } } catch (IOException io) { - throw new HoodieIOException("Error appending rollback block..", io); + throw new HoodieIOException("Error appending rollback block", io); } } @@ -167,15 +157,21 @@ List> maybeDeleteAndCollectStats(HoodieEngineCo metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L ); - return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build())).stream(); + + return Collections.singletonList( + Pair.of(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder() + .withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback) + .build())) + .stream(); } else { - return Collections - .singletonList(Pair.of(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .build())).stream(); + return Collections.singletonList( + Pair.of(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder() + .withPartitionPath(rollbackRequest.getPartitionPath()) + .build())) + .stream(); } }, numPartitions); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index b47136fa02a58..628b2fc3720f8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -19,18 +19,17 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -38,7 +37,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import 
java.util.stream.Collectors; @@ -104,22 +102,20 @@ private List<HoodieRollbackRequest> getListingBasedRollbackRequests(HoodieEngine case APPEND_ROLLBACK_BLOCK: { String fileId = rollbackRequest.getFileId().get(); String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); - // collect all log files that is supposed to be deleted with this rollback - Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(), - FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), - fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - Map<String, Long> logFilesToBeDeleted = new HashMap<>(); - for (Map.Entry<FileStatus, Long> fileToBeDeleted : writtenLogFileSizeMap.entrySet()) { - logFilesToBeDeleted.put(fileToBeDeleted.getKey().getPath().toString(), fileToBeDeleted.getValue()); - } + HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get(); + + Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath()); + + Map<String, Long> logFilesWithBlocksToRollback = + Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes()); + return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant, - Collections.EMPTY_LIST, logFilesToBeDeleted); + Collections.EMPTY_LIST, logFilesWithBlocksToRollback); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); } - }, numPartitions).stream().collect(Collectors.toList()); + }, numPartitions); } private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java index bc2bbf20aea90..7411231bb7d79 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.Option; import java.io.Serializable; @@ -51,32 +52,42 @@ public enum Type { */ private final Option<String> latestBaseInstant; + /** + * Write stat (carried over from the commit being rolled back) of the log-file targeted by an APPEND_ROLLBACK_BLOCK request + */ + private final Option<HoodieWriteStat> writeStat; + private final Type type; + public ListingBasedRollbackRequest(String partitionPath, Type type) { + this(partitionPath, Option.empty(), Option.empty(), Option.empty(), type); + } + public ListingBasedRollbackRequest(String partitionPath, Option<String> fileId, Option<String> latestBaseInstant, + Option<HoodieWriteStat> writeStat, Type type) { this.partitionPath = partitionPath; this.fileId = fileId; this.latestBaseInstant = latestBaseInstant; + this.writeStat = writeStat; this.type = type; } public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath) { - return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(), - Type.DELETE_DATA_FILES_ONLY); + return new ListingBasedRollbackRequest(partitionPath, Type.DELETE_DATA_FILES_ONLY); } public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath) { - return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(), - 
Type.DELETE_DATA_AND_LOG_FILES); + return new ListingBasedRollbackRequest(partitionPath, Type.DELETE_DATA_AND_LOG_FILES); } - public static ListingBasedRollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId, - String baseInstant) { - return new ListingBasedRollbackRequest(partitionPath, Option.of(fileId), Option.of(baseInstant), - Type.APPEND_ROLLBACK_BLOCK); + public static ListingBasedRollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, + String fileId, + String baseInstant, + HoodieWriteStat writeStat) { + return new ListingBasedRollbackRequest(partitionPath, Option.of(fileId), Option.of(baseInstant), Option.of(writeStat), Type.APPEND_ROLLBACK_BLOCK); } public String getPartitionPath() { @@ -91,6 +102,10 @@ public Option getLatestBaseInstant() { return latestBaseInstant; } + public Option getWriteStat() { + return writeStat; + } + public Type getType() { return type; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 9d04e3036f204..e7a4170ec7871 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -31,18 +32,13 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; @@ -90,42 +86,41 @@ public List getRollbackRequests(HoodieInstant instantToRo Collections.singletonList(fullDeletePath.toString()), Collections.emptyMap()); case APPEND: + // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony + // path serving as a "container" for the following components: + // - Base file's file-id + // - Base file's commit instant + // - Partition path return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath)); default: throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); } - }, parallelism).stream().collect(Collectors.toList()); + }, parallelism); } catch (Exception e) { throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); } } - protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException { - Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath); + protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException { + Path baseFilePathForAppend = new Path(basePath, markerFilePath); String fileId = 
FSUtils.getFileIdFromFilePath(baseFilePathForAppend); String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName()); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent()); - Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId); - Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>(); - for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) { - writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue()); - } - return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap); + String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent()); + Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath); + + // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended its + // block to the latest log-file + // TODO(HUDI-1517) use provided marker-file's path instead + HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId, + HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get(); + + // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended, + // therefore we simply stub this value. + Map<String, Long> logFilesWithBlocksToRollback = + Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L); + + return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), + logFilesWithBlocksToRollback); } - /** - * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing. 
- * - * @param partitionPathStr partition path of interest - * @param baseCommitTime base commit time of interest - * @param fileId fileId of interest - * @return Map - * @throws IOException - */ - private Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { - // collect all log files that is supposed to be deleted with this rollback - return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), - FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index a4b59a88b92c4..6c17ee2369ff4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,12 +34,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.FileStatus; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -48,8 +45,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + public class RollbackUtils { private static final Logger LOG = LogManager.getLogger(RollbackUtils.class); @@ -88,7 +88,7 @@ static Map generateHeader(String inst * @return Merged HoodieRollbackStat */ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) { - ValidationUtils.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath())); + checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath())); final List successDeleteFiles = new ArrayList<>(); final List failedDeleteFiles = new ArrayList<>(); final Map commandBlocksCount = new HashMap<>(); @@ -99,9 +99,7 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); - Option.ofNullable(stat1.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); - Option.ofNullable(stat2.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); - return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); + return new 
HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } /** @@ -191,28 +189,22 @@ public static List generateRollbackRequestsUsingFil // (B.3) Rollback triggered for first commit - Same as (B.1) // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files // as well if the base base file gets deleted. - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline() - .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), - HoodieCommitMetadata.class); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(), + HoodieCommitMetadata.class); - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or base files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or base files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); - } - break; - } catch (IOException io) { - throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); } + break; default: break; } @@ -222,7 +214,7 @@ public static List generateRollbackRequestsUsingFil private static List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, HoodieCommitMetadata commitMetadata, HoodieTable table) { - ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); // wStat.getPrevCommit() might not give the right commit time in the following // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be @@ -230,30 +222,40 @@ private static List generateAppendRollbackBlocksAct // But the index (global) might store the baseCommit of the base and not the requested, hence get the // baseCommit always by listing the file slice // With multi writers, rollbacks could be lazy. 
and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices() - Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), - true).collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); + Map<String, FileSlice> latestFileSlices = table.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true) + .collect(Collectors.toMap(FileSlice::getFileId, Function.identity())); + + return commitMetadata.getPartitionToWriteStats().get(partitionPath) + .stream() + .filter(writeStat -> { + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) + && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId()); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { + if (!validForRollback) { + return false; + } - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - ValidationUtils - .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); - } + // For sanity, the log-file's base-instant can never be greater than the instant being rolled back + checkArgument( + HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), + HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()), + "Log-file base-instant can not be greater than the instant being rolled back"); - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime); - }).collect(Collectors.toList()); + // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK} + // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less + // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up + // in a different branch of the flow. 
+ return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); + }) + .map(writeStat -> { + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); + return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, + writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat); + }) + .collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index c0952bc5a7204..415c12a6407c6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -188,7 +188,6 @@ private void createRollbackMetadata(String instantTime) throws Exception { rollbackPartitionMetadata.setPartitionPath("p1"); rollbackPartitionMetadata.setSuccessDeleteFiles(Arrays.asList("f1")); rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); - rollbackPartitionMetadata.setWrittenLogFiles(new HashMap<>()); rollbackPartitionMetadata.setRollbackLogFiles(new HashMap<>()); Map partitionMetadataMap = new HashMap<>(); partitionMetadataMap.put("p1", rollbackPartitionMetadata); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index f541720fd35fb..6282d7bebf533 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -425,7 +425,8 @@ private void updateTableMetadata(HoodieTable>, JavaRD } @Override - protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { + protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, + String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); @@ -442,8 +443,11 @@ protected HoodieTable>, JavaRDD, JavaRDD assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); - assertEquals(1, stat.getWrittenLogFileSizeMap().size()); - stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); } } } diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc index f342db8738d33..5a300cda9e638 100644 --- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc @@ -38,14 +38,6 @@ "type": "long", "doc": "Size of this file in bytes" } - }], "default":null }, - {"name": "writtenLogFiles", "type": ["null", { - "type": "map", - "doc": "Log files written that were expected to be rolledback", - "values": { - "type": "long", - "doc": "Size of this file in bytes" - } }], "default":null } ] }}}, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java index 3e4ee34319c7c..a3191fa026c84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java @@ -38,16 +38,13 @@ public class HoodieRollbackStat implements Serializable { private final List failedDeleteFiles; // Count of HoodieLogFile to commandBlocks written for a particular rollback private final Map commandBlocksCount; - // all log files with same base instant as instant to be rolledback - private final Map writtenLogFileSizeMap; public HoodieRollbackStat(String partitionPath, List successDeleteFiles, List failedDeleteFiles, - Map commandBlocksCount, Map writtenLogFileSizeMap) { + Map commandBlocksCount) { this.partitionPath = partitionPath; this.successDeleteFiles = successDeleteFiles; this.failedDeleteFiles = failedDeleteFiles; this.commandBlocksCount = commandBlocksCount; - this.writtenLogFileSizeMap = writtenLogFileSizeMap; } public Map getCommandBlocksCount() { @@ -66,10 +63,6 @@ public List getFailedDeleteFiles() { return failedDeleteFiles; } - public Map getWrittenLogFileSizeMap() { - return writtenLogFileSizeMap; - } - public static HoodieRollbackStat.Builder newBuilder() { return new Builder(); } @@ -82,7 +75,6 @@ public static class Builder { private List successDeleteFiles; private List failedDeleteFiles; private Map commandBlocksCount; - private Map writtenLogFileSizeMap; private String partitionPath; public Builder withDeletedFileResults(Map deletedFiles) { @@ -108,11 +100,6 @@ public Builder withRollbackBlockAppendResults(Map commandBlock return this; } - public Builder withWrittenLogFileSizeMap(Map writtenLogFileSizeMap) { - this.writtenLogFileSizeMap = writtenLogFileSizeMap; - return this; - } - public Builder withPartitionPath(String partitionPath) { this.partitionPath = partitionPath; return this; @@ -128,10 +115,7 @@ public HoodieRollbackStat build() { if (commandBlocksCount == null) { commandBlocksCount = Collections.EMPTY_MAP; } - if (writtenLogFileSizeMap == null) { - writtenLogFileSizeMap = Collections.EMPTY_MAP; - } - return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); + return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d7af8a7d46d8b..670a4d88ef50d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -484,24 +484,25 @@ public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partit } /** - * Get the latest log file written from the list of log files passed in. + * Get the latest log file for the passed in file-id in the partition path */ - public static Option getLatestLogFile(Stream logFiles) { - return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); + public static Option getLatestLogFile(FileSystem fs, Path partitionPath, String fileId, + String logFileExtension, String baseCommitTime) throws IOException { + return getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); } /** - * Get all the log files for the passed in FileId in the partition path. 
+ * Get all the log files for the passed in file-id in the partition path. */ public static Stream getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { try { - return Arrays - .stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension))) - .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + PathFilter pathFilter = path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension); + return Arrays.stream(fs.listStatus(partitionPath, pathFilter)) + .map(HoodieLogFile::new) + .filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); } catch (FileNotFoundException e) { - return Stream.builder().build(); + return Stream.of(); } } @@ -776,4 +777,8 @@ public static List getFileStatusAtLevel( public interface SerializableFunction extends Function, Serializable { } + + private static Option getLatestLogFile(Stream logFiles) { + return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index c1e8cbf08b11c..d693d91f676fc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.PropertyAccessor; @@ -29,6 +25,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 32e42ee58ac27..723c594ff1625 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -77,10 +77,8 @@ public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbac for (HoodieRollbackStat stat : rollbackStats) { Map rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream() .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); - Map probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream() - .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(), - stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles); + stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles); partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); totalDeleted += 
stat.getSuccessDeleteFiles().size(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 58d63a194e81d..bb29e4236da0b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -194,7 +194,8 @@ public static List convertMetadataToRecords(HoodieActiveTimeline m * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. */ - private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata, + private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, + HoodieRollbackMetadata rollbackMetadata, Map> partitionToDeletedFiles, Map> partitionToAppendedFiles, Option lastSyncTs) { @@ -264,17 +265,6 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); }); } - - if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) { - if (!partitionToAppendedFiles.containsKey(partition)) { - partitionToAppendedFiles.put(partition, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getWrittenLogFiles() - pm.getWrittenLogFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); - }); - } }); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 9397295013ea1..22ceb5bfef373 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -270,8 +270,7 @@ private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, Stri List rollbacks = new ArrayList<>(); rollbacks.add(new HoodieInstant(false, actionType, commitTs)); - HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap(), - Collections.EMPTY_MAP); + HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap()); List rollbackStats = new ArrayList<>(); rollbackStats.add(rollbackStat); return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 0bcebaf71e9ff..a9c9db303f328 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -556,7 +556,7 @@ private void performRestore(HoodieInstant instant, List files, String ro boolean isRestore) throws IOException { Map> partititonToFiles = deleteFiles(files); List rollbackStats = partititonToFiles.entrySet().stream().map(e -> - new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>(), new HashMap<>()) + 
new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>()) ).collect(Collectors.toList()); List rollbacks = new ArrayList<>(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 7b8148a612a8b..c55a389e268ca 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -354,7 +354,6 @@ public HoodieRollbackMetadata getRollbackMetadata(String instantTimeToDelete, Ma rollbackPartitionMetadata.setPartitionPath(entry.getKey()); rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue()); rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); - rollbackPartitionMetadata.setWrittenLogFiles(getWrittenLogFiles(instantTimeToDelete, entry)); long rollbackLogFileSize = 50 + RANDOM.nextInt(500); String fileId = UUID.randomUUID().toString(); String logFileName = logFileName(instantTimeToDelete, fileId, 0);
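Note on the .orElse(...) -> .orElseGet(...) change in BaseHoodieWriteClient above: orElse evaluates its argument eagerly, so the original code scheduled a brand-new rollback even when a pending rollback plan already existed, whereas orElseGet defers to a supplier that only runs when the option is empty. A minimal sketch of the difference, using java.util.Optional (Hudi's own Option is relied on analogously in the diff; the names below are hypothetical stand-ins, not Hudi APIs):

import java.util.Optional;

public class OrElseVsOrElseGet {

  // Stands in for table.scheduleRollback(...): expensive and side-effecting
  static String scheduleNewPlan() {
    System.out.println("scheduling a new rollback plan");
    return "new-plan";
  }

  public static void main(String[] args) {
    Optional<String> pendingPlan = Optional.of("pending-plan");

    // orElse: scheduleNewPlan() runs even though a pending plan is present
    String eager = pendingPlan.orElse(scheduleNewPlan());

    // orElseGet: the supplier is invoked only if the Optional is empty
    String lazy = pendingPlan.orElseGet(OrElseVsOrElseGet::scheduleNewPlan);

    System.out.println(eager + " / " + lazy); // pending-plan / pending-plan
  }
}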
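The rewritten filter in RollbackUtils.generateAppendRollbackBlocksAction turns on the distinction between LESSER_THAN_OR_EQUALS (the sanity check) and LESSER_THAN (eligibility for appending a rollback command block). Hudi instant timestamps are plain strings that compare lexicographically; the sketch below uses hypothetical helper names (not the HoodieTimeline API) to show why equality is excluded:

public class RollbackBlockEligibility {

  // HoodieTimeline.compareTimestamps with LESSER_THAN / LESSER_THAN_OR_EQUALS
  // reduces to lexicographic comparison of the instant strings.
  static boolean lesserThan(String a, String b) {
    return a.compareTo(b) < 0;
  }

  static boolean lesserThanOrEquals(String a, String b) {
    return a.compareTo(b) <= 0;
  }

  public static void main(String[] args) {
    String baseInstant = "20211201120000";     // base-instant of the latest file-slice
    String rollbackInstant = "20211201130000"; // delta commit being rolled back

    // Sanity: the slice's base-instant can never come after the instant being rolled back
    if (!lesserThanOrEquals(baseInstant, rollbackInstant)) {
      throw new IllegalArgumentException("base-instant after rollback instant");
    }

    // A rollback command block is appended only for a STRICTLY earlier base-instant.
    // When the two are equal (inserts that went straight to log-files), the whole
    // file-slice is deleted by the delete-data-and-log-files branch instead.
    boolean appendRollbackBlock = lesserThan(baseInstant, rollbackInstant);
    System.out.println("append command block: " + appendRollbackBlock); // true
  }
}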
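The PathFilter extracted in FSUtils.getAllLogFiles leans on Hudi's log-file naming scheme, roughly ".<fileId>_<baseCommitTime>.log.<version>_<writeToken>". A small sketch (hypothetical file names, assuming that naming convention) of how the prefix and extension checks pre-filter a partition listing before base-commit times are parsed:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LogFileFilter {
  public static void main(String[] args) {
    String fileId = "f1-0c2c";
    String logExt = ".log";
    List<String> listing = Arrays.asList(
        ".f1-0c2c_20211201120000.log.1_0-1-0",   // matches: same file-id, log extension
        ".a9-77b1_20211201120000.log.1_0-1-0",   // different file-id: filtered out
        "f1-0c2c_3-55-12_20211201120000.parquet" // base file, not a log-file: filtered out
    );

    // Mirrors: path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)
    List<String> logFiles = listing.stream()
        .filter(name -> name.startsWith("." + fileId) && name.contains(logExt))
        .collect(Collectors.toList());

    System.out.println(logFiles); // [.f1-0c2c_20211201120000.log.1_0-1-0]
  }
}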