@@ -114,16 +114,18 @@ public static class Config implements Serializable {
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    if (outputPathExists(fs, cfg)) {
+    if (outputPathExists(outputFs, cfg)) {
       throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
 
-    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
-      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
-    });
+    FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+        .<HoodieSnapshotExporterException>orElseThrow(() -> {
+          throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+        });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
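The crux of this hunk: a Hadoop `FileSystem` handle is bound to one URI scheme and authority, so the handle resolved from `cfg.sourceBasePath` cannot answer questions about `cfg.targetOutputPath` when the two live on different filesystems. A minimal standalone sketch of the failure mode, with hypothetical paths (not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolutionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path source = new Path("hdfs://namenode:8020/tables/trips");
    Path target = new Path("s3a://bucket/exports/trips");

    // Each path resolves to the filesystem that owns its scheme/authority.
    FileSystem sourceFs = source.getFileSystem(conf); // DistributedFileSystem
    FileSystem targetFs = target.getFileSystem(conf); // S3AFileSystem

    // Correct: ask the filesystem that actually owns the path.
    System.out.println(targetFs.exists(target));
    // The pre-fix pattern was equivalent to sourceFs.exists(target), which
    // fails Hadoop's scheme check with a "Wrong FS" IllegalArgumentException
    // instead of reporting whether the export target already exists.
  }
}
```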

@@ -134,11 +136,11 @@ public void export(JavaSparkContext jsc, Config cfg) throws IOException {
     LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     } else {
-      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     }
-    createSuccessTag(fs, cfg);
+    createSuccessTag(outputFs, cfg);
   }
 
   private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
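The last changed line above matters as much as the dispatch plumbing: the success tag lives under the target path, so writing it through the source filesystem handle was another instance of the same bug. A hedged sketch of what the tag presumably amounts to (the body of `createSuccessTag` is outside this diff, and the `_SUCCESS` file name is an assumption; imports as in the sketch above, plus `java.io.IOException`):

```java
// Hedged sketch; assumes the tag is an empty _SUCCESS marker file.
static void writeSuccessTag(FileSystem outputFs, String targetOutputPath) throws IOException {
  Path successTag = new Path(targetOutputPath, "_SUCCESS");
  if (!outputFs.exists(successTag)) {
    outputFs.createNewFile(successTag); // must go through the *output* filesystem
  }
}
```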
@@ -164,7 +166,8 @@ private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
     }
   }
 
-  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+  private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                               Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
         .flatMap(partition -> fsView
@@ -193,8 +196,9 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
         .save(cfg.targetOutputPath);
   }
 
-  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+  private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                            Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
 
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
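Worth calling out why `exportAsHudi` threads both a driver-side `sourceFs` and a `SerializableConfiguration`: Hadoop `FileSystem` objects are not `Serializable`, so they cannot be captured by the Spark closures below; only the configuration travels to the executors, and each task rebuilds its own handles. A compact sketch of that pattern (Hudi import paths and the method name are assumptions; only `FSUtils.getFs` and `serConf.newCopy()` come from this diff):

```java
import java.util.Arrays;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.SerializableConfiguration; // package path assumed
import org.apache.hudi.common.fs.FSUtils;                       // package path assumed
import org.apache.spark.api.java.JavaSparkContext;

public class ExecutorFsSketch {
  static void processOnExecutors(JavaSparkContext jsc, SerializableConfiguration serConf) {
    jsc.parallelize(Arrays.asList("/data/part-0", "/data/part-1"), 2)
        .foreach(path -> {
          // Runs on an executor: newCopy() yields a fresh mutable Configuration,
          // from which a task-local FileSystem handle is resolved.
          FileSystem taskFs = FSUtils.getFs(path, serConf.newCopy());
          // ... read or write through taskFs ...
        });
  }
}
```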
@@ -219,20 +223,26 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!fs.exists(toPartitionPath)) {
-        fs.mkdirs(toPartitionPath);
+      if (!executorOutputFs.exists(toPartitionPath)) {
+        executorOutputFs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          fs.getConf());
+      FileUtil.copy(
+          executorSourceFs,
+          sourceFilePath,
+          executorOutputFs,
+          new Path(toPartitionPath, sourceFilePath.getName()),
+          false,
+          executorOutputFs.getConf());
     }, files.size());
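The rewritten `FileUtil.copy` call is the actual data move. Hadoop's six-argument overload, `copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, Configuration conf)`, streams bytes between two filesystems, so passing the source handle for reading and the output handle for writing is what makes cross-filesystem export work; the old call passed the target handle for both. A standalone sketch with hypothetical paths:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CrossFsCopySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path src = new Path("hdfs://namenode:8020/tables/trips/part-0.parquet");
    Path dst = new Path("s3a://bucket/exports/trips/part-0.parquet");
    // Resolve each end against the filesystem that owns it.
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dst.getFileSystem(conf);
    // false = keep the source file after copying.
    FileUtil.copy(srcFs, src, dstFs, dst, false, conf);
  }
}
```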

     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
-    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
@@ -244,20 +254,22 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fileSystem.exists(targetFilePath.getParent())) {
-        fileSystem.mkdirs(targetFilePath.getParent());
+      if (!outputFs.exists(targetFilePath.getParent())) {
+        outputFs.mkdirs(targetFilePath.getParent());
       }
-      if (fileSystem.exists(targetFilePath)) {
+      if (outputFs.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
       }
-      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+      FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
     }
   }
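Copying the `.hoodie` timeline files is what keeps the export loadable as a Hudi table rather than a bag of parquet files, and the same source-reads/target-writes split applies to them. A quick way to reason about the end state (a hedged fragment reusing `outputFs` and `cfg` from the hunk above, and the builder API that appears in the next hunk):

```java
// Hedged sketch: once the metafolder is copied, the export target should
// itself resolve as a Hudi table through the *output* filesystem.
HoodieTableMetaClient exported = HoodieTableMetaClient.builder()
    .setConf(outputFs.getConf())
    .setBasePath(cfg.targetOutputPath)
    .build();
```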

-  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+  private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+        .setConf(sourceFs.getConf())
+        .setBasePath(cfg.sourceBasePath)
+        .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
         .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
   }
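For context, the scenario this PR fixes is an export whose source and target resolve to different filesystems; with a single handle, every `exists`/`mkdirs`/`copy` against the other side either hit the wrong cluster or failed the scheme check. A hedged end-to-end sketch (paths hypothetical; assumes `Config` exposes the public JCommander parameter fields named in this diff and that the exporter is driven programmatically):

```java
import org.apache.spark.api.java.JavaSparkContext;

public class ExportSketch {
  static void exportAcrossFilesystems(JavaSparkContext jsc) throws Exception {
    HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
    cfg.sourceBasePath = "hdfs://namenode:8020/tables/trips"; // hypothetical
    cfg.targetOutputPath = "s3a://bucket/exports/trips";      // hypothetical
    cfg.outputFormat = "hudi";
    new HoodieSnapshotExporter().export(jsc, cfg);
  }
}
```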