From 05bebb53acae4cf447cb09c360d85bcde4cb4e4c Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sun, 12 Sep 2021 20:18:03 -0700 Subject: [PATCH 1/4] [HUDI-2433] Refactor rollback actions in hudi-client module --- ...=> CopyOnWriteRollbackActionExecutor.java} | 48 ++-- .../rollback/ListingBasedRollbackHelper.java | 181 +++++++------ ....java => MarkerBasedRollbackStrategy.java} | 68 ++++- ...=> MergeOnReadRollbackActionExecutor.java} | 68 +++-- .../common/HoodieFlinkEngineContext.java | 10 + .../table/HoodieFlinkCopyOnWriteTable.java | 4 +- .../table/HoodieFlinkMergeOnReadTable.java | 4 +- ...linkCopyOnWriteRollbackActionExecutor.java | 71 ----- .../FlinkMarkerBasedRollbackStrategy.java | 90 ------- ...linkMergeOnReadRollbackActionExecutor.java | 77 ------ .../upgrade/ZeroToOneUpgradeHandler.java | 2 +- .../common/HoodieJavaEngineContext.java | 10 + .../table/HoodieJavaCopyOnWriteTable.java | 4 +- .../JavaCopyOnWriteRestoreActionExecutor.java | 4 +- ...JavaCopyOnWriteRollbackActionExecutor.java | 72 ----- .../JavaListingBasedRollbackHelper.java | 237 ---------------- .../JavaMarkerBasedRollbackStrategy.java | 78 ------ .../common/HoodieSparkEngineContext.java | 9 + .../table/HoodieSparkCopyOnWriteTable.java | 4 +- .../table/HoodieSparkMergeOnReadTable.java | 5 +- ...SparkCopyOnWriteRestoreActionExecutor.java | 4 +- ...SparkMergeOnReadRestoreActionExecutor.java | 4 +- .../rollback/ListingBasedRollbackHelper.java | 252 ------------------ ...parkCopyOnWriteRollbackActionExecutor.java | 73 ----- .../SparkMarkerBasedRollbackStrategy.java | 93 ------- ...parkMergeOnReadRollbackActionExecutor.java | 82 ------ ...TestCopyOnWriteRollbackActionExecutor.java | 10 +- ...TestMergeOnReadRollbackActionExecutor.java | 24 +- .../TestMarkerBasedRollbackStrategy.java | 6 +- .../common/engine/HoodieEngineContext.java | 4 + .../engine/HoodieLocalEngineContext.java | 11 + .../hudi/common/function/FunctionWrapper.java | 11 + .../function/SerializableBiFunction.java | 34 +++ 33 files changed, 351 insertions(+), 1303 deletions(-) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/{BaseCopyOnWriteRollbackActionExecutor.java => CopyOnWriteRollbackActionExecutor.java} (60%) rename hudi-client/{hudi-flink-client => hudi-client-common}/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java (52%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/{AbstractMarkerBasedRollbackStrategy.java => MarkerBasedRollbackStrategy.java} (63%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/{BaseMergeOnReadRollbackActionExecutor.java => MergeOnReadRollbackActionExecutor.java} (54%) delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java delete mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java delete mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java delete mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java delete mode 100644 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java similarity index 60% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java index fa74f7f6e86bc..44b5492e742e5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java @@ -33,30 +33,39 @@ import java.util.ArrayList; import java.util.List; -public abstract class BaseCopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor { +public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor { - private static final Logger LOG = LogManager.getLogger(BaseCopyOnWriteRollbackActionExecutor.class); + private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class); - public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { + public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { super(context, config, table, instantTime, commitInstant, deleteInstants); } - public BaseCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); } + @Override + protected RollbackStrategy getRollbackStrategy() { + if (useMarkerBasedStrategy) { + return new MarkerBasedRollbackStrategy(table, context, config, instantTime); + } else { + return this::executeRollbackUsingFileListing; + } + } + @Override protected List executeRollback() { HoodieTimer 
rollbackTimer = new HoodieTimer();
@@ -88,4 +97,11 @@ protected List<HoodieRollbackStat> executeRollback() {
     LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
     return stats;
   }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+    List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
+        context, table.getMetaClient().getBasePath(), config);
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
similarity index 52%
rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index f03b211bf9de8..9bfd3420de222 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.rollback;
 
@@ -42,7 +43,6 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +54,6 @@
  * Performs Rollback of Hoodie Tables.
  */
 public class ListingBasedRollbackHelper implements Serializable {
-
   private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
 
   private final HoodieTableMetaClient metaClient;
@@ -68,107 +67,107 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
 
   /**
    * Performs all rollback actions that we have collected in parallel.
    */
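The getRollbackStrategy() overrides added above collapse the old per-engine subclasses into one pattern: RollbackStrategy exposes a single execute method, so the listing-based fallback can be supplied as a method reference on the executor itself instead of through a dedicated strategy class. A minimal, self-contained sketch of that selection pattern; the types here are simplified stand-ins, not the actual Hudi classes:

import java.util.Collections;
import java.util.List;

class RollbackStrategySelectionSketch {

  // Stand-in for BaseRollbackActionExecutor.RollbackStrategy: one abstract
  // method, so lambdas and method references satisfy it.
  @FunctionalInterface
  interface RollbackStrategy {
    List<String> execute(String instantToRollback);
  }

  private final boolean useMarkerBasedStrategy;

  RollbackStrategySelectionSketch(boolean useMarkerBasedStrategy) {
    this.useMarkerBasedStrategy = useMarkerBasedStrategy;
  }

  // Mirrors the getRollbackStrategy() override in the patch: pick the
  // marker-based strategy, or fall back to file listing via a method
  // reference.
  RollbackStrategy getRollbackStrategy() {
    return useMarkerBasedStrategy
        ? instant -> Collections.singletonList("marker-based rollback of " + instant)
        : this::executeRollbackUsingFileListing;
  }

  private List<String> executeRollbackUsingFileListing(String instant) {
    return Collections.singletonList("listing-based rollback of " + instant);
  }
}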
-  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
-
-    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = partitionPathRollbackStatsPairs.entrySet()
-        .stream()
-        .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
-    return collect.values().stream()
-        .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                  List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
   }
 
   /**
    * Collect all file info that needs to be rolled back.
    */
-  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
-    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                       List<ListingBasedRollbackRequest> rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    return context.mapToPairAndReduceByKey(rollbackRequests,
+        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
+        RollbackUtils::mergeRollbackStat,
+        parallelism);
   }
 
   /**
    * Maybe delete the files of interest and collect stats, or only collect stats.
    *
-   * @param context           instance of {@link HoodieEngineContext} to use.
    * @param instantToRollback {@link HoodieInstant} of interest for which deletion or stats collection is requested.
-   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
-   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
+   * @param doDelete          {@code true} if deletion has to be done.
+   *                          {@code false} if only stats are to be collected w/o performing any deletes.
    * @return stats collected with or w/o actual deletions.
    */
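The reduce step in performRollback above leans on RollbackUtils::mergeRollbackStat to combine the per-request stats that land under the same partition path. The real HoodieRollbackStat layout is not shown in this excerpt, so the fields below are illustrative stand-ins, but any merge function handed to that reduce must have roughly this shape:

import java.util.HashMap;
import java.util.Map;

class RollbackStatMergeSketch {
  final String partitionPath;
  // file path -> whether the delete succeeded (stand-in for the real
  // HoodieRollbackStat contents)
  final Map<String, Boolean> deletedFileResults;

  RollbackStatMergeSketch(String partitionPath, Map<String, Boolean> deletedFileResults) {
    this.partitionPath = partitionPath;
    this.deletedFileResults = deletedFileResults;
  }

  // Shape of the reduce function passed to mapToPairAndReduceByKey: both
  // inputs are keyed by the same partition path, so merging is a union of
  // their per-file results.
  static RollbackStatMergeSketch merge(RollbackStatMergeSketch a, RollbackStatMergeSketch b) {
    if (!a.partitionPath.equals(b.partitionPath)) {
      throw new IllegalArgumentException("can only merge stats for the same partition");
    }
    Map<String, Boolean> merged = new HashMap<>(a.deletedFileResults);
    merged.putAll(b.deletedFileResults);
    return new RollbackStatMergeSketch(a.partitionPath, merged);
  }
}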
-  Map<String, HoodieRollbackStat> maybeDeleteAndCollectStats(HoodieEngineContext context,
-                                                             HoodieInstant instantToRollback,
-                                                             List<ListingBasedRollbackRequest> rollbackRequests,
-                                                             boolean doDelete) {
-    return context.mapToPair(rollbackRequests, rollbackRequest -> {
-      switch (rollbackRequest.getType()) {
-        case DELETE_DATA_FILES_ONLY: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-              rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case DELETE_DATA_AND_LOG_FILES: {
-          final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                  .withDeletedFileResults(filesToDeletedStatus).build());
-        }
-        case APPEND_ROLLBACK_BLOCK: {
-          String fileId = rollbackRequest.getFileId().get();
-          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
-          // collect all log files that is supposed to be deleted with this rollback
-          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
-              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
-              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
-              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
-          HoodieLogFormat.Writer writer = null;
+  private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
+                                                                      HoodieInstant instantToRollback,
+                                                                      boolean doDelete) throws IOException {
+    switch (rollbackRequest.getType()) {
+      case DELETE_DATA_FILES_ONLY: {
+        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
+            rollbackRequest.getPartitionPath(), doDelete);
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withDeletedFileResults(filesToDeletedStatus).build());
+      }
+      case DELETE_DATA_AND_LOG_FILES: {
+        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
+        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withDeletedFileResults(filesToDeletedStatus).build());
+      }
+      case APPEND_ROLLBACK_BLOCK: {
+        String fileId = rollbackRequest.getFileId().get();
+        String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+
+        // collect all log files that are supposed to be deleted with this rollback
+        Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+            FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+            fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+            .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+
+        HoodieLogFormat.Writer writer = null;
+        try {
+          writer = HoodieLogFormat.newWriterBuilder()
+              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+              .withFileId(fileId)
+              .overBaseCommit(latestBaseInstant)
+
.withFs(metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + + // generate metadata + if (doDelete) { + Map header = generateHeader(instantToRollback.getTimestamp()); + // if update belongs to an existing log file + writer.appendBlock(new HoodieCommandBlock(header)); + } + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); + } finally { try { - writer = HoodieLogFormat.newWriterBuilder() - .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(fileId) - .overBaseCommit(latestBaseInstant) - .withFs(metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - - // generate metadata - if (doDelete) { - Map header = generateHeader(instantToRollback.getTimestamp()); - // if update belongs to an existing log file - writer.appendBlock(new HoodieCommandBlock(header)); - } - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); - } finally { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException io) { - throw new HoodieIOException("Error appending rollback block..", io); + if (writer != null) { + writer.close(); } + } catch (IOException io) { + throw new HoodieIOException("Error appending rollback block..", io); } - - // This step is intentionally done after writer is closed. Guarantees that - // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in - // cloud-storage : HUDI-168 - Map filesToNumBlocksRollback = Collections.singletonMap( - metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), - 1L - ); - return new ImmutablePair<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build()); } - default: - throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); + + // This step is intentionally done after writer is closed. 
Guarantees that + // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in + // cloud-storage : HUDI-168 + Map filesToNumBlocksRollback = Collections.singletonMap( + metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), + 1L + ); + + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback) + .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build()); } - }, 0); + default: + throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); + } } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java similarity index 63% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index cc596ba3422b7..1bfd4b1659f56 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/AbstractMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -20,34 +20,43 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; + import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; +import org.apache.hudi.table.marker.WriteMarkers; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Performs rollback using marker files generated during the write.. 
*/ -public abstract class AbstractMarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy { +public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy { - private static final Logger LOG = LogManager.getLogger(AbstractMarkerBasedRollbackStrategy.class); + private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class); - protected final HoodieTable table; + protected final HoodieTable table; protected final transient HoodieEngineContext context; @@ -57,7 +66,7 @@ public abstract class AbstractMarkerBasedRollbackStrategy table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { + public MarkerBasedRollbackStrategy(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { this.table = table; this.context = context; this.basePath = table.getMetaClient().getBasePath(); @@ -124,8 +133,8 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant // the information of files appended to is required for metadata sync Map filesToNumBlocksRollback = Collections.singletonMap( - table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), - 1L); + table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), + 1L); return HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath) @@ -135,13 +144,48 @@ protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant /** * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing. - * @param partitionPath partition path of interest - * @param baseCommitTime base commit time of interest - * @param fileId fileId of interest + * + * @param partitionPathStr partition path of interest + * @param baseCommitTime base commit time of interest + * @param fileId fileId of interest * @return Map * @throws IOException */ - protected Map getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException { - return Collections.EMPTY_MAP; + protected Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { + // collect all log files that is supposed to be deleted with this rollback + return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), + FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) + .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); + } + + @Override + public List execute(HoodieInstant instantToRollback) { + try { + List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( + table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); + int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1); + context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files"); + return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> { + String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); + IOType type = IOType.valueOf(typeStr); + HoodieRollbackStat rollbackStat; + switch (type) { + case MERGE: + rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath)); + break; + case APPEND: + rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback); + break; + 
case CREATE: + rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath)); + break; + default: + throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); + } + return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat); + }, RollbackUtils::mergeRollbackStat, parallelism); + } catch (Exception e) { + throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java similarity index 54% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index 2e751443abc00..87d26281ddf15 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.hudi.table.action.rollback; @@ -24,37 +25,49 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.List; -public abstract class BaseMergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor { +public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor { - private static final Logger LOG = LogManager.getLogger(BaseMergeOnReadRollbackActionExecutor.class); + private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class); - public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { + public MergeOnReadRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { super(context, config, table, instantTime, commitInstant, deleteInstants); } - public BaseMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + public MergeOnReadRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); } + @Override + protected RollbackStrategy getRollbackStrategy() { + if (useMarkerBasedStrategy) { + return new MarkerBasedRollbackStrategy(table, context, config, instantTime); + } else { + return this::executeRollbackUsingFileListing; + } + } + @Override protected List executeRollback() { HoodieTimer rollbackTimer = new HoodieTimer(); @@ -93,4 +106,15 @@ protected List executeRollback() { LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer()); return allRollbackStats; } + + @Override + protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { + List rollbackRequests; + try { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context); + } catch (IOException e) { + throw new HoodieIOException("Error generating rollback requests by file listing.", e); + } + return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 2fc5af19bf7c0..11e674fb687d3 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -23,6 +23,7 @@ import 
org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -43,6 +44,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper; /** * A flink engine implementation of HoodieEngineContext. @@ -74,6 +76,14 @@ public List map(List data, SerializableFunction func, int par return data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList()); } + @Override + public List mapToPairAndReduceByKey(List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { + return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc)) + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get()) + .collect(Collectors.toList()); + } + @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 8a9b4bf9b8206..27571bcdbcb2c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -53,7 +53,7 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper; import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor; -import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -305,7 +305,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan @Override public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index bfe8b6f4999cd..46142709853f1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -37,7 +37,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor; -import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; import java.util.List; import java.util.Map; @@ -108,7 +108,7 @@ public HoodieWriteMetadata> compact(HoodieEngineContext contex @Override public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java deleted file mode 100644 index 47039a3adf20a..0000000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import java.util.List; - -@SuppressWarnings("checkstyle:LineLength") -public class FlinkCopyOnWriteRollbackActionExecutor extends - BaseCopyOnWriteRollbackActionExecutor>, List, List> { - public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); - } - - public FlinkCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { - super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); - } - - @Override - protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() { - if (useMarkerBasedStrategy) { - return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime); - } else { - return this::executeRollbackUsingFileListing; - } - } - - @Override - protected List executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW( - context, table.getMetaClient().getBasePath(), config); - return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java deleted file mode 100644 index bb7ec7600a21b..0000000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; -import org.apache.hudi.table.marker.WriteMarkers; - -import org.apache.hadoop.fs.FileStatus; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import scala.Tuple2; - -@SuppressWarnings("checkstyle:LineLength") -public class FlinkMarkerBasedRollbackStrategy extends AbstractMarkerBasedRollbackStrategy>, List, List> { - public FlinkMarkerBasedRollbackStrategy(HoodieTable>, List, List> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { - super(table, context, config, instantTime); - } - - @Override - public List execute(HoodieInstant instantToRollback) { - try { - List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( - table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); - List rollbackStats = context.map(markerPaths, markerFilePath -> { - String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); - IOType type = IOType.valueOf(typeStr); - switch (type) { - case MERGE: - return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath)); - case APPEND: - return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback); - case CREATE: - return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath)); - default: - throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); - } - }, 0); - - return rollbackStats.stream().map(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat)) - .collect(Collectors.groupingBy(Tuple2::_1)) - .values() - .stream() - .map(x -> x.stream().map(y -> y._2).reduce(RollbackUtils::mergeRollbackStat).get()) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); - } - } - - protected Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { - // collect all log files that is supposed to be deleted with this rollback - return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), - FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java deleted file mode 100644 index 25b20a5073ffd..0000000000000 
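The FlinkMarkerBasedRollbackStrategy just deleted (like the Java and Spark copies removed later in this patch) duplicated the one piece of logic that now lives solely in the common MarkerBasedRollbackStrategy: the marker file's suffix after the last '.' names an IOType, which selects the undo action. A simplified, self-contained sketch of that dispatch; the <data-file>.marker.<IOTYPE> naming is inferred from the substring calls in the patch, and the helpers stand in for WriteMarkers.stripMarkerSuffix and the real undo* methods:

class MarkerDispatchSketch {
  enum IOType { CREATE, MERGE, APPEND }

  static String undo(String markerFilePath) {
    // Same parsing as in MarkerBasedRollbackStrategy.execute(): the text
    // after the last '.' is the IOType written at marker-creation time.
    String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
    IOType type = IOType.valueOf(typeStr);
    // Stand-in for WriteMarkers.stripMarkerSuffix: drop ".marker.<IOTYPE>".
    String dataFilePath = markerFilePath.substring(0, markerFilePath.lastIndexOf(".marker"));
    switch (type) {
      case MERGE:
        return "undoMerge(" + dataFilePath + ")";
      case APPEND:
        return "undoAppend(" + dataFilePath + ")";
      case CREATE:
        return "undoCreate(" + dataFilePath + ")";
      default:
        throw new IllegalStateException("Unknown marker type: " + typeStr);
    }
  }

  public static void main(String[] args) {
    System.out.println(undo("part-0001.parquet.marker.CREATE")); // undoCreate(part-0001.parquet)
  }
}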
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.HoodieTable; - -import java.io.IOException; -import java.util.List; - -@SuppressWarnings("checkstyle:LineLength") -public class FlinkMergeOnReadRollbackActionExecutor extends - BaseMergeOnReadRollbackActionExecutor>, List, List> { - public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); - } - - public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { - super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); - } - - @Override - protected RollbackStrategy getRollbackStrategy() { - if (useMarkerBasedStrategy) { - return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime); - } else { - return this::executeRollbackUsingFileListing; - } - } - - @Override - protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { - List rollbackRequests; - try { - rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context); - } catch (IOException e) { - throw new HoodieIOException("Error generating rollback requests by file listing.", e); - } - return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 59e94e557e063..cb024c603a165 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -27,8 +27,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import java.util.List; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index 013e094036b6a..f7a28e283ab8d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -40,6 +41,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper; /** * A java engine implementation of HoodieEngineContext. 
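The hunk below adds mapToPairAndReduceByKey to the Java engine context with the same parallel-stream pipeline the Flink context received earlier in the patch. Extracted into a self-contained form (Map.Entry standing in for Hudi's Pair, plain JDK functional interfaces for the Serializable* ones), the pipeline is:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

class MapToPairAndReduceByKeySketch {
  static <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data,
                                                   Function<I, Map.Entry<K, V>> mapToPairFunc,
                                                   BinaryOperator<V> reduceFunc,
                                                   int parallelism) {
    // parallelism is accepted for signature parity with the engine-context
    // API; a local stream pipeline sizes itself, unlike Spark's RDD version.
    return data.stream().parallel().map(mapToPairFunc)
        .collect(Collectors.groupingBy(Map.Entry::getKey)).values().stream()
        .map(list -> list.stream().map(Map.Entry::getValue).reduce(reduceFunc).get())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Two entries under key "p1" reduce into one value; "p2" stays alone.
    List<Integer> reduced = mapToPairAndReduceByKey(
        Arrays.asList("p1:1", "p1:2", "p2:5"),
        s -> new SimpleImmutableEntry<>(s.split(":")[0], Integer.parseInt(s.split(":")[1])),
        Integer::sum,
        2);
    System.out.println(reduced); // e.g. [3, 5] (order not guaranteed)
  }
}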
@@ -59,6 +61,14 @@ public List map(List data, SerializableFunction func, int par return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList()); } + @Override + public List mapToPairAndReduceByKey(List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { + return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc)) + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get()) + .collect(Collectors.toList()); + } + @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList()); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 06e66a1a02f91..7715bf965daa5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor; -import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import java.util.List; @@ -193,7 +193,7 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new JavaCopyOnWriteRollbackActionExecutor( + return new CopyOnWriteRollbackActionExecutor( context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java index 75c1e0e30f255..f7677ae48a7e5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java @@ -30,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import java.util.List; @@ -48,7 +48,7 @@ public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context, @Override protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { table.getMetaClient().reloadActiveTimeline(); - JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor( + CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( context, config, table, diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java deleted file mode 100644 index 15e393220f083..0000000000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import java.util.List; - -@SuppressWarnings("checkstyle:LineLength") -public class JavaCopyOnWriteRollbackActionExecutor extends - BaseCopyOnWriteRollbackActionExecutor>, List, List> { - public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); - } - - public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, List, List> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { - super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); - } - - @Override - protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() { - if (useMarkerBasedStrategy) { - return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime); - } else { - return this::executeRollbackUsingFileListing; - } - } - - @Override - protected List executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW( - context, table.getMetaClient().getBasePath(), config); - return new JavaListingBasedRollbackHelper(table.getMetaClient(), config) - .performRollback(context, instantToRollback, rollbackRequests); - } -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java deleted file mode 100644 index 5331ca5891c28..0000000000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieCommandBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.collection.ImmutablePair; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieRollbackException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Performs Rollback of Hoodie Tables. - */ -public class JavaListingBasedRollbackHelper implements Serializable { - - private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class); - - private final HoodieTableMetaClient metaClient; - private final HoodieWriteConfig config; - - public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - this.metaClient = metaClient; - this.config = config; - } - - /** - * Performs all rollback actions that we have collected in parallel. 
- */ - public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { - Map partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true); - - Map>> collect = partitionPathRollbackStatsPairs.entrySet() - .stream() - .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft)); - return collect.values().stream() - .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - - /** - * Collect all file info that needs to be rollbacked. - */ - public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { - Map partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false); - return new ArrayList<>(partitionPathRollbackStatsPairs.values()); - } - - /** - * May be delete interested files and collect stats or collect stats only. - * - * @param context instance of {@link HoodieEngineContext} to use. - * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. - * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. - * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. - * @return stats collected with or w/o actual deletions. - */ - Map maybeDeleteAndCollectStats(HoodieEngineContext context, - HoodieInstant instantToRollback, - List rollbackRequests, - boolean doDelete) { - return context.mapToPair(rollbackRequests, rollbackRequest -> { - switch (rollbackRequest.getType()) { - case DELETE_DATA_FILES_ONLY: { - final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), - rollbackRequest.getPartitionPath(), doDelete); - return new ImmutablePair<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); - } - case DELETE_DATA_AND_LOG_FILES: { - final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); - return new ImmutablePair<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); - } - case APPEND_ROLLBACK_BLOCK: { - HoodieLogFormat.Writer writer = null; - try { - writer = HoodieLogFormat.newWriterBuilder() - .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(rollbackRequest.getFileId().get()) - .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - - // generate metadata - if (doDelete) { - Map header = generateHeader(instantToRollback.getTimestamp()); - // if update belongs to an existing log file - writer.appendBlock(new HoodieCommandBlock(header)); - } - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); - } finally { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException io) { - throw new HoodieIOException("Error appending 
rollback block..", io); - } - } - - // This step is intentionally done after writer is closed. Guarantees that - // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in - // cloud-storage : HUDI-168 - Map filesToNumBlocksRollback = Collections.singletonMap( - metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L - ); - return new ImmutablePair<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); - } - default: - throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); - } - }, 0); - } - - /** - * Common method used for cleaning out base files under a partition path during rollback of a set of commits. - */ - private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath, boolean doDelete) throws IOException { - LOG.info("Cleaning path " + partitionPath); - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - SerializablePathFilter filter = (path) -> { - if (path.toString().endsWith(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return commit.equals(fileCommitTime); - } - return false; - }; - - final Map results = new HashMap<>(); - FileSystem fs = metaClient.getFs(); - FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - for (FileStatus file : toBeDeleted) { - if (doDelete) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); - } else { - results.put(file, true); - } - } - return results; - } - - /** - * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
- */ - private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath, boolean doDelete) throws IOException { - final Map results = new HashMap<>(); - LOG.info("Cleaning path " + partitionPath); - FileSystem fs = metaClient.getFs(); - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - PathFilter filter = (path) -> { - if (path.toString().contains(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } - return false; - }; - FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - for (FileStatus file : toBeDeleted) { - if (doDelete) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); - } else { - results.put(file, true); - } - } - return results; - } - - private Map generateHeader(String commit) { - // generate metadata - Map header = new HashMap<>(3); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); - header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit); - header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); - return header; - } - - public interface SerializablePathFilter extends PathFilter, Serializable { - - } -} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java deleted file mode 100644 index 150f663cf44f5..0000000000000 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; -import org.apache.hudi.table.marker.WriteMarkers; - -import java.util.List; -import java.util.stream.Collectors; - -@SuppressWarnings("checkstyle:LineLength") -public class JavaMarkerBasedRollbackStrategy extends AbstractMarkerBasedRollbackStrategy>, List, List> { - public JavaMarkerBasedRollbackStrategy(HoodieTable>, List, List> table, - HoodieEngineContext context, - HoodieWriteConfig config, - String instantTime) { - super(table, context, config, instantTime); - } - - @Override - public List execute(HoodieInstant instantToRollback) { - try { - List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( - table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); - List rollbackStats = context.map(markerPaths, markerFilePath -> { - String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); - IOType type = IOType.valueOf(typeStr); - switch (type) { - case MERGE: - return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath)); - case APPEND: - return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback); - case CREATE: - return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath)); - default: - throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); - } - }, 0); - - return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat)) - .collect(Collectors.groupingBy(Pair::getKey)) - .values() - .stream() - .map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get()) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index d869ec77a720b..ad1d7cd929ef5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -73,6 +74,14 @@ public List map(List data, SerializableFunction func, int par return 
javaSparkContext.parallelize(data, parallelism).map(func::apply).collect(); } + @Override + public List mapToPairAndReduceByKey(List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { + return javaSparkContext.parallelize(data, parallelism).mapToPair(input -> { + Pair pair = mapToPairFunc.call(input); + return new Tuple2<>(pair.getLeft(), pair.getRight()); + }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect(); + } + @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 6a2bd6fb2f86f..26d14cfa94443 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor; -import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -247,7 +247,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan @Override public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new SparkCopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 997116ec06448..2db4eeb702c11 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -49,7 +49,8 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor; import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor; -import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; + import org.apache.spark.api.java.JavaRDD; import java.util.List; @@ -146,7 +147,7 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new SparkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, 
commitInstant, deleteInstants).execute(); + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java index 101b3217da99e..9c6ec6e703f23 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java @@ -30,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.spark.api.java.JavaRDD; @@ -49,7 +49,7 @@ public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext context, @Override protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { table.getMetaClient().reloadActiveTimeline(); - SparkCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor( + CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( (HoodieSparkEngineContext) context, config, table, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java index c320579380b1d..ebca1fe8de8b2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java @@ -29,7 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.SparkMergeOnReadRollbackActionExecutor; +import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; import org.apache.spark.api.java.JavaRDD; @@ -48,7 +48,7 @@ public SparkMergeOnReadRestoreActionExecutor(HoodieSparkEngineContext context, @Override protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { table.getMetaClient().reloadActiveTimeline(); - SparkMergeOnReadRollbackActionExecutor rollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor( + MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor( context, config, table, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java deleted file mode 100644 index fcb3882b7bf5e..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; -import org.apache.hudi.common.table.log.block.HoodieCommandBlock; -import org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieRollbackException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -import scala.Tuple2; - -/** - * Performs Rollback of Hoodie Tables. - */ -public class ListingBasedRollbackHelper implements Serializable { - - private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class); - - private final HoodieTableMetaClient metaClient; - private final HoodieWriteConfig config; - - public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - this.metaClient = metaClient; - this.config = config; - } - - /** - * Performs all rollback actions that we have collected in parallel. - */ - public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { - int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions"); - JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, true); - return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect(); - } - - /** - * Collect all file info that needs to be rollbacked. 
- */ - public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { - int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); - JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false); - return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect(); - } - - /** - * May be delete interested files and collect stats or collect stats only. - * - * @param context instance of {@link HoodieEngineContext} to use. - * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. - * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. - * @param sparkPartitions number of spark partitions to use for parallelism. - * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. - * @return stats collected with or w/o actual deletions. - */ - JavaPairRDD maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests, - int sparkPartitions, boolean doDelete) { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> { - switch (rollbackRequest.getType()) { - case DELETE_DATA_FILES_ONLY: { - final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), - rollbackRequest.getPartitionPath(), doDelete); - return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); - } - case DELETE_DATA_AND_LOG_FILES: { - final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); - return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); - } - case APPEND_ROLLBACK_BLOCK: { - String fileId = rollbackRequest.getFileId().get(); - String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); - - // collect all log files that is supposed to be deleted with this rollback - Map writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(), - FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), - fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - - Writer writer = null; - try { - writer = HoodieLogFormat.newWriterBuilder() - .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(fileId) - .overBaseCommit(latestBaseInstant) - .withFs(metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - - // generate metadata - if (doDelete) { - Map header = generateHeader(instantToRollback.getTimestamp()); - // if update belongs to an existing log file - writer.appendBlock(new HoodieCommandBlock(header)); - } - } catch (IOException | InterruptedException io) { - throw new 
HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); - } finally { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException io) { - throw new HoodieIOException("Error appending rollback block..", io); - } - } - - // This step is intentionally done after writer is closed. Guarantees that - // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in - // cloud-storage : HUDI-168 - Map filesToNumBlocksRollback = Collections.singletonMap( - metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), - 1L - ); - - return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build()); - } - default: - throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); - } - }); - } - - /** - * Common method used for cleaning out base files under a partition path during rollback of a set of commits. - */ - private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath, boolean doDelete) throws IOException { - LOG.info("Cleaning path " + partitionPath); - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - SerializablePathFilter filter = (path) -> { - if (path.toString().endsWith(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return commit.equals(fileCommitTime); - } - return false; - }; - - final Map results = new HashMap<>(); - FileSystem fs = metaClient.getFs(); - FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - for (FileStatus file : toBeDeleted) { - if (doDelete) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); - } else { - results.put(file, true); - } - } - return results; - } - - /** - * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
- */ - private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath, boolean doDelete) throws IOException { - final Map results = new HashMap<>(); - LOG.info("Cleaning path " + partitionPath); - FileSystem fs = metaClient.getFs(); - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - PathFilter filter = (path) -> { - if (path.toString().contains(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } - return false; - }; - FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - for (FileStatus file : toBeDeleted) { - if (doDelete) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); - } else { - results.put(file, true); - } - } - return results; - } - - private Map generateHeader(String commit) { - // generate metadata - Map header = new HashMap<>(3); - header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); - header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit); - header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, - String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); - return header; - } - - public interface SerializablePathFilter extends PathFilter, Serializable { - - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java deleted file mode 100644 index 611ec217a7759..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkCopyOnWriteRollbackActionExecutor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.api.java.JavaRDD; - -import java.util.List; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkCopyOnWriteRollbackActionExecutor extends - BaseCopyOnWriteRollbackActionExecutor>, JavaRDD, JavaRDD> { - public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); - } - - public SparkCopyOnWriteRollbackActionExecutor(HoodieSparkEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { - super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); - } - - @Override - protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() { - if (useMarkerBasedStrategy) { - return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime); - } else { - return this::executeRollbackUsingFileListing; - } - } - - @Override - protected List executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, - table.getMetaClient().getBasePath(), config); - return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java deleted file mode 100644 index 0adacd28cd9ec..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMarkerBasedRollbackStrategy.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; -import org.apache.hudi.table.marker.WriteMarkers; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import scala.Tuple2; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkMarkerBasedRollbackStrategy extends AbstractMarkerBasedRollbackStrategy>, JavaRDD, JavaRDD> { - public SparkMarkerBasedRollbackStrategy(HoodieTable>, JavaRDD, JavaRDD> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) { - super(table, context, config, instantTime); - } - - @Override - public List execute(HoodieInstant instantToRollback) { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - try { - List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths( - table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism()); - int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1); - jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files"); - return jsc.parallelize(markerPaths, parallelism) - .map(markerFilePath -> { - String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); - IOType type = IOType.valueOf(typeStr); - switch (type) { - case MERGE: - return undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath)); - case APPEND: - return undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback); - case CREATE: - return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath)); - default: - throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); - } - }) - .mapToPair(rollbackStat -> new Tuple2<>(rollbackStat.getPartitionPath(), rollbackStat)) - .reduceByKey(RollbackUtils::mergeRollbackStat) - .map(Tuple2::_2).collect(); - } catch (Exception e) { - throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); - } - } - - protected Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { - // collect all log files that is supposed to be deleted with this rollback - return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), - FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - } -} diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java deleted file mode 100644 index 9486362558147..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/SparkMergeOnReadRollbackActionExecutor.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; - -import java.io.IOException; -import java.util.List; - -@SuppressWarnings("checkstyle:LineLength") -public class SparkMergeOnReadRollbackActionExecutor extends - BaseMergeOnReadRollbackActionExecutor>, JavaRDD, JavaRDD> { - public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); - } - - public SparkMergeOnReadRollbackActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, - HoodieTable>, JavaRDD, JavaRDD> table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { - super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); - } - - @Override - protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() { - if (useMarkerBasedStrategy) { - return new SparkMarkerBasedRollbackStrategy(table, context, config, instantTime); - } else { - return this::executeRollbackUsingFileListing; - } - } - - @Override - protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { - List rollbackRequests; - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - try { - rollbackRequests = 
RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context); - } catch (IOException e) { - throw new HoodieIOException("Error generating rollback requests by file listing.", e); - } - return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests); - } -} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index bc1f3c3885028..810733c647a17 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -83,8 +83,8 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"); // execute CopyOnWriteRollbackActionExecutor with filelisting mode - SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true); - assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy); + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true); + assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy); List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(); // assert hoodieRollbackStats @@ -162,11 +162,11 @@ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfi commitInstant = table.getCompletedCommitTimeline().lastInstant().get(); } - SparkCopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new SparkCopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false); + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false); if (!isUsingMarkers) { - assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy); + assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy); } else { - assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy); + assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy); } Map rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 75e6a7ac0b703..5d269cf6a8da3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ 
-89,7 +89,7 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws //2. rollback HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002"); - SparkMergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new SparkMergeOnReadRollbackActionExecutor( + MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor( context, cfg, table, @@ -98,9 +98,9 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws true); // assert is filelist mode if (!isUsingMarkers) { - assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy); + assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy); } else { - assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof SparkMarkerBasedRollbackStrategy); + assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy); } //3. assert the rollback stat @@ -145,15 +145,15 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws public void testFailForCompletedInstants() { Assertions.assertThrows(IllegalArgumentException.class, () -> { HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"); - new SparkMergeOnReadRollbackActionExecutor( - context, - getConfigBuilder().build(), - getHoodieTable(metaClient, getConfigBuilder().build()), - "003", - rollBackInstant, - true, - true, - true); + new MergeOnReadRollbackActionExecutor( + context, + getConfigBuilder().build(), + getHoodieTable(metaClient, getConfigBuilder().build()), + "003", + rollBackInstant, + true, + true, + true); }); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java index 6e6738653972b..94fa6974d34e4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java @@ -32,7 +32,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy; +import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hadoop.fs.FileStatus; @@ -93,7 +93,7 @@ public void testCopyOnWriteRollbackWithTestTable() throws Exception { .withMarkerFile("partA", f2, IOType.CREATE); // when - List stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002") + List stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002") .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001")); // then: ensure files are deleted correctly, non-existent files reported as failed deletes @@ -176,7 +176,7 @@ private List testRun(boolean useFileListingMetadata, HoodieW writeStatuses.collect(); // rollback 2nd commit and ensure stats reflect the info. 
- return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003") + return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003") .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002")); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java index 0128ce52b85ff..8ea6a43e0dead 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.engine; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -56,6 +57,9 @@ public TaskContextSupplier getTaskContextSupplier() { public abstract List map(List data, SerializableFunction func, int parallelism); + public abstract List mapToPairAndReduceByKey( + List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism); + public abstract List flatMap(List data, SerializableFunction> func, int parallelism); public abstract void foreach(List data, SerializableConsumer consumer, int parallelism); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java index e8045670244b2..0aeb9d8c0050c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -37,6 +38,7 @@ import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrapper; /** * A java based engine context, use this implementation on the query engine integrations if needed. 
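The hunk below gives this local context an in-JVM, parallel-stream implementation of mapToPairAndReduceByKey: map every element to a (key, value) pair, group the pairs by key, then reduce each group's values into one result per key. A minimal, self-contained sketch of that shape, where the String/Integer types and the Integer::sum reducer are illustrative stand-ins rather than Hudi APIs:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BinaryOperator;
    import java.util.stream.Collectors;

    public class GroupAndReduceSketch {
      public static void main(String[] args) {
        // (partitionPath, stat) pairs; stats for the same partition must be merged.
        List<Map.Entry<String, Integer>> pairs = Arrays.<Map.Entry<String, Integer>>asList(
            new SimpleEntry<>("p1", 1), new SimpleEntry<>("p2", 2), new SimpleEntry<>("p1", 3));
        // Stand-in for a merge function such as RollbackUtils::mergeRollbackStat.
        BinaryOperator<Integer> reduceFunc = Integer::sum;
        List<Integer> merged = pairs.stream().parallel()
            .collect(Collectors.groupingBy(Map.Entry::getKey)) // group pairs by key
            .values().stream()
            .map(group -> group.stream().map(Map.Entry::getValue)
                .reduce(reduceFunc).get()) // safe: every group holds at least one value
            .collect(Collectors.toList());
        System.out.println(merged); // e.g. [4, 2]; ordering depends on the grouping map
      }
    }

Because grouping happens before reducing, no group is ever empty, which is why unwrapping the reduce Optional with get() is defensible here; patch 2 later in the series still hardens the equivalent Flink implementation with orElse(null) and a non-null filter.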
@@ -56,6 +58,15 @@ public List map(List data, SerializableFunction func, int par return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList()); } + @Override + public List mapToPairAndReduceByKey( + List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { + return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc)) + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get()) + .collect(Collectors.toList()); + } + @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java index 405f57eeedfc5..b729e48ae7ef4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -70,4 +71,14 @@ public static Function> throwingMapToPairWrapper(Seriali } }; } + + public static BinaryOperator throwingReduceWrapper(SerializableBiFunction throwingReduceFunction) { + return (v1, v2) -> { + try { + return throwingReduceFunction.apply(v1, v2); + } catch (Exception e) { + throw new HoodieException("Error occurs when executing reduce", e); + } + }; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java new file mode 100644 index 0000000000000..940396cf8e1ec --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializableBiFunction.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.function; + +import java.io.Serializable; + +/** + * A function that accepts two arguments and produces a result.
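+ * For example, {@code RollbackUtils::mergeRollbackStat} is the kind of reducer this series passes in, merging two rollback stats collected for the same partition path.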
+ * + * @param the type of the first argument to the function + * @param the type of the second argument to the function + * @param the type of the result of the function + */ +@FunctionalInterface +public interface SerializableBiFunction extends Serializable { + R apply(T t, U u); +} From 87a3b37885e23b5348133179b9d984cb07871bef Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 15 Sep 2021 09:57:48 -0700 Subject: [PATCH 2/4] Fix minor issue in Flink transformation --- .../apache/hudi/client/common/HoodieFlinkEngineContext.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 11e674fb687d3..66b7e78d430c7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -80,7 +81,8 @@ public List map(List data, SerializableFunction func, int par public List mapToPairAndReduceByKey(List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { return data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc)) .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() - .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get()) + .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)) + .filter(Objects::nonNull) .collect(Collectors.toList()); } From c070774d6c7d1c876eb3c529ba6cacf0bb58a948 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 15 Sep 2021 12:31:49 -0700 Subject: [PATCH 3/4] Changes around common logic --- .../rollback/ListingBasedRollbackHelper.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index 9bfd3420de222..e12d1a5f88e92 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; @@ -43,6 +43,7 @@ import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -84,10 +85,9 @@ public List collectRollbackStats(HoodieEngineContext context List rollbackRequests) { int 
parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); - return context.mapToPairAndReduceByKey(rollbackRequests, + return new ArrayList<>(context.mapToPair(rollbackRequests, rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false), - RollbackUtils::mergeRollbackStat, - parallelism); + parallelism).values()); } /** @@ -136,7 +136,7 @@ private Pair maybeDeleteAndCollectStats(ListingBased // generate metadata if (doDelete) { - Map header = generateHeader(instantToRollback.getTimestamp()); + Map header = generateHeader(instantToRollback.getTimestamp()); // if update belongs to an existing log file writer.appendBlock(new HoodieCommandBlock(header)); } @@ -233,12 +233,12 @@ private Map deleteBaseFiles(HoodieTableMetaClient metaClien return results; } - private Map generateHeader(String commit) { + private Map generateHeader(String commit) { // generate metadata - Map header = new HashMap<>(3); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); - header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit); - header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + Map header = new HashMap<>(3); + header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); + header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit); + header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); return header; } From 37b53485eaef00800e1ec5ce833c54b618cb3054 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 15 Sep 2021 13:59:15 -0700 Subject: [PATCH 4/4] Fix collectRollbackStats() bug in existing code --- .../table/action/rollback/ListingBasedRollbackHelper.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index e12d1a5f88e92..849087222dae0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -43,7 +43,6 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -85,9 +84,10 @@ public List collectRollbackStats(HoodieEngineContext context List rollbackRequests) { int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); - return new ArrayList<>(context.mapToPair(rollbackRequests, + return context.mapToPairAndReduceByKey(rollbackRequests, rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false), - parallelism).values()); + RollbackUtils::mergeRollbackStat, + parallelism); } /**
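Taken end to end, the series leaves ListingBasedRollbackHelper with a single engine-agnostic call shape: map each rollback request to a (partitionPath, stat) pair, then merge the stats of each partition with RollbackUtils::mergeRollbackStat. The parallelism handed down is clamped to the range [1, rollbackRequests.size()], so an empty request list still yields the legal value 1, and three requests under a configured rollback parallelism of 100 run at 3. A self-contained sketch of that caller-side shape follows; the sequential stand-in method, the date-like partition paths, and the Integer stat type are assumptions made for illustration, with only the API shape taken from the patch.

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;

    public class RollbackCallShapeSketch {
      // Hypothetical, sequential stand-in for the mapToPairAndReduceByKey API added by
      // this series; a real HoodieEngineContext would fan the work out at 'parallelism'.
      static <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data,
                                                       Function<I, Map.Entry<K, V>> mapToPairFunc,
                                                       BinaryOperator<V> reduceFunc,
                                                       int parallelism) {
        Map<K, V> byKey = new LinkedHashMap<>();
        for (I input : data) {
          Map.Entry<K, V> pair = mapToPairFunc.apply(input);
          byKey.merge(pair.getKey(), pair.getValue(), reduceFunc); // reduce values sharing a key
        }
        return new ArrayList<>(byKey.values());
      }

      public static void main(String[] args) {
        // Hypothetical rollback requests, keyed by the partition path they touch.
        List<String> rollbackRequests = Arrays.asList("2021/09/12", "2021/09/13", "2021/09/12");
        // Same clamp as the helper: at least 1, at most one unit of work per request.
        int parallelism = Math.max(Math.min(rollbackRequests.size(), 100), 1);
        // Map each request to (partitionPath, deletedFileCount) and merge per partition;
        // Integer::sum stands in for RollbackUtils::mergeRollbackStat.
        List<Integer> mergedStats = mapToPairAndReduceByKey(rollbackRequests,
            partition -> new SimpleEntry<>(partition, 1), Integer::sum, parallelism);
        System.out.println(parallelism + " -> " + mergedStats); // prints "3 -> [2, 1]"
      }
    }

Pushing the reduce behind HoodieEngineContext is what lets the single CopyOnWriteRollbackActionExecutor and MergeOnReadRollbackActionExecutor serve every client: the Spark context maps this call onto reduceByKey, while the local and Flink contexts fall back to grouping over Java streams.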