6 changes: 6 additions & 0 deletions .palantir/revapi.yml
@@ -1262,6 +1262,12 @@ acceptedBreaks:
\ java.lang.Object[]) throws java.lang.Exception"
justification: "Reduce visibilty of deprecated method"
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends\
\ java.lang.Object>"
new: "class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends\
\ java.lang.Object>"
justification: "Serialization across versions is not supported"
- code: "java.class.removed"
old: "class org.apache.iceberg.MetadataUpdate.EnableRowLineage"
justification: "Removing deprecations for 1.10.0"
124 changes: 103 additions & 21 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -64,6 +64,7 @@ private RewriteTablePathUtil() {}
public static class RewriteResult<T> implements Serializable {
private final Set<T> toRewrite = Sets.newHashSet();
private final Set<Pair<String, String>> copyPlan = Sets.newHashSet();
private Long length = null;

public RewriteResult() {}

@@ -85,6 +86,14 @@ public Set<T> toRewrite() {
public Set<Pair<String, String>> copyPlan() {
return copyPlan;
}

public Long length() {
return length;
}

public void length(long newLength) {
this.length = newLength;
}
Comment on lines +89 to +96
Contributor
Actually this alerts revapi and we probably need to add an exception, as this change is neither source- nor binary-breaking. Copied from the CI run:

  java.class.defaultSerializationChanged: The default serialization ID for the class has changed. This means that the new version of the class is not deserializable from the byte stream of a serialized old class.
  
  old: class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends java.lang.Object>
  new: class org.apache.iceberg.RewriteTablePathUtil.RewriteResult<T extends java.lang.Object>
  
  SOURCE: EQUIVALENT, BINARY: EQUIVALENT, SEMANTIC: BREAKING
  
  From old archive: iceberg-core-1.9.0.jar
  From new archive: iceberg-core-d356455.jar

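The finding above concerns Java's default serialVersionUID, which is computed from the class structure and therefore changes when a field like length is added. This PR records the acceptedBreaks entry shown in the revapi.yml diff at the top, on the grounds that serialization across versions is unsupported. For a class that did need cross-version serialization, the usual alternative is to pin the ID explicitly; a minimal sketch of that alternative, not the approach taken here, with a simplified stand-in class:

import java.io.Serializable;

// Sketch only: an explicit serialVersionUID keeps the serialization ID stable
// when fields are added, which avoids java.class.defaultSerializationChanged.
// The class name and field are simplified stand-ins for RewriteResult.
public class RewriteResultSketch<T> implements Serializable {
  private static final long serialVersionUID = 1L; // explicit, so adding fields does not change it

  private Long length = null; // newly added field; deserializing old data just leaves it null
}
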
}

/**
@@ -226,7 +235,9 @@ private static List<Snapshot> updatePathInSnapshots(
* @param outputPath location to write the manifest list
* @return a copy plan for manifest files whose metadata were contained in the rewritten manifest
* list
* @deprecated since 1.10.0, will be removed in 1.11.0
*/
@Deprecated
public static RewriteResult<ManifestFile> rewriteManifestList(
Snapshot snapshot,
FileIO io,
@@ -276,6 +287,62 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
}
}

/**
* Rewrite a manifest list representing a snapshot, replacing path references.
*
* @param snapshot snapshot represented by the manifest list
* @param io file io
* @param tableMetadata metadata of table
* @param rewrittenManifestLengths rewritten manifest files and their sizes
* @param sourcePrefix source prefix that will be replaced
* @param targetPrefix target prefix that will replace it
* @param outputPath location to write the manifest list
*/
public static void rewriteManifestList(
Snapshot snapshot,
FileIO io,
TableMetadata tableMetadata,
Map<String, Long> rewrittenManifestLengths,
String sourcePrefix,
String targetPrefix,
String outputPath) {
OutputFile outputFile = io.newOutputFile(outputPath);

List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
manifestFiles.forEach(
mf -> {
Preconditions.checkArgument(
mf.path().startsWith(sourcePrefix),
"Encountered manifest file %s not under the source prefix %s",
mf.path(),
sourcePrefix);
Preconditions.checkArgument(
rewrittenManifestLengths.get(mf.path()) != null,
"Encountered manifest file %s that was not rewritten or has null length",
mf.path());
});

try (FileAppender<ManifestFile> writer =
ManifestLists.write(
tableMetadata.formatVersion(),
outputFile,
snapshot.snapshotId(),
snapshot.parentId(),
snapshot.sequenceNumber(),
snapshot.firstRowId())) {

for (ManifestFile file : manifestFiles) {
ManifestFile newFile = file.copy();
((StructLike) newFile).set(0, newPath(file.path(), sourcePrefix, targetPrefix));
((StructLike) newFile).set(1, rewrittenManifestLengths.get(file.path()));
writer.add(newFile);
}
} catch (IOException e) {
throw new UncheckedIOException(
"Failed to rewrite the manifest list file " + snapshot.manifestListLocation(), e);
}
}
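Migration from the deprecated overload to this one is mechanical: rewrite each manifest first, record the rewritten lengths exposed via RewriteResult.length(), then write the manifest list. A minimal caller sketch, not the PR's actual call site: rewriteOneManifest is a hypothetical helper standing in for a full rewriteDataManifest or rewriteDeleteManifest call, and io, snapshot, tableMetadata, sourcePrefix, targetPrefix, and outputPath are assumed to be in scope:

Map<String, Long> rewrittenManifestLengths = Maps.newHashMap();
for (ManifestFile manifest : snapshot.allManifests(io)) {
  // rewriteOneManifest is a hypothetical wrapper around rewriteDataManifest(...)
  RewriteTablePathUtil.RewriteResult<DataFile> result = rewriteOneManifest(manifest);
  rewrittenManifestLengths.put(manifest.path(), result.length());
}

// new overload: manifest lengths are supplied up front via the map
RewriteTablePathUtil.rewriteManifestList(
    snapshot, io, tableMetadata, rewrittenManifestLengths,
    sourcePrefix, targetPrefix, outputPath);
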

private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot snapshot) {
String path = snapshot.manifestListLocation();
List<ManifestFile> manifestFiles = Lists.newLinkedList();
@@ -347,16 +414,24 @@ public static RewriteResult<DataFile> rewriteDataManifest(
String targetPrefix)
throws IOException {
PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
try (ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
RewriteResult<DataFile> rewriteResult;
ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());

try (writer;
ManifestReader<DataFile> reader =
ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDataFileEntry(entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
rewriteResult =
StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDataFileEntry(
entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
}

rewriteResult.length(writer.length());
return rewriteResult;
}

/**
@@ -427,24 +502,31 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
String stagingLocation)
throws IOException {
PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
try (ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());
RewriteResult<DeleteFile> rewriteResult;
ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());

try (writer;
ManifestReader<DeleteFile> reader =
ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
.select(Arrays.asList("*"))) {
return StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDeleteFileEntry(
entry,
snapshotIds,
spec,
sourcePrefix,
targetPrefix,
stagingLocation,
writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
rewriteResult =
StreamSupport.stream(reader.entries().spliterator(), false)
.map(
entry ->
writeDeleteFileEntry(
entry,
snapshotIds,
spec,
sourcePrefix,
targetPrefix,
stagingLocation,
writer))
.reduce(new RewriteResult<>(), RewriteResult::append);
}

rewriteResult.length(writer.length());
return rewriteResult;
}
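Both manifest rewrites now share the same shape: the writer is declared before the block and named in try-with-resources (allowed for effectively final resources since Java 9), so writer.length() can be read after the block, presumably because the appender only knows its final file length once closed. The idiom in isolation, as a minimal sketch with a made-up resource type:

import java.io.IOException;

class LengthAfterCloseSketch {
  // SizedWriter is a stand-in for ManifestWriter: length() is meaningful after close().
  interface SizedWriter extends AutoCloseable {
    void write(String value);
    long length();
    @Override
    void close() throws IOException;
  }

  static long writeAndMeasure(SizedWriter writer) throws IOException {
    try (writer) { // Java 9+: reference an effectively final resource directly
      writer.write("data");
    }
    return writer.length(); // read after the try block has closed the writer
  }
}
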

private static RewriteResult<DataFile> writeDataFileEntry(
@@ -151,22 +151,8 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

Dataset<ManifestFileBean> manifestBeanDS =
manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);

return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
return manifestBeanDS(table, snapshotIds)
.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}

protected Dataset<FileInfo> manifestDS(Table table) {
@@ -179,6 +165,22 @@ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
.as(FileInfo.ENCODER);
}

protected Dataset<ManifestFileBean> manifestBeanDS(Table table, Set<Long> snapshotIds) {
int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();

return manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
}
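Pulling this logic out of contentFileDS into a protected helper exposes the deduplicated manifest metadata as a typed Dataset to subclasses as well. A hypothetical reuse sketch (the method below is illustrative, not part of the PR), assuming ManifestFileBean exposes path() and length() for the columns selected above:

protected Map<String, Long> manifestLengths(Table table, Set<Long> snapshotIds) {
  // collect the (small) manifest listing to the driver and index lengths by path;
  // dropDuplicates("path") above guarantees toMap sees no duplicate keys
  return manifestBeanDS(table, snapshotIds).collectAsList().stream()
      .collect(Collectors.toMap(ManifestFileBean::path, ManifestFileBean::length));
}
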

private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
if (snapshotIds != null) {