Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path),
"statistics",
true);
deleteFiles(
io,
Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path),
"partition statistics",
true);
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
Expand All @@ -29,12 +30,15 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
Expand All @@ -60,17 +64,31 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
table.io());
table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit();

TableMetadata tableMetadata = readMetadataVersion(4);
PartitionStatisticsFile partitionStatisticsFile =
writePartitionStatsFile(
table.currentSnapshot().snapshotId(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit();

TableMetadata tableMetadata = readMetadataVersion(5);
Set<Snapshot> snapshotSet = Sets.newHashSet(table.snapshots());

Set<String> manifestListLocations = manifestListLocations(snapshotSet);
Set<String> manifestLocations = manifestLocations(snapshotSet, table.io());
Set<String> dataLocations = dataLocations(snapshotSet, table.io());
Set<String> metadataLocations = metadataLocations(tableMetadata);
Set<String> statsLocations = statsLocations(tableMetadata);
Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata);

Assertions.assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2);
Assertions.assertThat(metadataLocations).as("should have 4 metadata locations").hasSize(4);
Assertions.assertThat(statsLocations).as("should have 1 stats file").hasSize(1);
Assertions.assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5);
Assertions.assertThat(statsLocations)
.as("should have 1 stats file")
.containsExactly(statisticsFile.path());
Assertions.assertThat(partitionStatsLocations)
.as("should have 1 partition stats file")
.containsExactly(partitionStatisticsFile.path());

FileIO fileIO = Mockito.mock(FileIO.class);
Mockito.when(fileIO.newInputFile(Mockito.anyString()))
Expand All @@ -90,7 +108,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
+ manifestLocations.size()
+ dataLocations.size()
+ metadataLocations.size()
+ statsLocations.size()))
+ statsLocations.size()
+ partitionStatsLocations.size()))
.deleteFile(argumentCaptor.capture());

List<String> deletedPaths = argumentCaptor.getAllValues();
Expand All @@ -107,8 +126,11 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
.as("should contain all created metadata locations")
.containsAll(metadataLocations);
Assertions.assertThat(deletedPaths)
.as("should contain all created statistic")
.as("should contain all created statistics")
.containsAll(statsLocations);
Assertions.assertThat(deletedPaths)
.as("should contain all created partition stats files")
.containsAll(partitionStatsLocations);
}

@Test
Expand Down Expand Up @@ -179,25 +201,25 @@ public void shouldNotDropDataFilesIfGcNotEnabled() {
.containsAll(metadataLocations);
}

private Set<String> manifestListLocations(Set<Snapshot> snapshotSet) {
private static Set<String> manifestListLocations(Set<Snapshot> snapshotSet) {
return snapshotSet.stream().map(Snapshot::manifestListLocation).collect(Collectors.toSet());
}

private Set<String> manifestLocations(Set<Snapshot> snapshotSet, FileIO io) {
private static Set<String> manifestLocations(Set<Snapshot> snapshotSet, FileIO io) {
return snapshotSet.stream()
.flatMap(snapshot -> snapshot.allManifests(io).stream())
.map(ManifestFile::path)
.collect(Collectors.toSet());
}

private Set<String> dataLocations(Set<Snapshot> snapshotSet, FileIO io) {
private static Set<String> dataLocations(Set<Snapshot> snapshotSet, FileIO io) {
return snapshotSet.stream()
.flatMap(snapshot -> StreamSupport.stream(snapshot.addedDataFiles(io).spliterator(), false))
.map(dataFile -> dataFile.path().toString())
.collect(Collectors.toSet());
}

private Set<String> metadataLocations(TableMetadata tableMetadata) {
private static Set<String> metadataLocations(TableMetadata tableMetadata) {
Set<String> metadataLocations =
tableMetadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
Expand All @@ -206,13 +228,13 @@ private Set<String> metadataLocations(TableMetadata tableMetadata) {
return metadataLocations;
}

private Set<String> statsLocations(TableMetadata tableMetadata) {
private static Set<String> statsLocations(TableMetadata tableMetadata) {
return tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
}

private StatisticsFile writeStatsFile(
private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
Expand All @@ -235,4 +257,27 @@ private StatisticsFile writeStatsFile(
.collect(ImmutableList.toImmutableList()));
}
}

private static PartitionStatisticsFile writePartitionStatsFile(
long snapshotId, String statsLocation, FileIO fileIO) {
PositionOutputStream positionOutputStream;
try {
positionOutputStream = fileIO.newOutputFile(statsLocation).create();
positionOutputStream.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.fileSizeInBytes(42L)
.path(statsLocation)
.build();
}

private static Set<String> partitionStatsLocations(TableMetadata tableMetadata) {
return tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path)
.collect(Collectors.toSet());
}
}