diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 753765fb6a504..187f66d073609 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -114,16 +114,18 @@ public static class Config implements Serializable {
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    if (outputPathExists(fs, cfg)) {
+    if (outputPathExists(outputFs, cfg)) {
       throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
 
-    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).orElseThrow(() -> {
-      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
-    });
+    FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+        .orElseThrow(() -> {
+          throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+        });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
@@ -134,11 +136,11 @@ public void export(JavaSparkContext jsc, Config cfg) throws IOException {
     LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     } else {
-      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     }
-    createSuccessTag(fs, cfg);
+    createSuccessTag(outputFs, cfg);
   }
 
   private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
@@ -164,7 +166,8 @@ private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
     }
   }
 
-  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+  private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                               Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
         .flatMap(partition -> fsView
@@ -193,8 +196,9 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
         .save(cfg.targetOutputPath);
   }
 
-  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+  private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                            Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
 
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
@@ -219,20 +223,26 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!fs.exists(toPartitionPath)) {
-        fs.mkdirs(toPartitionPath);
+      if (!executorOutputFs.exists(toPartitionPath)) {
+        executorOutputFs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          fs.getConf());
+      FileUtil.copy(
+          executorSourceFs,
+          sourceFilePath,
+          executorOutputFs,
+          new Path(toPartitionPath, sourceFilePath.getName()),
+          false,
+          executorOutputFs.getConf());
     }, files.size());
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
-    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
@@ -244,20 +254,22 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
               + commitStatus.getPath().getName());
-      if (!fileSystem.exists(targetFilePath.getParent())) {
-        fileSystem.mkdirs(targetFilePath.getParent());
+      if (!outputFs.exists(targetFilePath.getParent())) {
+        outputFs.mkdirs(targetFilePath.getParent());
       }
-      if (fileSystem.exists(targetFilePath)) {
+      if (outputFs.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
       }
-      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+      FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
     }
   }
 
-  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+  private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+        .setConf(sourceFs.getConf())
+        .setBasePath(cfg.sourceBasePath)
+        .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
         .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
   }