diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java index b7aed67396a5..a63647011c3b 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java @@ -38,9 +38,10 @@ *
    *
  1. The name of the latest metadata.json rewritten to staging location. After the files are * copied, this will be the root of the copied table. - *
  2. A list of all files added to the table between startVersion and endVersion, including their - * original and target paths under the target prefix. This list covers both original and - * rewritten files, allowing for copying to the target paths to form the copied table. + *
  3. A 'copy-plan'. This is a list of all files added to the table between startVersion and + * endVersion, including their original and target paths under the target prefix. This list + * covers both original and rewritten files, allowing for copying a functioning version of the + * source table to the target prefix. *
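+ *
+ * A minimal usage sketch against the Spark implementation added in this change (the table
+ * variable, prefixes and staging path below are illustrative assumptions):
+ *
+ *     RewriteTablePath.Result result =
+ *         SparkActions.get()
+ *             .rewriteTablePath(table)
+ *             .rewriteLocationPrefix("s3://source-bucket/db/tbl", "s3://target-bucket/db/tbl")
+ *             .stagingLocation("s3://target-bucket/staging/")
+ *             .execute();
+ *     // result.latestVersion() names the metadata.json that becomes the table root once the
+ *     // files in the copy plan have been copied to the target prefix.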
*/ public interface RewriteTablePath extends Action { @@ -91,9 +92,21 @@ interface Result { String stagingLocation(); /** - * Path to a comma-separated list of source and target paths for all files added to the table - * between startVersion and endVersion, including original data files and metadata files - * rewritten to staging. + * Result file list location. This file contains a listing of all files added to the table + * between startVersion and endVersion, comma-separated.
+ * For each file, it will include the source path (either the original path in the table, or in + * the staging location if rewritten), and the target path (under the new prefix). + * + *

Example file content: + * + *


+     * sourcepath/datafile1.parquet,targetpath/datafile1.parquet
+     * sourcepath/datafile2.parquet,targetpath/datafile2.parquet
+     * stagingpath/manifest.avro,targetpath/manifest.avro
+     *
+ * + *
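+ * One way to drive the copy from this list (illustrative only: the Spark CSV read and the
+ * copyFile helper are assumptions, and any tool that copies each source to its target path
+ * works equally well):
+ *
+ *     spark.read().format("csv").load(result.fileListLocation())
+ *         .collectAsList()
+ *         .forEach(row -> copyFile(row.getString(0), row.getString(1)));
+ *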
+ * This allows for copying a functioning version of the table to the target prefix. */ String fileListLocation(); diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java new file mode 100644 index 000000000000..f250d2e12289 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Rewrite table path action. */ +public class RewriteTablePathUtil { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathUtil.class); + + private RewriteTablePathUtil() {} + + /** + * Rewrite result. 
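+ * Accumulates, for one piece of rewritten metadata, the nested files discovered for the next
+ * rewrite step together with the (source, target) copy plan produced so far. Per-manifest
+ * results are merged into a single plan the same way the utilities below do, for example
+ * (perManifestResults is an illustrative name):
+ *
+ *     RewriteResult<DataFile> combined =
+ *         perManifestResults.stream().reduce(new RewriteResult<>(), RewriteResult::append);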
+ * + * @param type of file to rewrite + */ + public static class RewriteResult implements Serializable { + private final Set toRewrite = Sets.newHashSet(); + private final Set> copyPlan = Sets.newHashSet(); + + public RewriteResult() {} + + public RewriteResult append(RewriteResult r1) { + toRewrite.addAll(r1.toRewrite); + copyPlan.addAll(r1.copyPlan); + return this; + } + + /** Returns next list of files to rewrite (discovered by rewriting this file) */ + public Set toRewrite() { + return toRewrite; + } + + /** + * Returns a copy plan of files whose metadata were rewritten, for each file a source and target + * location + */ + public Set> copyPlan() { + return copyPlan; + } + } + + /** + * Create a new table metadata object, replacing path references + * + * @param metadata source table metadata + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return copy of table metadata with paths replaced + */ + public static TableMetadata replacePaths( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + String newLocation = metadata.location().replaceFirst(sourcePrefix, targetPrefix); + List newSnapshots = updatePathInSnapshots(metadata, sourcePrefix, targetPrefix); + List metadataLogEntries = + updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix); + long snapshotId = + metadata.currentSnapshot() == null ? -1 : metadata.currentSnapshot().snapshotId(); + Map properties = + updateProperties(metadata.properties(), sourcePrefix, targetPrefix); + + return new TableMetadata( + null, + metadata.formatVersion(), + metadata.uuid(), + newLocation, + metadata.lastSequenceNumber(), + metadata.lastUpdatedMillis(), + metadata.lastColumnId(), + metadata.currentSchemaId(), + metadata.schemas(), + metadata.defaultSpecId(), + metadata.specs(), + metadata.lastAssignedPartitionId(), + metadata.defaultSortOrderId(), + metadata.sortOrders(), + properties, + snapshotId, + newSnapshots, + null, + metadata.snapshotLog(), + metadataLogEntries, + metadata.refs(), + // TODO: update statistic file paths + metadata.statisticsFiles(), + metadata.partitionStatisticsFiles(), + metadata.changes()); + } + + private static Map updateProperties( + Map tableProperties, String sourcePrefix, String targetPrefix) { + Map properties = Maps.newHashMap(tableProperties); + updatePathInProperty(properties, sourcePrefix, targetPrefix, TableProperties.OBJECT_STORE_PATH); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_DATA_LOCATION); + updatePathInProperty( + properties, sourcePrefix, targetPrefix, TableProperties.WRITE_METADATA_LOCATION); + + return properties; + } + + private static void updatePathInProperty( + Map properties, + String sourcePrefix, + String targetPrefix, + String propertyName) { + if (properties.containsKey(propertyName)) { + properties.put( + propertyName, newPath(properties.get(propertyName), sourcePrefix, targetPrefix)); + } + } + + private static List updatePathInMetadataLogs( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List metadataLogEntries = + Lists.newArrayListWithCapacity(metadata.previousFiles().size()); + for (TableMetadata.MetadataLogEntry metadataLog : metadata.previousFiles()) { + TableMetadata.MetadataLogEntry newMetadataLog = + new TableMetadata.MetadataLogEntry( + metadataLog.timestampMillis(), + newPath(metadataLog.file(), 
sourcePrefix, targetPrefix)); + metadataLogEntries.add(newMetadataLog); + } + return metadataLogEntries; + } + + private static List updatePathInSnapshots( + TableMetadata metadata, String sourcePrefix, String targetPrefix) { + List newSnapshots = Lists.newArrayListWithCapacity(metadata.snapshots().size()); + for (Snapshot snapshot : metadata.snapshots()) { + String newManifestListLocation = + newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix); + Snapshot newSnapshot = + new BaseSnapshot( + snapshot.sequenceNumber(), + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.timestampMillis(), + snapshot.operation(), + snapshot.summary(), + snapshot.schemaId(), + newManifestListLocation); + newSnapshots.add(newSnapshot); + } + return newSnapshots; + } + + /** + * Rewrite a manifest list representing a snapshot, replacing path references. + * + * @param snapshot snapshot represented by the manifest list + * @param io file io + * @param tableMetadata metadata of table + * @param manifestsToRewrite a list of manifest files to filter for rewrite + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @param stagingDir staging directory + * @param outputPath location to write the manifest list + * @return a copy plan for manifest files whose metadata were contained in the rewritten manifest + * list + */ + public static RewriteResult rewriteManifestList( + Snapshot snapshot, + FileIO io, + TableMetadata tableMetadata, + Set manifestsToRewrite, + String sourcePrefix, + String targetPrefix, + String stagingDir, + String outputPath) { + RewriteResult result = new RewriteResult<>(); + OutputFile outputFile = io.newOutputFile(outputPath); + + List manifestFiles = manifestFilesInSnapshot(io, snapshot); + List manifestFilesToRewrite = + manifestFiles.stream() + .filter(mf -> manifestsToRewrite.contains(mf.path())) + .collect(Collectors.toList()); + manifestFilesToRewrite.forEach( + mf -> + Preconditions.checkArgument( + mf.path().startsWith(sourcePrefix), + "Encountered manifest file %s not under the source prefix %s", + mf.path(), + sourcePrefix)); + + try (FileAppender writer = + ManifestLists.write( + tableMetadata.formatVersion(), + outputFile, + snapshot.snapshotId(), + snapshot.parentId(), + snapshot.sequenceNumber())) { + + for (ManifestFile file : manifestFilesToRewrite) { + ManifestFile newFile = file.copy(); + ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix)); + writer.add(newFile); + + result.toRewrite().add(file); + result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path())); + } + return result; + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to rewrite the manifest list file " + snapshot.manifestListLocation(), e); + } + } + + private static List manifestFilesInSnapshot(FileIO io, Snapshot snapshot) { + String path = snapshot.manifestListLocation(); + List manifestFiles = Lists.newLinkedList(); + try { + manifestFiles = ManifestLists.read(io.newInputFile(path)); + } catch (RuntimeIOException e) { + LOG.warn("Failed to read manifest list {}", path, e); + } + return manifestFiles; + } + + /** + * Rewrite a data manifest, replacing path references. 
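+ * Only the manifest metadata is rewritten; the referenced data files are never read or copied
+ * here. Each entry keeps its status (ADDED, EXISTING or DELETED) and sequence numbers, its file
+ * path is rewritten from the source prefix to the target prefix, and a (source, target) pair for
+ * the data file is recorded in the returned copy plan.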
+ * + * @param manifestFile source manifest file to rewrite + * @param outputFile output file to rewrite manifest file to + * @param io file io + * @param format format of the manifest file + * @param specsById map of partition specs by id + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return a copy plan of content files in the manifest that was rewritten + */ + public static RewriteResult rewriteDataManifest( + ManifestFile manifestFile, + OutputFile outputFile, + FileIO io, + int format, + Map specsById, + String sourcePrefix, + String targetPrefix) + throws IOException { + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + try (ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader reader = + ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer)) + .reduce(new RewriteResult<>(), RewriteResult::append); + } + } + + /** + * Rewrite a delete manifest, replacing path references. + * + * @param manifestFile source delete manifest to rewrite + * @param outputFile output file to rewrite manifest file to + * @param io file io + * @param format format of the manifest file + * @param specsById map of partition specs by id + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @param stagingLocation staging location for rewritten files (referred delete file will be + * rewritten here) + * @return a copy plan of content files in the manifest that was rewritten + */ + public static RewriteResult rewriteDeleteManifest( + ManifestFile manifestFile, + OutputFile outputFile, + FileIO io, + int format, + Map specsById, + String sourcePrefix, + String targetPrefix, + String stagingLocation) + throws IOException { + PartitionSpec spec = specsById.get(manifestFile.partitionSpecId()); + try (ManifestWriter writer = + ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId()); + ManifestReader reader = + ManifestFiles.readDeleteManifest(manifestFile, io, specsById) + .select(Arrays.asList("*"))) { + return StreamSupport.stream(reader.entries().spliterator(), false) + .map( + entry -> + writeDeleteFileEntry( + entry, spec, sourcePrefix, targetPrefix, stagingLocation, writer)) + .reduce(new RewriteResult<>(), RewriteResult::append); + } + } + + private static RewriteResult writeDataFileEntry( + ManifestEntry entry, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + ManifestWriter writer) { + RewriteResult result = new RewriteResult<>(); + DataFile dataFile = entry.file(); + String sourceDataFilePath = dataFile.location(); + Preconditions.checkArgument( + sourceDataFilePath.startsWith(sourcePrefix), + "Encountered data file %s not under the source prefix %s", + sourceDataFilePath, + sourcePrefix); + String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, targetPrefix); + DataFile newDataFile = + DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build(); + appendEntryWithFile(entry, writer, newDataFile); + result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location())); + return result; + } + + private static RewriteResult writeDeleteFileEntry( + ManifestEntry entry, + PartitionSpec spec, + String sourcePrefix, + String 
targetPrefix, + String stagingLocation, + ManifestWriter writer) { + + DeleteFile file = entry.file(); + RewriteResult result = new RewriteResult<>(); + + switch (file.content()) { + case POSITION_DELETES: + String targetDeleteFilePath = newPath(file.location(), sourcePrefix, targetPrefix); + Metrics metricsWithTargetPath = + ContentFileUtil.replacePathBounds(file, sourcePrefix, targetPrefix); + DeleteFile movedFile = + FileMetadata.deleteFileBuilder(spec) + .copy(file) + .withPath(targetDeleteFilePath) + .withMetrics(metricsWithTargetPath) + .build(); + appendEntryWithFile(entry, writer, movedFile); + result + .copyPlan() + .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location())); + result.toRewrite().add(file); + return result; + case EQUALITY_DELETES: + DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix); + appendEntryWithFile(entry, writer, eqDeleteFile); + // No need to rewrite equality delete files as they do not contain absolute file paths. + result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location())); + return result; + + default: + throw new UnsupportedOperationException("Unsupported delete file type: " + file.content()); + } + } + + private static > void appendEntryWithFile( + ManifestEntry entry, ManifestWriter writer, F file) { + + switch (entry.status()) { + case ADDED: + writer.add(file); + break; + case EXISTING: + writer.existing( + file, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber()); + break; + case DELETED: + writer.delete(file, entry.dataSequenceNumber(), entry.fileSequenceNumber()); + break; + } + } + + private static DeleteFile newEqualityDeleteEntry( + DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) { + String path = file.location(); + + if (!path.startsWith(sourcePrefix)) { + throw new UnsupportedOperationException( + "Expected delete file to be under the source prefix: " + + sourcePrefix + + " but was " + + path); + } + int[] equalityFieldIds = file.equalityFieldIds().stream().mapToInt(Integer::intValue).toArray(); + String newPath = newPath(path, sourcePrefix, targetPrefix); + return FileMetadata.deleteFileBuilder(spec) + .ofEqualityDeletes(equalityFieldIds) + .copy(file) + .withPath(newPath) + .withSplitOffsets(file.splitOffsets()) + .build(); + } + + /** Class providing engine-specific methods to read and write position delete files. */ + public interface PositionDeleteReaderWriter extends Serializable { + CloseableIterable reader(InputFile inputFile, FileFormat format, PartitionSpec spec); + + PositionDeleteWriter writer( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException; + } + + /** + * Rewrite a position delete file, replacing path references. 
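+ * Position delete rows embed the absolute path of the data file they delete from (file_path,
+ * pos, optional row), so the delete file content itself must be rewritten; equality delete files
+ * carry no absolute paths and are only copied. With illustrative prefixes, a row such as
+ *
+ *     (s3://source-bucket/db/tbl/data/f1.parquet, 17)
+ *
+ * is written out as
+ *
+ *     (s3://target-bucket/db/tbl/data/f1.parquet, 17)
+ *
+ * and a row whose path does not start with the source prefix fails the rewrite.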
+ * + * @param deleteFile source delete file to be rewritten + * @param outputFile output file to rewrite delete file to + * @param io file io + * @param spec spec of delete file + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix to replace it + * @param posDeleteReaderWriter class to read and write position delete files + */ + public static void rewritePositionDeleteFile( + DeleteFile deleteFile, + OutputFile outputFile, + FileIO io, + PartitionSpec spec, + String sourcePrefix, + String targetPrefix, + PositionDeleteReaderWriter posDeleteReaderWriter) + throws IOException { + String path = deleteFile.location(); + if (!path.startsWith(sourcePrefix)) { + throw new UnsupportedOperationException( + String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix)); + } + InputFile sourceFile = io.newInputFile(path); + try (CloseableIterable reader = + posDeleteReaderWriter.reader(sourceFile, deleteFile.format(), spec)) { + Record record = null; + Schema rowSchema = null; + CloseableIterator recordIt = reader.iterator(); + + if (recordIt.hasNext()) { + record = recordIt.next(); + rowSchema = record.get(2) != null ? spec.schema() : null; + } + + if (record != null) { + try (PositionDeleteWriter writer = + posDeleteReaderWriter.writer( + outputFile, deleteFile.format(), spec, deleteFile.partition(), rowSchema)) { + + writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix)); + + while (recordIt.hasNext()) { + record = recordIt.next(); + if (record != null) { + writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix)); + } + } + } + } + } + } + + private static PositionDelete newPositionDeleteRecord( + Record record, String sourcePrefix, String targetPrefix) { + PositionDelete delete = PositionDelete.create(); + String oldPath = (String) record.get(0); + if (!oldPath.startsWith(sourcePrefix)) { + throw new UnsupportedOperationException( + "Expected delete file to be under the source prefix: " + + sourcePrefix + + " but was " + + oldPath); + } + String newPath = newPath(oldPath, sourcePrefix, targetPrefix); + delete.set(newPath, (Long) record.get(1), record.get(2)); + return delete; + } + + /** + * Replace path reference + * + * @param path path reference + * @param sourcePrefix source prefix that will be replaced + * @param targetPrefix target prefix that will replace it + * @return new path reference + */ + public static String newPath(String path, String sourcePrefix, String targetPrefix) { + return combinePaths(targetPrefix, relativize(path, sourcePrefix)); + } + + /** Combine a base and relative path. */ + public static String combinePaths(String absolutePath, String relativePath) { + String combined = absolutePath; + if (!combined.endsWith("/")) { + combined += "/"; + } + combined += relativePath; + return combined; + } + + /** Returns the file name of a path. */ + public static String fileName(String path) { + String filename = path; + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex != -1) { + filename = path.substring(lastIndex + 1); + } + return filename; + } + + /** Relativize a path. 
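+ * Since newPath(path, sourcePrefix, targetPrefix) is combinePaths(targetPrefix,
+ * relativize(path, sourcePrefix)), a worked example with illustrative locations:
+ *
+ *     relativize("s3://src/db/tbl/data/f1.parquet", "s3://src/db/tbl")    returns "data/f1.parquet"
+ *     newPath("s3://src/db/tbl/data/f1.parquet", "s3://src/db/tbl", "s3://dst/db/tbl")
+ *         returns "s3://dst/db/tbl/data/f1.parquet"
+ *
+ * A path that does not start with the given prefix is rejected with an IllegalArgumentException.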
*/ + public static String relativize(String path, String prefix) { + String toRemove = prefix; + if (!toRemove.endsWith("/")) { + toRemove += "/"; + } + if (!path.startsWith(toRemove)) { + throw new IllegalArgumentException( + String.format("Path %s does not start with %s", path, toRemove)); + } + return path.substring(toRemove.length()); + } + + /** + * Construct a staging path under a given staging directory + * + * @param originalPath source path + * @param stagingDir staging directory + * @return a staging path under the staging directory, based on the original path + */ + public static String stagingPath(String originalPath, String stagingDir) { + return stagingDir + fileName(originalPath); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java index 91c7a03151f6..4af310d80e4d 100644 --- a/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java @@ -43,4 +43,15 @@ public static Schema pathPosSchema() { public static Schema posDeleteSchema(Schema rowSchema) { return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema); } + + public static Schema posDeleteReadSchema(Schema rowSchema) { + return new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS, + Types.NestedField.optional( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, + MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, + rowSchema.asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC)); + } } diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java index beffd3a955c9..ac7de6644c51 100644 --- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java @@ -19,6 +19,8 @@ package org.apache.iceberg.util; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.Collections; import java.util.Map; import java.util.Set; import org.apache.iceberg.ContentFile; @@ -26,13 +28,20 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.RewriteTablePathUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; public class ContentFileUtil { private ContentFileUtil() {} + private static final int PATH_ID = MetadataColumns.DELETE_FILE_PATH.fieldId(); + private static final Type PATH_TYPE = MetadataColumns.DELETE_FILE_PATH.type(); + /** * Copies the {@link ContentFile} with the specific stat settings. * @@ -60,28 +69,67 @@ public static CharSequence referencedDataFile(DeleteFile deleteFile) { return deleteFile.referencedDataFile(); } - int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); - Type pathType = MetadataColumns.DELETE_FILE_PATH.type(); - Map lowerBounds = deleteFile.lowerBounds(); - ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(pathId) : null; + ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(PATH_ID) : null; if (lowerPathBound == null) { return null; } Map upperBounds = deleteFile.upperBounds(); - ByteBuffer upperPathBound = upperBounds != null ? 
upperBounds.get(pathId) : null; + ByteBuffer upperPathBound = upperBounds != null ? upperBounds.get(PATH_ID) : null; if (upperPathBound == null) { return null; } if (lowerPathBound.equals(upperPathBound)) { - return Conversions.fromByteBuffer(pathType, lowerPathBound); + return Conversions.fromByteBuffer(PATH_TYPE, lowerPathBound); } else { return null; } } + /** + * Replace file_path reference for a delete file manifest entry, if file_path field's lower_bound + * and upper_bound metrics are equal. Else clear file_path lower and upper bounds. + * + * @param deleteFile delete file whose entry will be replaced + * @param sourcePrefix source prefix which will be replaced + * @param targetPrefix target prefix which will replace it + * @return metrics for the new delete file entry + */ + public static Metrics replacePathBounds( + DeleteFile deleteFile, String sourcePrefix, String targetPrefix) { + Preconditions.checkArgument( + deleteFile.content() == FileContent.POSITION_DELETES, + "Only position delete files supported"); + + Map lowerBounds = deleteFile.lowerBounds(); + ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(PATH_ID) : null; + if (lowerPathBound == null) { + return metricsWithoutPathBounds(deleteFile); + } + + Map upperBounds = deleteFile.upperBounds(); + ByteBuffer upperPathBound = upperBounds != null ? upperBounds.get(PATH_ID) : null; + if (upperPathBound == null) { + return metricsWithoutPathBounds(deleteFile); + } + + if (lowerPathBound.equals(upperPathBound)) { + CharBuffer path = Conversions.fromByteBuffer(PATH_TYPE, lowerPathBound); + CharBuffer newPath = + CharBuffer.wrap( + RewriteTablePathUtil.newPath(path.toString(), sourcePrefix, targetPrefix)); + ByteBuffer newBytes = Conversions.toByteBuffer(PATH_TYPE, newPath); + return metricsWithPathBounds(deleteFile, newBytes); + } else { + // The file_path's lower_bound and upper_bound are only used for filtering data files when + // both values match + // (file-scoped position delete). Hence do not rewrite but set null if they do not match. + return metricsWithoutPathBounds(deleteFile); + } + } + public static String referencedDataFileLocation(DeleteFile deleteFile) { CharSequence location = referencedDataFile(deleteFile); return location != null ? location.toString() : null; @@ -107,4 +155,48 @@ public static String dvDesc(DeleteFile deleteFile) { deleteFile.contentSizeInBytes(), deleteFile.referencedDataFile()); } + + private static Metrics metricsWithoutPathBounds(DeleteFile file) { + Map lowerBounds = + file.lowerBounds() == null ? null : Maps.newHashMap(file.lowerBounds()); + Map upperBounds = + file.upperBounds() == null ? null : Maps.newHashMap(file.upperBounds()); + if (lowerBounds != null) { + lowerBounds.remove(PATH_ID); + } + if (upperBounds != null) { + upperBounds.remove(PATH_ID); + } + + return new Metrics( + file.recordCount(), + file.columnSizes(), + file.valueCounts(), + file.nullValueCounts(), + file.nanValueCounts(), + lowerBounds == null ? null : Collections.unmodifiableMap(lowerBounds), + upperBounds == null ? null : Collections.unmodifiableMap(upperBounds)); + } + + private static Metrics metricsWithPathBounds(DeleteFile file, ByteBuffer bound) { + Map lowerBounds = + file.lowerBounds() == null ? null : Maps.newHashMap(file.lowerBounds()); + Map upperBounds = + file.upperBounds() == null ? 
null : Maps.newHashMap(file.upperBounds()); + if (lowerBounds != null) { + lowerBounds.put(PATH_ID, bound); + } + if (upperBounds != null) { + upperBounds.put(PATH_ID, bound); + } + + return new Metrics( + file.recordCount(), + file.columnSizes(), + file.valueCounts(), + file.nullValueCounts(), + file.nanValueCounts(), + lowerBounds == null ? null : Collections.unmodifiableMap(lowerBounds), + upperBounds == null ? null : Collections.unmodifiableMap(upperBounds)); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 34bb4afe67f9..a8e82d101fbf 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -139,6 +139,11 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) { return new BaseTable(ops, metadata.metadataFileLocation()); } + protected Table newStaticTable(String metadataFileLocation, FileIO io) { + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); + return new BaseTable(ops, metadataFileLocation); + } + protected Dataset contentFileDS(Table table) { return contentFileDS(table, null); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java new file mode 100644 index 000000000000..64469384b881 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -0,0 +1,731 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteTablePathUtil; +import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter; +import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadata.MetadataLogEntry; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.actions.ImmutableRewriteTablePath; +import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.util.Pair; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +public class RewriteTablePathSparkAction extends BaseSparkAction + implements RewriteTablePath { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class); + private static final String RESULT_LOCATION = "file-list"; + + private String sourcePrefix; + private String targetPrefix; + private String startVersionName; + private String endVersionName; + private String stagingDir; + + private final Table table; + + RewriteTablePathSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + } + + @Override + protected RewriteTablePath self() { + return this; + } + + @Override + 
public RewriteTablePath rewriteLocationPrefix(String sPrefix, String tPrefix) { + Preconditions.checkArgument( + sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be empty.", sPrefix); + this.sourcePrefix = sPrefix; + this.targetPrefix = tPrefix; + return this; + } + + @Override + public RewriteTablePath startVersion(String sVersion) { + Preconditions.checkArgument( + sVersion != null && !sVersion.trim().isEmpty(), + "Start version('%s') cannot be empty.", + sVersion); + this.startVersionName = sVersion; + return this; + } + + @Override + public RewriteTablePath endVersion(String eVersion) { + Preconditions.checkArgument( + eVersion != null && !eVersion.trim().isEmpty(), + "End version('%s') cannot be empty.", + eVersion); + this.endVersionName = eVersion; + return this; + } + + @Override + public RewriteTablePath stagingLocation(String stagingLocation) { + Preconditions.checkArgument( + stagingLocation != null && !stagingLocation.isEmpty(), + "Staging location('%s') cannot be empty.", + stagingLocation); + this.stagingDir = stagingLocation; + return this; + } + + @Override + public Result execute() { + validateInputs(); + JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc()); + return withJobGroupInfo(info, this::doExecute); + } + + private Result doExecute() { + String resultLocation = rebuildMetadata(); + return ImmutableRewriteTablePath.Result.builder() + .stagingLocation(stagingDir) + .fileListLocation(resultLocation) + .latestVersion(RewriteTablePathUtil.fileName(endVersionName)) + .build(); + } + + private void validateInputs() { + Preconditions.checkArgument( + sourcePrefix != null && !sourcePrefix.isEmpty(), + "Source prefix('%s') cannot be empty.", + sourcePrefix); + Preconditions.checkArgument( + targetPrefix != null && !targetPrefix.isEmpty(), + "Target prefix('%s') cannot be empty.", + targetPrefix); + Preconditions.checkArgument( + !sourcePrefix.equals(targetPrefix), + "Source prefix cannot be the same as target prefix (%s)", + sourcePrefix); + + validateAndSetEndVersion(); + validateAndSetStartVersion(); + + if (stagingDir == null) { + stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/"; + } else if (!stagingDir.endsWith("/")) { + stagingDir = stagingDir + "/"; + } + } + + private void validateAndSetEndVersion() { + TableMetadata tableMetadata = ((HasTableOperations) table).operations().current(); + + if (endVersionName == null) { + LOG.info("No end version specified. 
Will stage all files to the latest table version."); + Preconditions.checkNotNull( + tableMetadata.metadataFileLocation(), "Metadata file location should not be null"); + this.endVersionName = tableMetadata.metadataFileLocation(); + } else { + this.endVersionName = validateVersion(tableMetadata, endVersionName); + } + } + + private void validateAndSetStartVersion() { + TableMetadata tableMetadata = ((HasTableOperations) table).operations().current(); + + if (startVersionName != null) { + this.startVersionName = validateVersion(tableMetadata, startVersionName); + } + } + + private String validateVersion(TableMetadata tableMetadata, String versionFileName) { + String versionFile = null; + if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) { + versionFile = tableMetadata.metadataFileLocation(); + } + + for (MetadataLogEntry log : tableMetadata.previousFiles()) { + if (versionInFilePath(log.file(), versionFileName)) { + versionFile = log.file(); + } + } + + Preconditions.checkNotNull( + versionFile, "Version file %s does not exist in metadata log.", versionFile); + Preconditions.checkArgument( + fileExist(versionFile), "Version file %s does not exist.", versionFile); + return versionFile; + } + + private boolean versionInFilePath(String path, String version) { + return RewriteTablePathUtil.fileName(path).equals(version); + } + + private String jobDesc() { + if (startVersionName != null) { + return String.format( + "Replacing path prefixes '%s' with '%s' in the metadata files of table %s," + + "up to version '%s'.", + sourcePrefix, targetPrefix, table.name(), endVersionName); + } else { + return String.format( + "Replacing path prefixes '%s' with '%s' in the metadata files of table %s," + + "from version '%s' to '%s'.", + sourcePrefix, targetPrefix, table.name(), startVersionName, endVersionName); + } + } + + /** + * Rebuild metadata in a staging location, with paths rewritten. + * + *
    + *
  • Rebuild version files to staging + *
  • Rebuild manifest list files to staging + *
  • Rebuild manifests to staging + *
  • Get all files needed to move + *
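+ * Each step feeds the next: snapshots gathered from the rewritten version files determine which
+ * manifest lists to rewrite, the rewritten manifest lists determine which manifests to rewrite,
+ * and the rewritten delete manifests determine which position delete files to rewrite. The copy
+ * plans produced by all steps are merged and saved as the result file list.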
+ */ + private String rebuildMetadata() { + TableMetadata startMetadata = + startVersionName != null + ? ((HasTableOperations) newStaticTable(startVersionName, table.io())) + .operations() + .current() + : null; + TableMetadata endMetadata = + ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current(); + + Preconditions.checkArgument( + endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(), + "Statistic files are not supported yet."); + + // rebuild version files + RewriteResult rewriteVersionResult = rewriteVersionFiles(endMetadata); + Set deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite()); + + Set manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata); + Set validSnapshots = + Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata)); + + // rebuild manifest-list files + RewriteResult rewriteManifestListResult = + validSnapshots.stream() + .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite)) + .reduce(new RewriteResult<>(), RewriteResult::append); + + // rebuild manifest files + RewriteContentFileResult rewriteManifestResult = + rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite()); + + // rebuild position delete files + Set deleteFiles = + rewriteManifestResult.toRewrite().stream() + .filter(e -> e instanceof DeleteFile) + .map(e -> (DeleteFile) e) + .collect(Collectors.toSet()); + rewritePositionDeletes(endMetadata, deleteFiles); + + Set> copyPlan = Sets.newHashSet(); + copyPlan.addAll(rewriteVersionResult.copyPlan()); + copyPlan.addAll(rewriteManifestListResult.copyPlan()); + copyPlan.addAll(rewriteManifestResult.copyPlan()); + + return saveFileList(copyPlan); + } + + private String saveFileList(Set> filesToMove) { + List> fileList = + filesToMove.stream() + .map(p -> Tuple2.apply(p.first(), p.second())) + .collect(Collectors.toList()); + Dataset> fileListDataset = + spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + String fileListPath = stagingDir + RESULT_LOCATION; + fileListDataset + .repartition(1) + .write() + .mode(SaveMode.Overwrite) + .format("csv") + .save(fileListPath); + return fileListPath; + } + + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { + if (startMetadata == null) { + return allSnapshots; + } else { + Set startSnapshotIds = + startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return allSnapshots.stream() + .filter(s -> !startSnapshotIds.contains(s.snapshotId())) + .collect(Collectors.toSet()); + } + } + + private RewriteResult rewriteVersionFiles(TableMetadata endMetadata) { + RewriteResult result = new RewriteResult<>(); + result.toRewrite().addAll(endMetadata.snapshots()); + result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName)); + + List versions = endMetadata.previousFiles(); + for (int i = versions.size() - 1; i >= 0; i--) { + String versionFilePath = versions.get(i).file(); + if (versionFilePath.equals(startVersionName)) { + break; + } + + Preconditions.checkArgument( + fileExist(versionFilePath), + String.format("Version file %s doesn't exist", versionFilePath)); + TableMetadata tableMetadata = + new StaticTableOperations(versionFilePath, table.io()).current(); + + result.toRewrite().addAll(tableMetadata.snapshots()); + result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath)); + } + + return result; + } + + private Pair rewriteVersionFile(TableMetadata metadata, 
String versionFilePath) { + String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir); + TableMetadata newTableMetadata = + RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix); + TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath)); + return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, targetPrefix)); + } + + /** + * Rewrite a manifest list representing a snapshot. + * + * @param snapshot snapshot represented by the manifest list + * @param tableMetadata metadata of table + * @param manifestsToRewrite filter of manifests to rewrite. + * @return a result including a copy plan for the manifests contained in the manifest list, as + * well as for the manifest list itself + */ + private RewriteResult rewriteManifestList( + Snapshot snapshot, TableMetadata tableMetadata, Set manifestsToRewrite) { + RewriteResult result = new RewriteResult<>(); + + String path = snapshot.manifestListLocation(); + String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir); + RewriteResult rewriteResult = + RewriteTablePathUtil.rewriteManifestList( + snapshot, + table.io(), + tableMetadata, + manifestsToRewrite, + sourcePrefix, + targetPrefix, + stagingDir, + outputPath); + + result.append(rewriteResult); + // add the manifest list copy plan itself to the result + result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, targetPrefix))); + return result; + } + + private Set manifestsToRewrite( + Set deltaSnapshots, TableMetadata startMetadata) { + try { + Table endStaticTable = newStaticTable(endVersionName, table.io()); + Dataset lastVersionFiles = manifestDS(endStaticTable).select("path"); + if (startMetadata == null) { + return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList()); + } else { + Set deltaSnapshotIds = + deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + return Sets.newHashSet( + lastVersionFiles + .distinct() + .filter( + functions + .column(ManifestFile.SNAPSHOT_ID.name()) + .isInCollection(deltaSnapshotIds)) + .as(Encoders.STRING()) + .collectAsList()); + } + } catch (Exception e) { + throw new UnsupportedOperationException( + "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. " + + "Please choose an earlier version without invalid snapshots.", + e); + } + } + + public static class RewriteContentFileResult extends RewriteResult> { + public RewriteContentFileResult append(RewriteResult> r1) { + this.copyPlan().addAll(r1.copyPlan()); + this.toRewrite().addAll(r1.toRewrite()); + return this; + } + + public RewriteContentFileResult appendDataFile(RewriteResult r1) { + this.copyPlan().addAll(r1.copyPlan()); + this.toRewrite().addAll(r1.toRewrite()); + return this; + } + + public RewriteContentFileResult appendDeleteFile(RewriteResult r1) { + this.copyPlan().addAll(r1.copyPlan()); + this.toRewrite().addAll(r1.toRewrite()); + return this; + } + } + + /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. 
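+ * Each manifest is handled as its own Spark task: the manifest set becomes a Dataset
+ * repartitioned across as many partitions as there are manifests, each manifest is rewritten
+ * using the broadcast table and partition specs, and the per-manifest results are merged with a
+ * reduce. Duplicate copy-plan entries are expected, since the same data file can appear with
+ * different entry statuses.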
*/ + private RewriteContentFileResult rewriteManifests( + TableMetadata tableMetadata, Set toRewrite) { + if (toRewrite.isEmpty()) { + return new RewriteContentFileResult(); + } + + Encoder manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class); + Dataset manifestDS = + spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder); + + Broadcast serializableTable = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast> specsById = + sparkContext().broadcast(tableMetadata.specsById()); + + return manifestDS + .repartition(toRewrite.size()) + .map( + toManifests( + serializableTable, + stagingDir, + tableMetadata.formatVersion(), + specsById, + sourcePrefix, + targetPrefix), + Encoders.bean(RewriteContentFileResult.class)) + // duplicates are expected here as the same data file can have different statuses + // (e.g. added and deleted) + .reduce((ReduceFunction) RewriteContentFileResult::append); + } + + private static MapFunction toManifests( + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsById, + String sourcePrefix, + String targetPrefix) { + + return manifestFile -> { + RewriteContentFileResult result = new RewriteContentFileResult(); + switch (manifestFile.content()) { + case DATA: + result.appendDataFile( + writeDataManifest( + manifestFile, + tableBroadcast, + stagingLocation, + format, + specsById, + sourcePrefix, + targetPrefix)); + break; + case DELETES: + result.appendDeleteFile( + writeDeleteManifest( + manifestFile, + tableBroadcast, + stagingLocation, + format, + specsById, + sourcePrefix, + targetPrefix)); + break; + default: + throw new UnsupportedOperationException( + "Unsupported manifest type: " + manifestFile.content()); + } + return result; + }; + } + + private static RewriteResult writeDataManifest( + ManifestFile manifestFile, + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsByIdBroadcast, + String sourcePrefix, + String targetPrefix) { + try { + String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation); + FileIO io = tableBroadcast.getValue().io(); + OutputFile outputFile = io.newOutputFile(stagingPath); + Map specsById = specsByIdBroadcast.getValue(); + return RewriteTablePathUtil.rewriteDataManifest( + manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + private static RewriteResult writeDeleteManifest( + ManifestFile manifestFile, + Broadcast
tableBroadcast, + String stagingLocation, + int format, + Broadcast> specsByIdBroadcast, + String sourcePrefix, + String targetPrefix) { + try { + String stagingPath = RewriteTablePathUtil.stagingPath(manifestFile.path(), stagingLocation); + FileIO io = tableBroadcast.getValue().io(); + OutputFile outputFile = io.newOutputFile(stagingPath); + Map specsById = specsByIdBroadcast.getValue(); + return RewriteTablePathUtil.rewriteDeleteManifest( + manifestFile, + outputFile, + io, + format, + specsById, + sourcePrefix, + targetPrefix, + stagingLocation); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + private void rewritePositionDeletes(TableMetadata metadata, Set toRewrite) { + if (toRewrite.isEmpty()) { + return; + } + + Encoder deleteFileEncoder = Encoders.javaSerialization(DeleteFile.class); + Dataset deleteFileDs = + spark().createDataset(Lists.newArrayList(toRewrite), deleteFileEncoder); + + Broadcast
serializableTable = sparkContext().broadcast(SerializableTable.copyOf(table)); + Broadcast> specsById = + sparkContext().broadcast(metadata.specsById()); + + PositionDeleteReaderWriter posDeleteReaderWriter = new SparkPositionDeleteReaderWriter(); + deleteFileDs + .repartition(toRewrite.size()) + .foreach( + rewritePositionDelete( + serializableTable, + specsById, + sourcePrefix, + targetPrefix, + stagingDir, + posDeleteReaderWriter)); + } + + private static class SparkPositionDeleteReaderWriter implements PositionDeleteReaderWriter { + @Override + public CloseableIterable reader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + return positionDeletesReader(inputFile, format, spec); + } + + @Override + public PositionDeleteWriter writer( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + return positionDeletesWriter(outputFile, format, spec, partition, rowSchema); + } + } + + private ForeachFunction rewritePositionDelete( + Broadcast
tableBroadcast, + Broadcast> specsById, + String sourcePrefixArg, + String targetPrefixArg, + String stagingLocationArg, + PositionDeleteReaderWriter posDeleteReaderWriter) { + return deleteFile -> { + FileIO io = tableBroadcast.getValue().io(); + String newPath = RewriteTablePathUtil.stagingPath(deleteFile.location(), stagingLocationArg); + OutputFile outputFile = io.newOutputFile(newPath); + PartitionSpec spec = specsById.getValue().get(deleteFile.specId()); + RewriteTablePathUtil.rewritePositionDeleteFile( + deleteFile, + outputFile, + io, + spec, + sourcePrefixArg, + targetPrefixArg, + posDeleteReaderWriter); + }; + } + + private static CloseableIterable positionDeletesReader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); + switch (format) { + case AVRO: + return Avro.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc(DataReader::create) + .build(); + + case PARQUET: + return Parquet.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) + .build(); + + case ORC: + return ORC.read(inputFile) + .project(deleteSchema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) + .build(); + + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + private static PositionDeleteWriter positionDeletesWriter( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET: + return Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC: + return ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + private Set snapshotSet(TableMetadata metadata) { + if (metadata == null) { + return Sets.newHashSet(); + } else { + return Sets.newHashSet(metadata.snapshots()); + } + } + + private boolean fileExist(String path) { + if (path == null || path.trim().isEmpty()) { + return false; + } + return table.io().newInputFile(path).exists(); + } + + private static String newPath(String path, String sourcePrefix, String targetPrefix) { + return RewriteTablePathUtil.combinePaths( + targetPrefix, RewriteTablePathUtil.relativize(path, sourcePrefix)); + } + + private String getMetadataLocation(Table tbl) { + String currentMetadataPath = + ((HasTableOperations) tbl).operations().current().metadataFileLocation(); + int lastIndex = currentMetadataPath.lastIndexOf(File.separator); + String metadataDir = ""; + if (lastIndex != -1) { + metadataDir = currentMetadataPath.substring(0, lastIndex + 1); + } + + Preconditions.checkArgument( + !metadataDir.isEmpty(), "Failed to get the metadata file root directory"); + return metadataDir; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index ba9fa2e7b4db..aa4ef987e788 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -108,4 +108,9 @@ public ComputeTableStats computeTableStats(Table table) { public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) { return new RemoveDanglingDeletesSparkAction(spark, table); } + + @Override + public RewriteTablePathSparkAction rewriteTablePath(Table table) { + return new RewriteTablePathSparkAction(spark, table); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java new file mode 100644 index 000000000000..dba7ff197b39 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -0,0 +1,1052 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.actions.ActionsProvider; +import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import scala.Tuple2; + +public class TestRewriteTablePathsAction extends TestBase { + + @TempDir private Path staging; + @TempDir private Path tableDir; + @TempDir private Path newTableDir; + @TempDir private Path targetTableDir; + + protected ActionsProvider actions() { + return SparkActions.get(); + } + + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + protected static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected String tableLocation = null; + private Table table = null; + + private final String ns = "testns"; + private final String backupNs = "backupns"; + + @BeforeEach + public void setupTableLocation() throws Exception { + this.tableLocation = tableDir.toFile().toURI().toString(); + this.table = createATableWith2Snapshots(tableLocation); + createNameSpaces(); + } + + @AfterEach + public void cleanupTableSetup() throws Exception { + dropNameSpaces(); + } + + private Table createATableWith2Snapshots(String location) { + return createTableWithSnapshots(location, 2); + } + + private Table 
createTableWithSnapshots(String location, int snapshotNumber) { + return createTableWithSnapshots(location, snapshotNumber, Maps.newHashMap()); + } + + protected Table createTableWithSnapshots( + String location, int snapshotNumber, Map properties) { + Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + for (int i = 0; i < snapshotNumber; i++) { + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + } + + return newTable; + } + + private void createNameSpaces() { + sql("CREATE DATABASE IF NOT EXISTS %s", ns); + sql("CREATE DATABASE IF NOT EXISTS %s", backupNs); + } + + private void dropNameSpaces() { + sql("DROP DATABASE IF EXISTS %s CASCADE", ns); + sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs); + } + + @Test + public void testRewritePath() throws Exception { + String targetTableLocation = targetTableLocation(); + + // check the data file location before the rebuild + List validDataFiles = + spark + .read() + .format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(validDataFiles.size()).isEqualTo(2); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, targetTableLocation) + .endVersion("v3.metadata.json") + .execute(); + + assertThat(result.latestVersion()).isEqualTo("v3.metadata.json"); + + checkFileNum(3, 2, 2, 9, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // verify the data file path after the rebuild + List validDataFilesAfterRebuilt = + spark + .read() + .format("iceberg") + .load(targetTableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + assertThat(validDataFilesAfterRebuilt) + .hasSize(2) + .allMatch(item -> item.startsWith(targetTableLocation)); + + // verify data rows + List actual = rows(targetTableLocation); + List expected = rows(tableLocation); + assertEquals("Rows should match after copy", expected, actual); + } + + @Test + public void testSameLocations() { + assertThatThrownBy( + () -> + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, tableLocation) + .endVersion("v1.metadata.json") + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Source prefix cannot be the same as target prefix"); + } + + @Test + public void testStartVersion() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, targetTableLocation()) + .startVersion("v2.metadata.json") + .execute(); + + checkFileNum(1, 1, 1, 4, result); + + List> paths = readPathPairList(result.fileListLocation()); + + String currentSnapshotId = String.valueOf(table.currentSnapshot().snapshotId()); + assertThat(paths.stream().filter(c -> c._2().contains(currentSnapshotId)).count()) + .withFailMessage("Should have the current snapshot file") + .isEqualTo(1); + + String parentSnapshotId = String.valueOf(table.currentSnapshot().parentId()); + assertThat(paths.stream().filter(c -> c._2().contains(parentSnapshotId)).count()) + .withFailMessage("Should NOT have the parent snapshot file") + .isEqualTo(0); + } + + @Test + public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) + throws 
Exception { + String location = newTableLocation(); + Table tableWith3Snaps = createTableWithSnapshots(location, 3); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(tableWith3Snaps) + .rewriteLocationPrefix(location, toAbsolute(location1)) + .startVersion("v2.metadata.json") + .execute(); + + checkFileNum(2, 2, 2, 8, result); + + // start from the first version + RewriteTablePath.Result result1 = + actions() + .rewriteTablePath(tableWith3Snaps) + .rewriteLocationPrefix(location, toAbsolute(location2)) + .startVersion("v1.metadata.json") + .execute(); + + checkFileNum(3, 3, 3, 12, result1); + } + + @Test + public void testFullTableRewritePath() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, targetTableLocation()) + .execute(); + + checkFileNum(3, 2, 2, 9, result); + } + + @Test + public void testDeleteDataFile() throws Exception { + List<String> validDataFiles = + spark + .read() + .format("iceberg") + .load(table.location() + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + + table.newDelete().deleteFile(validDataFiles.stream().findFirst().get()).commit(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .execute(); + + checkFileNum(4, 3, 3, 12, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // verify data rows + Dataset<Row> resultDF = spark.read().format("iceberg").load(targetTableLocation()); + assertThat(resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count()) + .withFailMessage("There should be only one row left since we deleted a data file") + .isEqualTo(1); + } + + @Test + public void testPositionDeletes() throws Exception { + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of( + table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(), + 0L)); + + File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + table, table.io().newOutputFile(file.toURI().toString()), deletes) + .first(); + + table.newRowDelta().addDeletes(positionDeletes).commit(); + + assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + // We have one more snapshot, an additional manifest list, and a new (delete) manifest, + // and an additional position delete + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // Positional delete affects a single row, so only one row must remain + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1); + } + + @Test + public void testPositionDeleteWithRow() throws Exception { + String dataFileLocation = + table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(); + List<PositionDelete<?>> deletes = Lists.newArrayList(); + OutputFile deleteFile = + table + .io() + .newOutputFile( + new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet")) + .toURI() + .toString()); + deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA")); + DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile,
null, deletes); + table.newRowDelta().addDeletes(positionDeletes).commit(); + + assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + // We have one more snapshot, an additional manifest list, and a new (delete) manifest, + // and an additional position delete + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // check copied position delete row + Object[] deletedRow = (Object[]) rows(targetTableLocation() + "#position_deletes").get(0)[2]; + assertEquals( + "Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow); + + // Positional delete affects a single row, so only one row must remain + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1); + } + + @Test + public void testPositionDeletesAcrossFiles() throws Exception { + Stream allFiles = + StreamSupport.stream(table.snapshots().spliterator(), false) + .flatMap(s -> StreamSupport.stream(s.addedDataFiles(table.io()).spliterator(), false)); + List> deletes = + allFiles.map(f -> Pair.of((CharSequence) f.location(), 0L)).collect(Collectors.toList()); + + // a single position delete with two entries + assertThat(deletes.size()).isEqualTo(2); + + File file = new File(removePrefix(table.location() + "/data/deeply/nested/file.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + table, table.io().newOutputFile(file.toURI().toString()), deletes) + .first(); + + table.newRowDelta().addDeletes(positionDeletes).commit(); + + assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(0); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + // We have one more snapshot, an additional manifest list, and a new (delete) manifest, + // and an additional position delete + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(0); + } + + @Test + public void testEqualityDeletes() throws Exception { + Table sourceTable = createTableWithSnapshots(newTableLocation(), 1); + + // Add more varied data + List records = + Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD")); + spark + .createDataFrame(records, ThreeColumnRecord.class) + .coalesce(1) + .select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(newTableLocation()); + + Schema deleteRowSchema = sourceTable.schema().select("c2"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList( + dataDelete.copy("c2", "AAAAAAAAAA"), dataDelete.copy("c2", "CCCCCCCCCC")); + File file = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet"); + DeleteFile equalityDeletes = + FileHelpers.writeDeleteFile( + sourceTable, + sourceTable.io().newOutputFile(file.toURI().toString()), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + 
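+    // The equality delete file above targets c2 values "AAAAAAAAAA" and "CCCCCCCCCC"; both earlier
+    // appends wrote matching rows, so committing it logically removes three of the five rows.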
sourceTable.newRowDelta().addDeletes(equalityDeletes).commit(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .execute(); + + // Four metadata files: the table creation, the initial append, the second append above, and + // the commit with equality deletes; hence three manifest lists and three manifests. Each + // snapshot also adds one content file (two data files and one equality delete file). + checkFileNum(4, 3, 3, 13, result); + + // copy the metadata files and data files + copyTableFiles(result); + + // Equality deletes affect three rows, so just two rows must remain + assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2); + } + + @Test + public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception { + String location = newTableLocation(); + Table sourceTable = createTableWithSnapshots(location, 2); + // expire the first snapshot + Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io()); + actions() + .expireSnapshots(sourceTable) + .expireSnapshotId(staticTable.currentSnapshot().snapshotId()) + .execute(); + + // create 100 more snapshots + List<ThreeColumnRecord> records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + for (int i = 0; i < 100; i++) { + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location); + } + sourceTable.refresh(); + + // v1/v2/v3.metadata.json were removed from the metadata log by v104.metadata.json, so the + // first snapshot can no longer be found from the version file history + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(location, targetTableLocation()) + .execute(); + + checkFileNum(101, 101, 101, 406, result); + } + + @Test + public void testRewritePathWithoutSnapshot() throws Exception { + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(tableLocation, newTableLocation()) + .endVersion("v1.metadata.json") + .execute(); + + // the only rebuilt file is v1.metadata.json since it contains no snapshot + checkFileNum(1, 0, 0, 1, result); + } + + @Test + public void testExpireSnapshotBeforeRewrite() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .execute(); + + checkFileNum(4, 1, 2, 9, result); + } + + @Test + public void testStartSnapshotWithoutValidSnapshot() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + assertThat(((List) table.snapshots()).size()) + .withFailMessage("1 out of 2 snapshots has been removed") + .isEqualTo(1); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .startVersion("v2.metadata.json") + .execute(); + + // 2 metadata.json files, 1 manifest list, 1 manifest, and 1 data file + checkFileNum(2, 1, 1, 5, result); + } + + @Test + public void testMoveTheVersionExpireSnapshot()
throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + // only move version v4, which is the version generated by snapshot expiration + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), targetTableLocation()) + .stagingLocation(stagingLocation()) + .startVersion("v3.metadata.json") + .execute(); + + // only v4.metadata.json needs to move + checkFileNum(1, 0, 0, 1, result); + } + + @Test + public void testMoveVersionWithInvalidSnapshots() throws Exception { + // expire one snapshot + actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute(); + + assertThatThrownBy( + () -> + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), newTableLocation()) + .stagingLocation(stagingLocation()) + .endVersion("v3.metadata.json") + .execute()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. " + + "Please choose an earlier version without invalid snapshots."); + } + + @Test + public void testRollBack() throws Exception { + long secondSnapshotId = table.currentSnapshot().snapshotId(); + + // roll back to the first snapshot(v2) + table.manageSnapshots().setCurrentSnapshot(table.currentSnapshot().parentId()).commit(); + + // add a new snapshot + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location()); + + table.refresh(); + + // roll back to the second snapshot(v3) + table.manageSnapshots().setCurrentSnapshot(secondSnapshotId).commit(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), newTableLocation()) + .stagingLocation(stagingLocation()) + .execute(); + checkFileNum(6, 3, 3, 15, result); + } + + @Test + public void testWriteAuditPublish() throws Exception { + // enable WAP + table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit(); + spark.conf().set("spark.wap.id", "1"); + + // add a new snapshot without changing the current snapshot of the table + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location()); + + table.refresh(); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), newTableLocation()) + .stagingLocation(stagingLocation()) + .execute(); + + // There are 3 snapshots in total, although the current snapshot is the second one. 
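+    // The copy plan therefore lists 14 files: 5 metadata.json files (table creation, two appends,
+    // the WAP property update, and the staged WAP append), 3 manifest lists, 3 manifests, and 3 data files.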
+ checkFileNum(5, 3, 3, 14, result); + } + + @Test + public void testSchemaChange() throws Exception { + // change the schema + table.updateSchema().addColumn("c4", Types.StringType.get()).commit(); + + // copy table + RewriteTablePath.Result result = + actions() + .rewriteTablePath(table) + .rewriteLocationPrefix(table.location(), newTableLocation()) + .stagingLocation(stagingLocation()) + .execute(); + + // check the result + checkFileNum(4, 2, 2, 10, result); + } + + @Test + public void testSnapshotIdInheritanceEnabled() throws Exception { + String sourceTableLocation = newTableLocation(); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true"); + + Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(sourceTableLocation, targetTableLocation()) + .execute(); + + checkFileNum(3, 2, 2, 9, result); + } + + @Test + public void testMetadataCompression() throws Exception { + String sourceTableLocation = newTableLocation(); + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, properties); + + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTableLocation, targetTableLocation()) + .endVersion("v2.gz.metadata.json") + .execute(); + + checkFileNum(2, 1, 1, 5, result); + + result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTableLocation, targetTableLocation()) + .startVersion("v1.gz.metadata.json") + .execute(); + + checkFileNum(2, 2, 2, 8, result); + } + + @Test + public void testInvalidArgs() { + RewriteTablePath actions = actions().rewriteTablePath(table); + + assertThatThrownBy(() -> actions.rewriteLocationPrefix("", null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Source prefix('') cannot be empty"); + + assertThatThrownBy(() -> actions.rewriteLocationPrefix(null, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Source prefix('null') cannot be empty"); + + assertThatThrownBy(() -> actions.stagingLocation("")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Staging location('') cannot be empty"); + + assertThatThrownBy(() -> actions.stagingLocation(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Staging location('null') cannot be empty"); + + assertThatThrownBy(() -> actions.startVersion(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Start version('null') cannot be empty"); + + assertThatThrownBy(() -> actions.endVersion(" ")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("End version(' ') cannot be empty"); + + assertThatThrownBy(() -> actions.endVersion(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("End version('null') cannot be empty"); + } + + @Test + public void testStatisticFile() throws IOException { + String sourceTableLocation = newTableLocation(); + Map properties = Maps.newHashMap(); + properties.put("format-version", "2"); + String tableName = "v2tblwithstats"; + Table sourceTable = + createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0); + + TableMetadata metadata = currentMetadata(sourceTable); + TableMetadata 
withStatistics = + TableMetadata.buildFrom(metadata) + .setStatistics( + 43, + new GenericStatisticsFile( + 43, "/some/path/to/stats/file", 128, 27, ImmutableList.of())) + .build(); + + OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation()); + TableMetadataParser.overwrite(withStatistics, file); + + assertThatThrownBy( + () -> + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(sourceTableLocation, targetTableLocation()) + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Statistic files are not supported yet"); + } + + @Test + public void testMetadataCompressionWithMetastoreTable() throws Exception { + Map properties = Maps.newHashMap(); + properties.put(TableProperties.METADATA_COMPRESSION, "gzip"); + Table sourceTable = + createMetastoreTable( + newTableLocation(), properties, "default", "testMetadataCompression", 2); + + TableMetadata currentMetadata = currentMetadata(sourceTable); + + // set the second version as the endVersion + String endVersion = fileName(currentMetadata.previousFiles().get(1).file()); + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .endVersion(endVersion) + .execute(); + + checkFileNum(2, 1, 1, 5, result); + + // set the first version as the lastCopiedVersion + String firstVersion = fileName(currentMetadata.previousFiles().get(0).file()); + result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .startVersion(firstVersion) + .execute(); + + checkFileNum(2, 2, 2, 8, result); + } + + // Metastore table tests + @Test + public void testMetadataLocationChange() throws Exception { + Table sourceTable = + createMetastoreTable(newTableLocation(), Maps.newHashMap(), "default", "tbl", 1); + String metadataFilePath = currentMetadata(sourceTable).metadataFileLocation(); + + String newMetadataDir = "new-metadata-dir"; + sourceTable + .updateProperties() + .set(TableProperties.WRITE_METADATA_LOCATION, newTableLocation() + newMetadataDir) + .commit(); + + spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')"); + sourceTable.refresh(); + + // copy table + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .execute(); + + checkFileNum(4, 2, 2, 10, result); + + // pick up a version from the old metadata dir as the end version + RewriteTablePath.Result result1 = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .endVersion(fileName(metadataFilePath)) + .execute(); + + checkFileNum(2, 1, 1, 5, result1); + + // pick up a version from the old metadata dir as the last copied version + RewriteTablePath.Result result2 = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .startVersion(fileName(metadataFilePath)) + .execute(); + + checkFileNum(2, 1, 1, 5, result2); + } + + @Test + public void testDeleteFrom() throws Exception { + Map properties = Maps.newHashMap(); + properties.put("format-version", "2"); + properties.put("write.delete.mode", "merge-on-read"); + String tableName = "v2tbl"; + Table sourceTable = + createMetastoreTable(newTableLocation(), properties, "default", tableName, 0); + // ingest data + List records = + Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"), + new 
ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA")); + + Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .saveAsTable("hive.default." + tableName); + sourceTable.refresh(); + + // generate position delete files + spark.sql(String.format("delete from hive.default.%s where c1 = 1", tableName)); + sourceTable.refresh(); + + List<Object[]> originalData = + rowsToJava( + spark + .read() + .format("iceberg") + .load("hive.default." + tableName) + .sort("c1", "c2", "c3") + .collectAsList()); + // two rows + assertThat(originalData.size()).isEqualTo(2); + + // copy table and check the results + RewriteTablePath.Result result = + actions() + .rewriteTablePath(sourceTable) + .rewriteLocationPrefix(newTableLocation(), targetTableLocation()) + .execute(); + + checkFileNum(3, 2, 2, 9, result); + // copy the metadata files and data files + copyTableFiles(result); + + // register table + String metadataLocation = currentMetadata(sourceTable).metadataFileLocation(); + String versionFile = fileName(metadataLocation); + String targetTableName = "copiedV2Table"; + TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName); + catalog.registerTable(tableIdentifier, targetTableLocation() + "/metadata/" + versionFile); + + List<Object[]> copiedData = + rowsToJava( + spark + .read() + .format("iceberg") + .load("hive.default." + targetTableName) + .sort("c1", "c2", "c3") + .collectAsList()); + + assertEquals("Rows must match", originalData, copiedData); + } + + protected void checkFileNum( + int versionFileCount, + int manifestListCount, + int manifestFileCount, + int totalCount, + RewriteTablePath.Result result) { + List<String> filesToMove = + spark + .read() + .format("text") + .load(result.fileListLocation()) + .as(Encoders.STRING()) + .collectAsList(); + assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count()) + .withFailMessage("Wrong rebuilt version file count") + .isEqualTo(versionFileCount); + assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count()) + .withFailMessage("Wrong rebuilt manifest list file count") + .isEqualTo(manifestListCount); + assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count()) + .withFailMessage("Wrong rebuilt manifest file count") + .isEqualTo(manifestFileCount); + assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount); + } + + protected String newTableLocation() throws IOException { + return toAbsolute(newTableDir); + } + + protected String targetTableLocation() throws IOException { + return toAbsolute(targetTableDir); + } + + protected String stagingLocation() throws IOException { + return toAbsolute(staging); + } + + protected String toAbsolute(Path relative) throws IOException { + return relative.toFile().toURI().toString(); + } + + private void copyTableFiles(RewriteTablePath.Result result) throws Exception { + List<Tuple2<String, String>> filesToMove = readPathPairList(result.fileListLocation()); + + for (Tuple2<String, String> pathPair : filesToMove) { + FileUtils.copyFile(new File(URI.create(pathPair._1())), new File(URI.create(pathPair._2()))); + } + } + + private String removePrefix(String path) { + return path.substring(path.lastIndexOf(":") + 1); + } + + protected Table newStaticTable(String metadataFileLocation, FileIO io) { + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); + return new BaseTable(ops, metadataFileLocation);
+ } + + private List> readPathPairList(String path) { + Encoder> encoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING()); + return spark + .read() + .format("csv") + .schema(encoder.schema()) + .load(path) + .as(encoder) + .collectAsList(); + } + + private Table createMetastoreTable( + String location, + Map properties, + String namespace, + String tableName, + int snapshotNumber) { + spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog.hive.type", "hive"); + spark.conf().set("spark.sql.catalog.hive.default-namespace", "default"); + spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false"); + + StringBuilder propertiesStr = new StringBuilder(); + properties.forEach((k, v) -> propertiesStr.append("'" + k + "'='" + v + "',")); + String tblProperties = + propertiesStr.substring(0, propertiesStr.length() > 0 ? propertiesStr.length() - 1 : 0); + + sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName); + if (tblProperties.isEmpty()) { + String sqlStr = + String.format( + "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName); + if (!location.isEmpty()) { + sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location); + } + sql(sqlStr); + } else { + String sqlStr = + String.format( + "CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName); + if (!location.isEmpty()) { + sqlStr = String.format("%s USING iceberg LOCATION '%s'", sqlStr, location); + } + + sqlStr = String.format("%s TBLPROPERTIES (%s)", sqlStr, tblProperties); + sql(sqlStr); + } + + for (int i = 0; i < snapshotNumber; i++) { + sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i); + } + return catalog.loadTable(TableIdentifier.of(namespace, tableName)); + } + + private static String fileName(String path) { + String filename = path; + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex != -1) { + filename = path.substring(lastIndex + 1); + } + return filename; + } + + private TableMetadata currentMetadata(Table tbl) { + return ((HasTableOperations) tbl).operations().current(); + } + + private List rows(String location) { + return rowsToJava(spark.read().format("iceberg").load(location).collectAsList()); + } + + private PositionDelete positionDelete( + Schema tableSchema, CharSequence path, Long position, Object... values) { + PositionDelete posDelete = PositionDelete.create(); + GenericRecord nested = GenericRecord.create(tableSchema); + for (int i = 0; i < values.length; i++) { + nested.set(i, values[i]); + } + posDelete.set(path, position, nested); + return posDelete; + } +}
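
For reference, a minimal sketch of how the copy plan produced by this action can be consumed end to end, mirroring the readPathPairList/copyTableFiles helpers in the test above. The class and method names (RewriteTablePathCopyExample, copyTable) and the sourcePrefix/targetPrefix/stagingDir parameters are illustrative only and not part of this PR; the commons-io copy assumes local file: locations, as in the tests, and a real migration would substitute a bulk copy tool for the target storage.

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class RewriteTablePathCopyExample {

  // Rewrites metadata under targetPrefix and copies every (source, target) pair from the
  // resulting file list, producing a functioning copy of the table at the new prefix.
  public static void copyTable(
      SparkSession spark, Table table, String sourcePrefix, String targetPrefix, String stagingDir) {
    RewriteTablePath.Result result =
        SparkActions.get()
            .rewriteTablePath(table)
            .rewriteLocationPrefix(sourcePrefix, targetPrefix)
            .stagingLocation(stagingDir)
            .execute();

    // The file list is a CSV of "sourcePath,targetPath" lines (see RewriteTablePath.Result).
    Encoder<Tuple2<String, String>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
    List<Tuple2<String, String>> copyPlan =
        spark.read().schema(encoder.schema()).csv(result.fileListLocation()).as(encoder).collectAsList();

    for (Tuple2<String, String> pathPair : copyPlan) {
      try {
        // Local-file copy, as in the tests; swap in a distributed or bulk copy for real storage.
        FileUtils.copyFile(new File(URI.create(pathPair._1())), new File(URI.create(pathPair._2())));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}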