21 changes: 15 additions & 6 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -354,7 +354,10 @@ private static RewriteResult<DataFile> writeDataFileEntry(
     DataFile newDataFile =
         DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
     appendEntryWithFile(entry, writer, newDataFile);
-    result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    // keep deleted data file entries but exclude them from copyPlan
+    if (entry.isLive()) {
+      result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    }
     return result;
   }
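For context on the check above: a non-live (DELETED) manifest entry is still rewritten into the new manifest so table history stays intact, but its file is never scheduled for copying. Below is a minimal, self-contained sketch of that rule; `Entry` and `Status` are illustrative stand-ins (Iceberg's `ManifestEntry` is package-private), not the real API.

```java
import java.util.ArrayList;
import java.util.List;

class CopyPlanSketch {
  // assumption: mirrors Iceberg's manifest entry statuses
  enum Status { EXISTING, ADDED, DELETED }

  record Entry(Status status, String sourcePath, String targetPath) {
    boolean isLive() {
      // a DELETED entry stays in the rewritten manifest for history,
      // but its file must not be scheduled for copying
      return status != Status.DELETED;
    }
  }

  static List<String[]> copyPlan(List<Entry> entries) {
    List<String[]> plan = new ArrayList<>();
    for (Entry e : entries) {
      if (e.isLive()) {
        plan.add(new String[] {e.sourcePath(), e.targetPath()});
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    List<Entry> entries =
        List.of(
            new Entry(Status.ADDED, "s3://old/data/a.parquet", "s3://new/data/a.parquet"),
            new Entry(Status.DELETED, "s3://old/data/b.parquet", "s3://new/data/b.parquet"));
    System.out.println(copyPlan(entries).size()); // 1: the DELETED entry is excluded
  }
}
```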

@@ -381,16 +384,22 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
             .withMetrics(metricsWithTargetPath)
             .build();
         appendEntryWithFile(entry, writer, movedFile);
-        result
-            .copyPlan()
-            .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        // keep deleted position delete entries but exclude them from copyPlan
+        if (entry.isLive()) {
+          result
+              .copyPlan()
+              .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        }
         result.toRewrite().add(file);
         return result;
       case EQUALITY_DELETES:
         DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
         appendEntryWithFile(entry, writer, eqDeleteFile);
-        // No need to rewrite equality delete files as they do not contain absolute file paths.
-        result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        // keep deleted equality delete entries but exclude them from copyPlan
+        if (entry.isLive()) {
+          // No need to rewrite equality delete files as they do not contain absolute file paths.
+          result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        }
         return result;
 
       default:
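A note on the writeDeleteFileEntry hunk above: position deletes are copied from a staging location because their file bodies embed absolute data file paths and must be rewritten first, while equality deletes are copied verbatim. Here is a hedged sketch of that branching; the `stagingPath` helper below is an assumption (staging location plus original file name) modeled on the utility, not the actual implementation.

```java
class DeleteCopySourceSketch {
  enum Content { POSITION_DELETES, EQUALITY_DELETES }

  static String copySource(Content content, String location, String stagingLocation) {
    switch (content) {
      case POSITION_DELETES:
        // rewritten under staging first, because position deletes embed
        // absolute data file paths that must be updated
        return stagingPath(location, stagingLocation);
      case EQUALITY_DELETES:
        // copied as-is: equality deletes carry no absolute file paths
        return location;
      default:
        throw new IllegalArgumentException("Unsupported content: " + content);
    }
  }

  // assumed behavior: staging location + original file name
  static String stagingPath(String location, String stagingLocation) {
    return stagingLocation + location.substring(location.lastIndexOf('/') + 1);
  }

  public static void main(String[] args) {
    System.out.println(
        copySource(Content.POSITION_DELETES, "s3://t/data/d1.parquet", "s3://staging/"));
    System.out.println(
        copySource(Content.EQUALITY_DELETES, "s3://t/data/d2.parquet", "s3://staging/"));
  }
}
```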
@@ -28,6 +28,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -40,13 +41,15 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
@@ -57,13 +60,15 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.TestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
@@ -105,14 +110,14 @@ protected ActionsProvider actions() {
   private final String backupNs = "backupns";
 
   @BeforeEach
-  public void setupTableLocation() throws Exception {
+  public void setupTableLocation() {
     this.tableLocation = tableDir.toFile().toURI().toString();
     this.table = createATableWith2Snapshots(tableLocation);
     createNameSpaces();
   }
 
   @AfterEach
-  public void cleanupTableSetup() throws Exception {
+  public void cleanupTableSetup() {
     dropNameSpaces();
   }

@@ -126,6 +131,11 @@ private Table createTableWithSnapshots(String location, int snapshotNumber) {

   protected Table createTableWithSnapshots(
       String location, int snapshotNumber, Map<String, String> properties) {
+    return createTableWithSnapshots(location, snapshotNumber, properties, "append");
+  }
+
+  private Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties, String mode) {
     Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);
 
     List<ThreeColumnRecord> records =
@@ -134,7 +144,7 @@ protected Table createTableWithSnapshots(
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
 
     for (int i = 0; i < snapshotNumber; i++) {
-      df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+      df.select("c1", "c2", "c3").write().format("iceberg").mode(mode).save(location);
     }
 
     return newTable;
@@ -478,10 +488,13 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     Table sourceTable = createTableWithSnapshots(location, 2);
     // expire the first snapshot
     Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io());
-    actions()
-        .expireSnapshots(sourceTable)
-        .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
-        .execute();
+    int expiredManifestListCount = 1;
+    ExpireSnapshots.Result expireResult =
+        actions()
+            .expireSnapshots(sourceTable)
+            .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
+            .execute();
+    assertThat(expireResult.deletedManifestListsCount()).isEqualTo(expiredManifestListCount);
 
     // create 100 more snapshots
     List<ThreeColumnRecord> records =
@@ -492,17 +505,25 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     }
     sourceTable.refresh();
 
+    // each iteration generates 1 version file, 1 manifest list, 1 manifest, and 1 data file
+    int totalIteration = 102;
+    // v1/v2/v3.metadata.json have been deleted as of v104.metadata.json, so the first
+    // snapshot can no longer be found from the version file history
+    int missingVersionFile = 1;
     RewriteTablePath.Result result =
         actions()
             .rewriteTablePath(sourceTable)
             .stagingLocation(stagingLocation())
             .rewriteLocationPrefix(location, targetTableLocation())
            .execute();
 
-    checkFileNum(101, 101, 101, 406, result);
+    checkFileNum(
+        totalIteration - missingVersionFile,
+        totalIteration - expiredManifestListCount,
+        totalIteration,
+        totalIteration * 4 - missingVersionFile - expiredManifestListCount,
+        result);
   }
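To make the new checkFileNum arguments concrete, here is the arithmetic worked out under the assumptions stated in the comments (a sketch, not part of the PR):

```java
public class FileCountArithmetic {
  public static void main(String[] args) {
    int totalIteration = 102;         // 2 initial snapshots + 100 more commits
    int missingVersionFile = 1;       // first snapshot lost from version file history
    int expiredManifestListCount = 1; // removed by the earlier expireSnapshots call

    System.out.println(totalIteration - missingVersionFile);        // 101 version files
    System.out.println(totalIteration - expiredManifestListCount);  // 101 manifest lists
    System.out.println(totalIteration);                             // 102 manifests
    System.out.println(
        totalIteration * 4 - missingVersionFile - expiredManifestListCount); // 406 total
  }
}
```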

   @Test
@@ -533,14 +554,77 @@ public void testExpireSnapshotBeforeRewrite() throws Exception {
     checkFileNum(4, 1, 2, 9, result);
   }
 
+  @Test
+  public void testRewritePathWithNonLiveEntry() throws Exception {
+    String location = newTableLocation();
+    // the first overwrite generates 1 manifest and 1 data file;
+    // each subsequent overwrite on an unpartitioned table generates 2 manifests and 1 data file
+    Table tableWith3Snaps = createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite");
+
+    Snapshot oldest = SnapshotUtil.oldestAncestor(tableWith3Snaps);
+    String oldestDataFilePath =
+        Iterables.getOnlyElement(
+                tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io()))
+            .location();
+    String deletedDataFilePathInTargetLocation =
+        String.format("%sdata/%s", targetTableLocation(), fileName(oldestDataFilePath));
+
+    // expire the oldest snapshot and remove the oldest DataFile
+    ExpireSnapshots.Result expireResult =
+        actions().expireSnapshots(tableWith3Snaps).expireSnapshotId(oldest.snapshotId()).execute();
+    assertThat(expireResult)
+        .as("Should delete 1 data file in the root snapshot")
+        .extracting(
+            ExpireSnapshots.Result::deletedManifestListsCount,
+            ExpireSnapshots.Result::deletedManifestsCount,
+            ExpireSnapshots.Result::deletedDataFilesCount)
+        .contains(1L, 1L, 1L);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(tableWith3Snaps)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(tableWith3Snaps.location(), targetTableLocation())
+            .execute();
+
+    // 5 version files: 1 table creation, 3 overwrites, and 1 snapshot expiration
+    // 3 overwrites generate 3 manifest lists and 5 manifests with 3 data files;
+    // snapshot expiration removed 1 of each
+    checkFileNum(5, 2, 4, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // expect the deleted data file to be excluded from rewrite and copy
+    List<String> copiedDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePathInTargetLocation);
+
+    // expect the manifest entries to still contain the deleted entry
+    List<String> copiedEntries =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_entries")
+            .filter("status == 2")
+            .select("data_file.file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
+  }
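One note on the `filter("status == 2")` above: in the all_entries metadata table, the status column encodes the manifest entry state per the Iceberg table spec, so the query selects exactly the non-live entries the rewrite now preserves. The enum ordinals below match that encoding:

```java
// Manifest entry status encoding in Iceberg metadata tables (per the table
// spec); "status == 2" selects the preserved DELETED entries.
enum ManifestEntryStatus {
  EXISTING, // 0
  ADDED,    // 1
  DELETED   // 2
}
```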

   @Test
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
 
-    assertThat(((List) table.snapshots()).size())
-        .withFailMessage("1 out 2 snapshot has been removed")
-        .isEqualTo(1);
+    assertThat(table.snapshots()).hasSize(1);
 
     RewriteTablePath.Result result =
         actions()
@@ -573,7 +657,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception {
   }
 
   @Test
-  public void testMoveVersionWithInvalidSnapshots() throws Exception {
+  public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();
 
@@ -940,16 +1024,20 @@ protected void checkFileNum(
             .load(result.fileListLocation())
             .as(Encoders.STRING())
             .collectAsList();
-    assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count())
-        .withFailMessage("Wrong rebuilt version file count")
+    Predicate<String> isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro");
+    Predicate<String> isManifestList = f -> f.contains("snap-") && f.endsWith(".avro");
+    Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
+
+    assertThat(filesToMove.stream().filter(isMetadataJSON).count())
+        .as("Wrong rebuilt version file count")
         .isEqualTo(versionFileCount);
-    assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count())
-        .withFailMessage("Wrong rebuilt Manifest list file count")
+    assertThat(filesToMove.stream().filter(isManifestList).count())
+        .as("Wrong rebuilt manifest list file count")
         .isEqualTo(manifestListCount);
-    assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count())
-        .withFailMessage("Wrong rebuilt Manifest file file count")
+    assertThat(filesToMove.stream().filter(isManifest).count())
+        .as("Wrong rebuilt manifest file count")
        .isEqualTo(manifestFileCount);
-    assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount);
+    assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount);
   }
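As a quick illustration of what the new predicates match, here is a sanity check against typical Iceberg file name shapes (the names are made up for illustration):

```java
import java.util.function.Predicate;

public class FilePatternCheck {
  public static void main(String[] args) {
    Predicate<String> isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro");
    Predicate<String> isManifestList = f -> f.contains("snap-") && f.endsWith(".avro");
    Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");

    System.out.println(isManifestList.test("snap-123-1-abc.avro")); // true
    System.out.println(isManifest.test("abc-m0.avro"));             // true
    System.out.println(isManifest.test("snap-123-1-abc.avro"));     // false: list, not manifest
    System.out.println(isMetadataJSON.test("v3.metadata.json"));    // true
  }
}
```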

   protected String newTableLocation() throws IOException {