diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
new file mode 100644
index 0000000000000..156da66ff1b32
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.repair;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * Utils for table repair tool.
+ */
+public final class RepairUtils {
+  /**
+   * Tags the instant time of each base or log file from the input file paths.
+   *
+   * @param basePath          Base path of the table.
+   * @param baseFileExtension Base file extension, e.g., ".parquet".
+   * @param allPaths          A {@link List} of file paths to tag.
+   * @return A {@link Map} of instant time in {@link String} to a {@link List} of relative file paths.
+   */
+  public static Map<String, List<String>> tagInstantsOfBaseAndLogFiles(
+      String basePath, String baseFileExtension, List<Path> allPaths) {
+    // Instant time -> Set of base and log file paths
+    Map<String, List<String>> instantToFilesMap = new HashMap<>();
+    allPaths.forEach(path -> {
+      String instantTime = path.toString().endsWith(baseFileExtension)
+          ? FSUtils.getCommitTime(path.getName()) : FSUtils.getBaseCommitTimeFromLogPath(path);
+      instantToFilesMap.computeIfAbsent(instantTime, k -> new ArrayList<>());
+      instantToFilesMap.get(instantTime).add(
+          FSUtils.getRelativePartitionPath(new Path(basePath), path));
+    });
+    return instantToFilesMap;
+  }
+
+  /**
+   * Gets the base and log file paths written for a given instant from the timeline.
+   * This reads the details of the instant metadata.
+   *
+   * @param timeline {@link HoodieTimeline} instance, can be active or archived timeline.
+   * @param instant  Instant for lookup.
+   * @return An {@link Option} of a {@link Set} of relative file paths to the base path
+   * if the instant action is supported; empty {@link Option} otherwise.
+   * @throws IOException if reading instant details fails.
+   */
+  public static Option<Set<String>> getBaseAndLogFilePathsFromTimeline(
+      HoodieTimeline timeline, HoodieInstant instant) throws IOException {
+    if (!instant.isCompleted()) {
+      throw new HoodieException("Cannot get base and log file paths from "
+          + "instant not completed: " + instant.getTimestamp());
+    }
+
+    switch (instant.getAction()) {
+      case COMMIT_ACTION:
+      case DELTA_COMMIT_ACTION:
+        final HoodieCommitMetadata commitMetadata =
+            HoodieCommitMetadata.fromBytes(
+                timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        return Option.of(commitMetadata.getPartitionToWriteStats().values().stream().flatMap(List::stream)
+            .map(HoodieWriteStat::getPath).collect(Collectors.toSet()));
+      case REPLACE_COMMIT_ACTION:
+        final HoodieReplaceCommitMetadata replaceCommitMetadata =
+            HoodieReplaceCommitMetadata.fromBytes(
+                timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+        return Option.of(replaceCommitMetadata.getPartitionToWriteStats().values().stream().flatMap(List::stream)
+            .map(HoodieWriteStat::getPath).collect(Collectors.toSet()));
+      default:
+        return Option.empty();
+    }
+  }
+
+  /**
+   * Finds the dangling files to remove for a given instant to repair.
+   *
+   * @param instantToRepair       Instant timestamp to repair.
+   * @param baseAndLogFilesFromFs A {@link List} of base and log files based on the file system.
+   * @param activeTimeline        {@link HoodieActiveTimeline} instance.
+   * @param archivedTimeline      {@link HoodieArchivedTimeline} instance.
+   * @return A {@link List} of relative file paths to base path for removing.
+   */
+  public static List<String> findInstantFilesToRemove(
+      String instantToRepair, List<String> baseAndLogFilesFromFs,
+      HoodieActiveTimeline activeTimeline, HoodieArchivedTimeline archivedTimeline) {
+    // Skips the instant if it is requested or inflight in active timeline
+    if (activeTimeline.filter(instant -> instant.getTimestamp().equals(instantToRepair)
+        && !instant.isCompleted()).getInstants().findAny().isPresent()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      boolean doesInstantExist = false;
+      Option<Set<String>> filesFromTimeline = Option.empty();
+      Option<HoodieInstant> instantOption = activeTimeline.filterCompletedInstants().filter(
+          instant -> instant.getTimestamp().equals(instantToRepair)).firstInstant();
+      if (instantOption.isPresent()) {
+        // Completed instant in active timeline
+        doesInstantExist = true;
+        filesFromTimeline = RepairUtils.getBaseAndLogFilePathsFromTimeline(
+            activeTimeline, instantOption.get());
+      } else {
+        instantOption = archivedTimeline.filterCompletedInstants().filter(
+            instant -> instant.getTimestamp().equals(instantToRepair)).firstInstant();
+        if (instantOption.isPresent()) {
+          // Completed instant in archived timeline
+          doesInstantExist = true;
+          filesFromTimeline = RepairUtils.getBaseAndLogFilePathsFromTimeline(
+              archivedTimeline, instantOption.get());
+        }
+      }
+
+      if (doesInstantExist) {
+        if (!filesFromTimeline.isPresent() || filesFromTimeline.get().isEmpty()) {
+          // Skips if no instant details
+          return Collections.emptyList();
+        }
+        // Excludes committed base and log files from timeline
+        Set<String> filesToRemove = new HashSet<>(baseAndLogFilesFromFs);
+        filesToRemove.removeAll(filesFromTimeline.get());
+        return new ArrayList<>(filesToRemove);
+      } else {
+        // The instant does not exist in the whole timeline (neither completed nor requested/inflight),
+        // this means the files from this instant are dangling, which should be removed
+        return baseAndLogFilesFromFs;
+      }
+    } catch (IOException e) {
+      // In case of failure, does not remove any files for the instant
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Serializable path filter class for Spark job.
+ */ + public interface SerializablePathFilter extends PathFilter, Serializable { + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index faff4ae6fba6e..e82819e73e1d9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.table.timeline; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.common.model.HoodieLogFile; @@ -32,10 +28,16 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.Nonnull; + import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -65,7 +67,7 @@ */ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { private static final Pattern ARCHIVE_FILE_PATTERN = - Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$"); + Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$"); private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits"; private static final String ACTION_TYPE_KEY = "actionType"; @@ -112,6 +114,11 @@ public void loadInstantDetailsInMemory(String startTs, String endTs) { loadInstants(startTs, endTs); } + public void loadCompletedInstantDetailsInMemory() { + loadInstants(null, true, + record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())); + } + public void loadCompactionDetailsInMemory(String compactionInstantTime) { loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime); } @@ -143,15 +150,17 @@ public HoodieArchivedTimeline reload() { } private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) { - final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); + final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); final String action = record.get(ACTION_TYPE_KEY).toString(); if (loadDetails) { getMetadataKey(action).map(key -> { Object actionData = record.get(key); - if (action.equals(HoodieTimeline.COMPACTION_ACTION)) { - this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData)); - } else { - this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8)); + if (actionData != null) { + if (action.equals(HoodieTimeline.COMPACTION_ACTION)) { + this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData)); + } else { + this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8)); + } } return null; }); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java index 
cb3f103a9c87c..8fc43ef1cfa24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java @@ -18,6 +18,11 @@ package org.apache.hudi.common.util; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -98,6 +103,31 @@ public static void copy(InputStream inputStream, OutputStream outputStream) thro } } + /** + * Copies the file content from source path to destination path. + * + * @param fileSystem {@link FileSystem} instance. + * @param sourceFilePath Source file path. + * @param destFilePath Destination file path. + */ + public static void copy( + FileSystem fileSystem, org.apache.hadoop.fs.Path sourceFilePath, + org.apache.hadoop.fs.Path destFilePath) { + FSDataInputStream fsDataInputStream = null; + FSDataOutputStream fsDataOutputStream = null; + try { + fsDataInputStream = fileSystem.open(sourceFilePath); + fsDataOutputStream = fileSystem.create(destFilePath, false); + copy(fsDataInputStream, fsDataOutputStream); + } catch (IOException e) { + throw new HoodieIOException(String.format("Cannot copy from %s to %s", + sourceFilePath.toString(), destFilePath.toString()), e); + } finally { + closeQuietly(fsDataInputStream); + closeQuietly(fsDataOutputStream); + } + } + public static byte[] readAsByteArray(InputStream input) throws IOException { return readAsByteArray(input, 128); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java new file mode 100644 index 0000000000000..d7fa70889fb8a --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metadata.FileSystemBackedTableMetadata; +import org.apache.hudi.table.repair.RepairUtils; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.io.Serializable; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * A tool with spark-submit to repair Hudi table by finding and deleting dangling + * base and log files. + *

+ * You can run this tool with the following command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieRepairTool \
+ * --driver-memory 4g \
+ * --executor-memory 1g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ * --conf spark.sql.catalogImplementation=hive \
+ * --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
+ * --packages org.apache.spark:spark-avro_2.12:3.1.2 \
+ * $HUDI_DIR/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.11.0-SNAPSHOT.jar \
+ * --mode dry_run \
+ * --base-path base_path \
+ * --assume-date-partitioning
+ * ```
+ *

+ * You can specify the running mode of the tool through `--mode`. + * There are three modes of the {@link HoodieRepairTool}: + * - REPAIR ("repair"): repairs the table by removing dangling data and log files not belonging to any commit. + * The removed files are going to be backed up at the backup path provided, in case recovery is needed. + * In this mode, backup path is required through `--backup-path`. You can also provide a range for repairing + * only the instants within the range, through `--start-instant-time` and `--end-instant-time`. You can also + * specify only one of them. If no range is provided, all instants are going to be repaired. + *

+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieRepairTool \
+ * --driver-memory 4g \
+ * --executor-memory 1g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ * --conf spark.sql.catalogImplementation=hive \
+ * --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
+ * --packages org.apache.spark:spark-avro_2.12:3.1.2 \
+ * $HUDI_DIR/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.11.0-SNAPSHOT.jar \
+ * --mode repair \
+ * --base-path base_path \
+ * --backup-path backup_path \
+ * --start-instant-time ts1 \
+ * --end-instant-time ts2 \
+ * --assume-date-partitioning
+ * ```
+ *

+ * - DRY_RUN ("dry_run"): only looks for dangling data and log files. You can also provide a range for looking + * at only the instants within the range, through `--start-instant-time` and `--end-instant-time`. You can also + * specify only one of them. If no range is provided, all instants are going to be scanned. + *

+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieRepairTool \
+ * --driver-memory 4g \
+ * --executor-memory 1g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ * --conf spark.sql.catalogImplementation=hive \
+ * --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
+ * --packages org.apache.spark:spark-avro_2.12:3.1.2 \
+ * $HUDI_DIR/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.11.0-SNAPSHOT.jar \
+ * --mode dry_run \
+ * --base-path base_path \
+ * --start-instant-time ts1 \
+ * --end-instant-time ts2 \
+ * --assume-date-partitioning
+ * ```
+ *

+ * - UNDO ("undo"): undoes the repair by copying back the files from backup directory to the table base path. + * In this mode, backup path is required through `--backup-path`. + *

+ * Example command: + * ``` + * spark-submit \ + * --class org.apache.hudi.utilities.HoodieRepairTool \ + * --driver-memory 4g \ + * --executor-memory 1g \ + * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + * --conf spark.sql.catalogImplementation=hive \ + * --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \ + * --packages org.apache.spark:spark-avro_2.12:3.1.2 \ + * $HUDI_DIR/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.11.0-SNAPSHOT.jar \ + * --mode undo \ + * --base-path base_path \ + * --backup-path backup_path + * ``` + */ +public class HoodieRepairTool { + + private static final Logger LOG = LogManager.getLogger(HoodieRepairTool.class); + private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_"; + // Repair config + private final Config cfg; + // Properties with source, hoodie client, key generator etc. + private TypedProperties props; + // Spark context + private final JavaSparkContext jsc; + private final HoodieTableMetaClient metaClient; + private final FileSystemBackedTableMetadata tableMetadata; + + public HoodieRepairTool(JavaSparkContext jsc, Config cfg) { + if (cfg.propsFilePath != null) { + cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString(); + } + this.jsc = jsc; + this.cfg = cfg; + this.props = cfg.propsFilePath == null + ? UtilHelpers.buildProperties(cfg.configs) + : readConfigFromFileSystem(jsc, cfg); + this.metaClient = HoodieTableMetaClient.builder() + .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath) + .setLoadActiveTimelineOnLoad(true) + .build(); + this.tableMetadata = new FileSystemBackedTableMetadata( + new HoodieSparkEngineContext(jsc), + new SerializableConfiguration(jsc.hadoopConfiguration()), + cfg.basePath, cfg.assumeDatePartitioning); + } + + public void run() { + Option startingInstantOption = Option.ofNullable(cfg.startingInstantTime); + Option endingInstantOption = Option.ofNullable(cfg.endingInstantTime); + + if (startingInstantOption.isPresent() && endingInstantOption.isPresent()) { + LOG.info(String.format("Start repairing completed instants between %s and %s (inclusive)", + startingInstantOption.get(), endingInstantOption.get())); + } else if (startingInstantOption.isPresent()) { + LOG.info(String.format("Start repairing completed instants from %s (inclusive)", + startingInstantOption.get())); + } else if (endingInstantOption.isPresent()) { + LOG.info(String.format("Start repairing completed instants till %s (inclusive)", + endingInstantOption.get())); + } else { + LOG.info("Start repairing all completed instants"); + } + + try { + Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase()); + switch (mode) { + case REPAIR: + LOG.info(" ****** The repair tool is in REPAIR mode, dangling data and logs files " + + "not belonging to any commit are going to be DELETED from the table ******"); + if (checkBackupPathForRepair() < 0) { + LOG.error("Backup path check failed."); + break; + } + doRepair(startingInstantOption, endingInstantOption, false); + break; + case DRY_RUN: + LOG.info(" ****** The repair tool is in DRY_RUN mode, " + + "only LOOKING FOR dangling data and log files from the table ******"); + doRepair(startingInstantOption, endingInstantOption, true); + break; + case UNDO: + if (checkBackupPathAgainstBasePath() < 0) { + LOG.error("Backup path check failed."); + break; + } + undoRepair(); + break; + default: + LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly"); + } + } catch 
(IOException e) { + throw new HoodieIOException("Unable to repair table in " + cfg.basePath, e); + } + } + + public static void main(String[] args) { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + final JavaSparkContext jsc = UtilHelpers.buildSparkContext("hudi-table-repair", cfg.sparkMaster, cfg.sparkMemory); + try { + new HoodieRepairTool(jsc, cfg).run(); + } catch (Throwable throwable) { + LOG.error("Fail to run table repair for " + cfg.basePath, throwable); + } finally { + jsc.stop(); + } + } + + /** + * Copies the list of files from source base path to destination base path. + * The destination file path (base + relative) should not already exist. + * + * @param jsc {@link JavaSparkContext} instance. + * @param relativeFilePaths A {@link List} of relative file paths for copying. + * @param sourceBasePath Source base path. + * @param destBasePath Destination base path. + * @param parallelism Parallelism. + * @return {@code true} if all successful; {@code false} otherwise. + */ + static boolean copyFiles( + JavaSparkContext jsc, List relativeFilePaths, String sourceBasePath, + String destBasePath, int parallelism) { + SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration()); + List allResults = jsc.parallelize(relativeFilePaths, parallelism) + .mapPartitions(iterator -> { + List results = new ArrayList<>(); + FileSystem fs = FSUtils.getFs(destBasePath, conf.get()); + iterator.forEachRemaining(filePath -> { + boolean success = false; + Path destPath = new Path(destBasePath, filePath); + try { + if (!fs.exists(destPath)) { + FileIOUtils.copy(fs, new Path(sourceBasePath, filePath), destPath); + success = true; + } + } catch (IOException e) { + // Copy Fail + } finally { + results.add(success); + } + }); + return results.iterator(); + }) + .collect(); + return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false); + } + + /** + * Lists all Hoodie files from the table base path. + * + * @param basePathStr Table base path. + * @param conf {@link Configuration} instance. + * @return An array of {@link FileStatus} of all Hoodie files. + * @throws IOException upon errors. + */ + static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException { + final Set validFileExtensions = Arrays.stream(HoodieFileFormat.values()) + .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); + final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); + FileSystem fs = FSUtils.getFs(basePathStr, conf); + Path basePath = new Path(basePathStr); + + try { + return Arrays.stream(fs.listStatus(basePath, path -> { + String extension = FSUtils.getFileExtension(path.getName()); + return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); + })).filter(FileStatus::isFile).toArray(FileStatus[]::new); + } catch (IOException e) { + // return empty FileStatus if partition does not exist already + if (!fs.exists(basePath)) { + return new FileStatus[0]; + } else { + throw e; + } + } + } + + /** + * Does repair, either in REPAIR or DRY_RUN mode. + * + * @param startingInstantOption {@link Option} of starting instant for scanning, can be empty. + * @param endingInstantOption {@link Option} of ending instant for scanning, can be empty. + * @param isDryRun Is dry run. + * @throws IOException upon errors. 
+ */ + void doRepair( + Option startingInstantOption, Option endingInstantOption, boolean isDryRun) throws IOException { + // Scans all partitions to find base and log files in the base path + List allFilesInPartitions = getBaseAndLogFilePathsFromFileSystem(); + // Buckets the files based on instant time + // instant time -> relative paths of base and log files to base path + Map> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles( + metaClient.getBasePath(), + metaClient.getTableConfig().getBaseFileFormat().getFileExtension(), allFilesInPartitions); + List instantTimesToRepair = instantToFilesMap.keySet().stream() + .filter(instant -> (!startingInstantOption.isPresent() + || instant.compareTo(startingInstantOption.get()) >= 0) + && (!endingInstantOption.isPresent() + || instant.compareTo(endingInstantOption.get()) <= 0) + ).collect(Collectors.toList()); + + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); + // This assumes that the archived timeline only has completed instants so this is safe + archivedTimeline.loadCompletedInstantDetailsInMemory(); + + int parallelism = Math.max(Math.min(instantTimesToRepair.size(), cfg.parallelism), 1); + List>> instantFilesToRemove = + jsc.parallelize(instantTimesToRepair, parallelism) + .mapToPair(instantToRepair -> + new Tuple2<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair, + instantToFilesMap.get(instantToRepair), activeTimeline, archivedTimeline))) + .collect(); + + List>> instantsWithDanglingFiles = + instantFilesToRemove.stream().filter(e -> !e._2.isEmpty()).collect(Collectors.toList()); + printRepairInfo(instantTimesToRepair, instantsWithDanglingFiles); + if (!isDryRun) { + List relativeFilePathsToDelete = + instantsWithDanglingFiles.stream().flatMap(e -> e._2.stream()).collect(Collectors.toList()); + if (relativeFilePathsToDelete.size() > 0) { + parallelism = Math.max(Math.min(relativeFilePathsToDelete.size(), cfg.parallelism), 1); + if (!backupFiles(relativeFilePathsToDelete, parallelism)) { + LOG.error("Error backing up dangling files. Exiting..."); + return; + } + deleteFiles(relativeFilePathsToDelete, parallelism); + } + LOG.info(String.format("Table repair on %s is successful", cfg.basePath)); + } + } + + /** + * @return All hoodie files of the table from the file system. + * @throws IOException upon errors. + */ + List getBaseAndLogFilePathsFromFileSystem() throws IOException { + List allPartitionPaths = tableMetadata.getAllPartitionPaths() + .stream().map(partitionPath -> + FSUtils.getPartitionPath(cfg.basePath, partitionPath).toString()) + .collect(Collectors.toList()); + return tableMetadata.getAllFilesInPartitions(allPartitionPaths).values().stream() + .map(fileStatuses -> + Arrays.stream(fileStatuses).map(fileStatus -> fileStatus.getPath()).collect(Collectors.toList())) + .flatMap(list -> list.stream()) + .collect(Collectors.toList()); + } + + /** + * Undoes repair for UNDO mode. + * + * @throws IOException upon errors. 
+ */ + void undoRepair() throws IOException { + FileSystem fs = metaClient.getFs(); + String backupPathStr = cfg.backupPath; + Path backupPath = new Path(backupPathStr); + if (!fs.exists(backupPath)) { + LOG.error("Cannot find backup path: " + backupPath); + return; + } + + List relativeFilePaths = Arrays.stream( + listFilesFromBasePath(backupPathStr, jsc.hadoopConfiguration())) + .map(fileStatus -> + FSUtils.getPartitionPath(backupPathStr, fileStatus.getPath().toString()).toString()) + .collect(Collectors.toList()); + int parallelism = Math.max(Math.min(relativeFilePaths.size(), cfg.parallelism), 1); + restoreFiles(relativeFilePaths, parallelism); + } + + /** + * Verifies the backup path for repair. + * If there is no backup path configured, creates a new one in temp folder. + * If the backup path already has files, throws an error to the user. + * If the backup path is within the table base path, throws an error too. + * + * @return {@code 0} if successful; {@code -1} otherwise. + * @throws IOException upon errors. + */ + int checkBackupPathForRepair() throws IOException { + if (cfg.backupPath == null) { + SecureRandom random = new SecureRandom(); + long randomLong = random.nextLong(); + cfg.backupPath = "/tmp/" + BACKUP_DIR_PREFIX + randomLong; + } + + Path backupPath = new Path(cfg.backupPath); + if (metaClient.getFs().exists(backupPath) + && metaClient.getFs().listStatus(backupPath).length > 0) { + LOG.error(String.format("Cannot use backup path %s: it is not empty", cfg.backupPath)); + return -1; + } + + return checkBackupPathAgainstBasePath(); + } + + /** + * Verifies the backup path against table base path. + * If the backup path is within the table base path, throws an error. + * + * @return {@code 0} if successful; {@code -1} otherwise. + */ + int checkBackupPathAgainstBasePath() { + if (cfg.backupPath == null) { + LOG.error("Backup path is not configured"); + return -1; + } + + if (cfg.backupPath.contains(cfg.basePath)) { + LOG.error(String.format("Cannot use backup path %s: it resides in the base path %s", + cfg.backupPath, cfg.basePath)); + return -1; + } + return 0; + } + + /** + * Backs up dangling files from table base path to backup path. + * + * @param relativeFilePaths A {@link List} of relative file paths for backup. + * @param parallelism Parallelism for copying. + * @return {@code true} if all successful; {@code false} otherwise. + */ + boolean backupFiles(List relativeFilePaths, int parallelism) { + return copyFiles(jsc, relativeFilePaths, cfg.basePath, cfg.backupPath, parallelism); + } + + /** + * Restores dangling files from backup path to table base path. + * + * @param relativeFilePaths A {@link List} of relative file paths for restoring. + * @param parallelism Parallelism for copying. + * @return {@code true} if all successful; {@code false} otherwise. + */ + boolean restoreFiles(List relativeFilePaths, int parallelism) { + return copyFiles(jsc, relativeFilePaths, cfg.backupPath, cfg.basePath, parallelism); + } + + /** + * Deletes files from table base path. + * + * @param relativeFilePaths A {@link List} of relative file paths for deleting. + * @param parallelism Parallelism for deleting. 
+ */ + void deleteFiles(List relativeFilePaths, int parallelism) { + jsc.parallelize(relativeFilePaths, parallelism) + .mapPartitions(iterator -> { + FileSystem fs = metaClient.getFs(); + List results = new ArrayList<>(); + iterator.forEachRemaining(filePath -> { + boolean success = false; + try { + success = fs.delete(new Path(filePath), false); + } catch (IOException e) { + LOG.warn("Failed to delete file " + filePath); + } finally { + results.add(success); + } + }); + return results.iterator(); + }) + .collect(); + } + + /** + * Prints the repair info. + * + * @param instantTimesToRepair A list instant times in consideration for repair + * @param instantsWithDanglingFiles A list of instants with dangling files. + */ + private void printRepairInfo( + List instantTimesToRepair, List>> instantsWithDanglingFiles) { + int numInstantsToRepair = instantsWithDanglingFiles.size(); + LOG.warn("Number of instants verified based on the base and log files: " + + instantTimesToRepair.size()); + LOG.warn("Instant timestamps: " + instantTimesToRepair); + LOG.warn("Number of instants to repair: " + numInstantsToRepair); + if (numInstantsToRepair > 0) { + instantsWithDanglingFiles.forEach(e -> { + LOG.warn(" -> Instant " + numInstantsToRepair); + LOG.warn(" ** Removing files: " + e._2); + }); + } + } + + /** + * Reads config from the file system. + * + * @param jsc {@link JavaSparkContext} instance. + * @param cfg {@link Config} instance. + * @return the {@link TypedProperties} instance. + */ + private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) + .getProps(true); + } + + public static class Config implements Serializable { + @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true) + public String basePath = null; + @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"repair\" means repairing the table " + + "by removing dangling data and log files not belonging to any commit; " + + "Set \"dry_run\" means only looking for dangling data and log files; " + + "Set \"undo\" means undoing the repair by copying back the files from backup directory", required = true) + public String runningMode = null; + @Parameter(names = {"--start-instant-time", "-si"}, description = "Starting Instant time " + + "for repair (inclusive)", required = false) + public String startingInstantTime = null; + @Parameter(names = {"--end-instant-time", "-ei"}, description = "Ending Instant time " + + "for repair (inclusive)", required = false) + public String endingInstantTime = null; + @Parameter(names = {"--backup-path", "-bp"}, description = "Backup path for storing dangling data " + + "and log files from the table", required = false) + public String backupPath = null; + @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for repair", required = false) + public int parallelism = 2; + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + public String sparkMaster = null; + @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false) + public String sparkMemory = "1g"; + @Parameter(names = {"--assume-date-partitioning", "-dp"}, description = "whether the partition path " + + "is date with three levels", required = false) + public Boolean assumeDatePartitioning = false; + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help 
= false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for table repair")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public enum Mode {
+    // Repairs the table by removing dangling data and log files not belonging to any commit
+    REPAIR,
+    // Dry run by only looking for dangling data and log files
+    DRY_RUN,
+    // Undoes the repair by copying back the files from backup directory
+    UNDO
+  }
+}
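
For reference, a minimal sketch of how the RepairUtils helpers introduced in this patch compose into the dangling-file check, without going through Spark. It assumes an existing Hudi table; the local table location and the single partition `2022/01/01` scanned here are hypothetical, and the tool itself scans all partitions through `FileSystemBackedTableMetadata` instead.

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.table.repair.RepairUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RepairUtilsSketch {
  public static void main(String[] args) throws Exception {
    String basePath = "file:///tmp/hudi_table";  // hypothetical table location
    Configuration conf = new Configuration();
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(conf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();

    // Collect base and log file paths from one partition (the tool scans all partitions).
    FileSystem fs = FSUtils.getFs(basePath, conf);
    List<Path> allFiles = Arrays.stream(fs.listStatus(new Path(basePath, "2022/01/01")))
        .filter(FileStatus::isFile)
        .map(FileStatus::getPath)
        .collect(Collectors.toList());

    // Bucket the files by the instant time encoded in their names.
    String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
    Map<String, List<String>> instantToFiles =
        RepairUtils.tagInstantsOfBaseAndLogFiles(metaClient.getBasePath(), baseFileExtension, allFiles);

    // Compare each instant's files on storage against what its commit metadata recorded.
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
    archivedTimeline.loadCompletedInstantDetailsInMemory();
    for (Map.Entry<String, List<String>> entry : instantToFiles.entrySet()) {
      List<String> dangling = RepairUtils.findInstantFilesToRemove(
          entry.getKey(), entry.getValue(), activeTimeline, archivedTimeline);
      if (!dangling.isEmpty()) {
        System.out.println("Instant " + entry.getKey() + " has dangling files: " + dangling);
      }
    }
  }
}
```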
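
The tool is normally launched through spark-submit as shown in the class-level Javadoc, but since the constructor, `Config`, and `run()` are public, it can also be driven from code. A minimal dry-run sketch, assuming a local Spark master and a hypothetical table path:

```
import org.apache.hudi.utilities.HoodieRepairTool;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class RepairToolDryRunExample {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setAppName("hudi-table-repair-dry-run")
        .setMaster("local[2]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      HoodieRepairTool.Config cfg = new HoodieRepairTool.Config();
      cfg.basePath = "file:///tmp/hudi_table";  // hypothetical table location
      cfg.runningMode = "dry_run";              // only reports dangling files, deletes nothing
      cfg.startingInstantTime = null;           // no lower bound: consider all instants
      cfg.endingInstantTime = null;             // no upper bound
      new HoodieRepairTool(jsc, cfg).run();
    } finally {
      jsc.stop();
    }
  }
}
```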
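
The `FileIOUtils.copy(FileSystem, Path, Path)` overload added above backs the backup (REPAIR) and restore (UNDO) copies; it streams the source into a destination created with `create(destFilePath, false)`, so an existing destination file makes it fail rather than overwrite. A small sketch with hypothetical local paths:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;

public class BackupCopySketch {
  public static void main(String[] args) {
    // The repair tool derives these from --base-path, --backup-path and each file's relative path.
    String sourceBasePath = "file:///tmp/hudi_table";           // hypothetical
    String backupBasePath = "file:///tmp/hoodie_repair_backup"; // hypothetical
    String relativeFilePath = "2022/01/01/dangling-file.parquet";

    Configuration conf = new Configuration();
    FileSystem fs = FSUtils.getFs(backupBasePath, conf);
    // Copies the bytes of the dangling file into the backup location, preserving the relative path.
    FileIOUtils.copy(fs,
        new Path(sourceBasePath, relativeFilePath),
        new Path(backupBasePath, relativeFilePath));
  }
}
```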