Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -238,7 +239,7 @@ private boolean versionInFilePath(String path, String version) {
}

private String jobDesc() {
if (startVersionName != null) {
if (startVersionName == null) {
return String.format(
"Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+ "up to version '%s'.",
Expand Down Expand Up @@ -272,8 +273,9 @@ private String rebuildMetadata() {
((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();

Preconditions.checkArgument(
endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
"Statistic files are not supported yet.");
endMetadata.partitionStatisticsFiles() == null
|| endMetadata.partitionStatisticsFiles().isEmpty(),
"Partition statistics files are not supported yet.");

// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
Expand Down Expand Up @@ -341,7 +343,7 @@ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot>
private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
RewriteResult<Snapshot> result = new RewriteResult<>();
result.toRewrite().addAll(endMetadata.snapshots());
result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName));

List<MetadataLogEntry> versions = endMetadata.previousFiles();
for (int i = versions.size() - 1; i >= 0; i--) {
Expand All @@ -357,19 +359,50 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
new StaticTableOperations(versionFilePath, table.io()).current();

result.toRewrite().addAll(tableMetadata.snapshots());
result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
result.copyPlan().addAll(rewriteVersionFile(tableMetadata, versionFilePath));
}

return result;
}

private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
private Set<Pair<String, String>> rewriteVersionFile(
TableMetadata metadata, String versionFilePath) {
Set<Pair<String, String>> result = Sets.newHashSet();
String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
TableMetadata newTableMetadata =
RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
return Pair.of(
stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix));
result.add(
Pair.of(
stagingPath,
RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix)));

// include statistics files in copy plan
result.addAll(
statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
return result;
}

private Set<Pair<String, String>> statsFileCopyPlan(
List<StatisticsFile> beforeStats, List<StatisticsFile> afterStats) {
Set<Pair<String, String>> result = Sets.newHashSet();
if (beforeStats.isEmpty()) {
return result;
}

Preconditions.checkArgument(
beforeStats.size() == afterStats.size(),
"Before and after path rewrite, statistic files count should be same");
for (int i = 0; i < beforeStats.size(); i++) {
StatisticsFile before = beforeStats.get(i);
StatisticsFile after = afterStats.get(i);
Preconditions.checkArgument(
before.fileSizeInBytes() == after.fileSizeInBytes(),
"Before and after path rewrite, statistic file size should be same");
result.add(
Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
}
return result;
}

/**
Expand Down
Loading