Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 103 additions & 21 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private RewriteTablePathUtil() {}
public static class RewriteResult<T> implements Serializable {
private final Set<T> toRewrite = Sets.newHashSet();
private final Set<Pair<String, String>> copyPlan = Sets.newHashSet();
private Long size = null;

public RewriteResult() {}

Expand All @@ -85,6 +86,14 @@ public Set<T> toRewrite() {
public Set<Pair<String, String>> copyPlan() {
return copyPlan;
}

public Long size() {
return size;
}

public void setSize(long newSize) {
this.size = newSize;
}
}

/**
Expand Down Expand Up @@ -226,7 +235,9 @@ private static List<Snapshot> updatePathInSnapshots(
* @param outputPath location to write the manifest list
* @return a copy plan for manifest files whose metadata were contained in the rewritten manifest
* list
* @deprecated since 1.10.0, will be removed in 1.11.0
*/
@Deprecated
public static RewriteResult<ManifestFile> rewriteManifestList(
Snapshot snapshot,
FileIO io,
Expand Down Expand Up @@ -276,6 +287,62 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
}
}

/**
* Rewrite a manifest list representing a snapshot, replacing path references.
*
* @param snapshot snapshot represented by the manifest list
* @param io file io
* @param tableMetadata metadata of table
* @param rewrittenManifestLengths rewritten manifest files and their sizes
* @param sourcePrefix source prefix that will be replaced
* @param targetPrefix target prefix that will replace it
* @param outputPath location to write the manifest list
*/
public static void rewriteManifestList(
Snapshot snapshot,
FileIO io,
TableMetadata tableMetadata,
Map<String, Long> rewrittenManifestLengths,
String sourcePrefix,
String targetPrefix,
String outputPath) {
OutputFile outputFile = io.newOutputFile(outputPath);

List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
manifestFiles.forEach(
mf -> {
Preconditions.checkArgument(
mf.path().startsWith(sourcePrefix),
"Encountered manifest file %s not under the source prefix %s",
mf.path(),
sourcePrefix);
Preconditions.checkArgument(
rewrittenManifestLengths.get(mf.path()) != null,
"Encountered manifest file %s that was not rewritten or has null length",
mf.path());
});

try (FileAppender<ManifestFile> writer =
ManifestLists.write(
tableMetadata.formatVersion(),
outputFile,
snapshot.snapshotId(),
snapshot.parentId(),
snapshot.sequenceNumber(),
snapshot.firstRowId())) {

for (ManifestFile file : manifestFiles) {
ManifestFile newFile = file.copy();
((StructLike) newFile).set(0, newPath(file.path(), sourcePrefix, targetPrefix));
((StructLike) newFile).set(1, rewrittenManifestLengths.get(file.path()));
writer.add(newFile);
}
} catch (IOException e) {
throw new UncheckedIOException(
"Failed to rewrite the manifest list file " + snapshot.manifestListLocation(), e);
}
}

private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot snapshot) {
String path = snapshot.manifestListLocation();
List<ManifestFile> manifestFiles = Lists.newLinkedList();
Expand Down Expand Up @@ -347,16 +414,24 @@ public static RewriteResult<DataFile> rewriteDataManifest(
String targetPrefix)
throws IOException {
PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
try (ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
RewriteResult<DataFile> rewriteResult;
ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());

try (writer;
ManifestReader<DataFile> reader =
ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDataFileEntry(entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
rewriteResult =
StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDataFileEntry(
entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
}

rewriteResult.setSize(writer.length());
return rewriteResult;
}

/**
Expand Down Expand Up @@ -427,24 +502,31 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
String stagingLocation)
throws IOException {
PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
try (ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());
RewriteResult<DeleteFile> rewriteResult;
ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());

try (writer;
ManifestReader<DeleteFile> reader =
ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
.select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDeleteFileEntry(
entry,
snapshotIds,
spec,
sourcePrefix,
targetPrefix,
stagingLocation,
writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
rewriteResult =
StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDeleteFileEntry(
entry,
snapshotIds,
spec,
sourcePrefix,
targetPrefix,
stagingLocation,
writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
}

rewriteResult.setSize(writer.length());
return rewriteResult;
}

private static RewriteResult<DataFile> writeDataFileEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,8 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

Dataset<ManifestFileBean> manifestBeanDS =
manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);

return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
return manifestBeanDS(table, snapshotIds)
.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}

protected Dataset<FileInfo> manifestDS(Table table) {
Expand All @@ -179,6 +165,22 @@ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
.as(FileInfo.ENCODER);
}

protected Dataset<ManifestFileBean> manifestBeanDS(Table table, Set<Long> snapshotIds) {
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

return manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
}

private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
if (snapshotIds != null) {
Expand Down
Loading
Loading