-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark 4.0: RewriteTablePath: Update sizes of rewritten manifests in manifest lists #13720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
76fc60e
bbb5b27
a2ec9c4
5cb4fc0
9146b48
a90f1f6
a6c280d
0d90390
23fcfd8
e82a4db
6a8acc9
150ecd5
4d8886e
c53b227
f952af9
f809cf6
d69e9f3
0e6427e
e86e883
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,6 +87,24 @@ public Set<Pair<String, String>> copyPlan() { | |
| } | ||
| } | ||
|
|
||
| public static class RewrittenFileInfo implements Serializable { | ||
| private final String newPath; | ||
| private final long newSize; | ||
|
|
||
| public RewrittenFileInfo(String newPath, long newSize) { | ||
| this.newPath = newPath; | ||
| this.newSize = newSize; | ||
| } | ||
|
|
||
| public String getNewPath() { | ||
| return newPath; | ||
| } | ||
|
|
||
| public long getNewSize() { | ||
| return newSize; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Create a new table metadata object, replacing path references | ||
| * | ||
|
|
@@ -226,7 +244,9 @@ private static List<Snapshot> updatePathInSnapshots( | |
| * @param outputPath location to write the manifest list | ||
| * @return a copy plan for manifest files whose metadata were contained in the rewritten manifest | ||
| * list | ||
| * @deprecated since 1.10.0, will be removed in 1.11.0 | ||
| */ | ||
| @Deprecated | ||
| public static RewriteResult<ManifestFile> rewriteManifestList( | ||
| Snapshot snapshot, | ||
| FileIO io, | ||
|
|
@@ -274,6 +294,65 @@ public static RewriteResult<ManifestFile> rewriteManifestList( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Rewrite a manifest list representing a snapshot, replacing path references. | ||
| * @param snapshot snapshot represented by the manifest list | ||
| * @param io file io | ||
| * @param tableMetadata metadata of table | ||
| * @param rewrittenManifests information about rewritten manifest files | ||
| * @param sourcePrefix source prefix that will be replaced | ||
| * @param targetPrefix target prefix that will replace it | ||
| * @param stagingDir staging directory | ||
| * @param outputPath location to write the manifest list | ||
| */ | ||
| public static void rewriteManifestList( | ||
stevenzwu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Snapshot snapshot, | ||
| FileIO io, | ||
| TableMetadata tableMetadata, | ||
| Map<String, RewrittenFileInfo> rewrittenManifests, | ||
| String sourcePrefix, | ||
| String targetPrefix, | ||
| String stagingDir, | ||
|
||
| String outputPath) { | ||
| OutputFile outputFile = io.newOutputFile(outputPath); | ||
|
|
||
| List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot); | ||
| manifestFiles.forEach( | ||
| mf -> | ||
| Preconditions.checkArgument( | ||
| mf.path().startsWith(sourcePrefix), | ||
| "Encountered manifest file %s not under the source prefix %s", | ||
| mf.path(), | ||
| sourcePrefix)); | ||
|
|
||
| try (FileAppender<ManifestFile> writer = | ||
| ManifestLists.write( | ||
| tableMetadata.formatVersion(), | ||
| outputFile, | ||
| snapshot.snapshotId(), | ||
| snapshot.parentId(), | ||
| snapshot.sequenceNumber(), | ||
| snapshot.firstRowId())) { | ||
|
|
||
| for (ManifestFile file : manifestFiles) { | ||
| ManifestFile newFile = file.copy(); | ||
|
|
||
| if (rewrittenManifests.containsKey(file.path())) { | ||
| String rewrittenPath = rewrittenManifests.get(file.path()).getNewPath(); | ||
| long rewrittenSize = rewrittenManifests.get(file.path()).getNewSize(); | ||
| ((StructLike) newFile).set(0, rewrittenPath); | ||
| ((StructLike) newFile).set(1, rewrittenSize); | ||
| } else { | ||
| ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix)); | ||
| } | ||
| writer.add(newFile); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException( | ||
| "Failed to rewrite the manifest list file " + snapshot.manifestListLocation(), e); | ||
| } | ||
| } | ||
|
|
||
| private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot snapshot) { | ||
| String path = snapshot.manifestListLocation(); | ||
| List<ManifestFile> manifestFiles = Lists.newLinkedList(); | ||
|
|
@@ -333,7 +412,9 @@ public static RewriteResult<DataFile> rewriteDataManifest( | |
| * @param sourcePrefix source prefix that will be replaced | ||
| * @param targetPrefix target prefix that will replace it | ||
| * @return a copy plan of content files in the manifest that was rewritten | ||
| * @deprecated since 1.10.0, will be removed in 1.11.0 | ||
| */ | ||
| @Deprecated | ||
|
||
| public static RewriteResult<DataFile> rewriteDataManifest( | ||
| ManifestFile manifestFile, | ||
| Set<Long> snapshotIds, | ||
|
|
@@ -357,6 +438,55 @@ public static RewriteResult<DataFile> rewriteDataManifest( | |
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Rewrite a data manifest, replacing path references. | ||
| * | ||
| * @param manifestFile source manifest file to rewrite | ||
| * @param snapshotIds snapshot ids for filtering returned data manifest entries | ||
| * @param outputFile output file to rewrite manifest file to | ||
| * @param io file io | ||
| * @param format format of the manifest file | ||
| * @param specsById map of partition specs by id | ||
| * @param sourcePrefix source prefix that will be replaced | ||
| * @param targetPrefix target prefix that will replace it | ||
| * @return rewritten manifest file and a copy plan for the referenced content files | ||
| */ | ||
| public static Pair<ManifestFile, RewriteResult<DataFile>> rewriteDataManifestWithResult( | ||
| ManifestFile manifestFile, | ||
| Set<Long> snapshotIds, | ||
| OutputFile outputFile, | ||
| FileIO io, | ||
| int format, | ||
| Map<Integer, PartitionSpec> specsById, | ||
| String sourcePrefix, | ||
| String targetPrefix) | ||
| throws IOException { | ||
| PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); | ||
| ManifestWriter<DataFile> writer = | ||
| ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); | ||
| RewriteResult<DataFile> rewriteResult = null; | ||
|
|
||
| try (ManifestWriter<DataFile> dataManifestWriter = writer; | ||
|
||
| ManifestReader<DataFile> reader = | ||
| ManifestFiles.read(manifestFile, io, specsById) | ||
| .select(Arrays.asList("*"))) { | ||
| rewriteResult = | ||
| StreamSupport.stream(reader.entries().spliterator(), false) | ||
| .map( | ||
| entry -> | ||
| writeDataFileEntry( | ||
| entry, | ||
| snapshotIds, | ||
| spec, | ||
| sourcePrefix, | ||
| targetPrefix, | ||
| writer)) | ||
| .reduce(new RewriteResult<>(), RewriteResult::append); | ||
| } | ||
| return Pair.of(writer.toManifestFile(), rewriteResult); | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Rewrite a delete manifest, replacing path references. | ||
| * | ||
|
|
@@ -411,8 +541,11 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest( | |
| * @param targetPrefix target prefix that will replace it | ||
| * @param stagingLocation staging location for rewritten files (referred delete file will be | ||
| * rewritten here) | ||
| * @return a copy plan of content files in the manifest that was rewritten | ||
| * @return rewritten manifest file and a copy plan of content files | ||
| * in the manifest that was rewritten | ||
| * @deprecated since 1.10.0, will be removed in 1.11.0 | ||
| */ | ||
| @Deprecated | ||
|
||
| public static RewriteResult<DeleteFile> rewriteDeleteManifest( | ||
| ManifestFile manifestFile, | ||
| Set<Long> snapshotIds, | ||
|
|
@@ -445,6 +578,59 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Rewrite a delete manifest, replacing path references. | ||
| * | ||
| * @param manifestFile source delete manifest to rewrite | ||
| * @param snapshotIds snapshot ids for filtering returned delete manifest entries | ||
| * @param outputFile output file to rewrite manifest file to | ||
| * @param io file io | ||
| * @param format format of the manifest file | ||
| * @param specsById map of partition specs by id | ||
| * @param sourcePrefix source prefix that will be replaced | ||
| * @param targetPrefix target prefix that will replace it | ||
| * @param stagingLocation staging location for rewritten files (referred delete file will be | ||
| * rewritten here) | ||
| * @return rewritten manifest file and a copy plan for the referenced content files | ||
| */ | ||
| public static Pair<ManifestFile, RewriteResult<DeleteFile>> rewriteDeleteManifestWithResult( | ||
| ManifestFile manifestFile, | ||
| Set<Long> snapshotIds, | ||
| OutputFile outputFile, | ||
| FileIO io, | ||
| int format, | ||
| Map<Integer, PartitionSpec> specsById, | ||
| String sourcePrefix, | ||
| String targetPrefix, | ||
| String stagingLocation) | ||
| throws IOException { | ||
| PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); | ||
| ManifestWriter<DeleteFile> writer = | ||
| ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); | ||
| RewriteResult<DeleteFile> rewriteResult = null; | ||
|
|
||
| try (ManifestWriter<DeleteFile> deleteManifestWriter = writer; | ||
| ManifestReader<DeleteFile> reader = | ||
| ManifestFiles.readDeleteManifest(manifestFile, io, specsById) | ||
| .select(Arrays.asList("*"))) { | ||
| rewriteResult = | ||
| StreamSupport.stream(reader.entries().spliterator(), false) | ||
| .map( | ||
| entry -> | ||
| writeDeleteFileEntry( | ||
| entry, | ||
| snapshotIds, | ||
| spec, | ||
| sourcePrefix, | ||
| targetPrefix, | ||
| stagingLocation, | ||
| writer)) | ||
| .reduce(new RewriteResult<>(), RewriteResult::append); | ||
| } | ||
|
|
||
| return Pair.of(writer.toManifestFile(), rewriteResult); | ||
| } | ||
|
|
||
| private static RewriteResult<DataFile> writeDataFileEntry( | ||
| ManifestEntry<DataFile> entry, | ||
| Set<Long> snapshotIds, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is in iceberg core and this class is only used in static class for record and in SparkAction, I am wondering if we want to define it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could return it from
rewrite*ManifestWithResultand maybe use it in fixes for delete files laterThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it altogether.