Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,40 @@

import static org.apache.iceberg.TableProperties.GC_ENABLED;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -63,7 +75,8 @@ public void testExpireSnapshotsInEmptyTable() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);

List<Object[]> output = sql("CALL %s.system.expire_snapshots('%s')", catalogName, tableIdent);
assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L)), output);
assertEquals(
"Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -91,7 +104,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
catalogName, tableIdent, secondSnapshotTimestamp);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output1);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output1);

table.refresh();

Expand All @@ -117,7 +131,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
catalogName, tableIdent, currentTimestamp);
assertEquals("Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L, 0L)), output);
}

@Test
Expand All @@ -137,12 +152,10 @@ public void testExpireSnapshotUsingNamedArgs() {

List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1)",
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -231,12 +244,11 @@ public void testConcurrentExpireSnapshots() {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "max_concurrent_deletes => %s,"
+ "retain_last => 1)",
+ "max_concurrent_deletes => %s)",
catalogName, currentTimestamp, tableIdent, 4);
assertEquals(
"Expiring snapshots concurrently should succeed",
ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)),
ImmutableList.of(row(0L, 0L, 0L, 0L, 3L, 0L)),
output);
}

Expand Down Expand Up @@ -283,7 +295,7 @@ public void testExpireDeleteFiles() throws Exception {
.append();
sql("DELETE FROM %s WHERE id=1", tableName);

Table table = Spark3Util.loadIcebergTable(spark, tableName);
Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals(
"Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
Expand Down Expand Up @@ -318,15 +330,12 @@ public void testExpireDeleteFiles() throws Exception {
Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1)",
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);

assertEquals(
"Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)",
ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)),
ImmutableList.of(row(1L, 1L, 0L, 4L, 4L, 0L)),
output);
Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
Expand All @@ -352,10 +361,10 @@ public void testExpireSnapshotWithStreamResultsEnabled() {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1, "
+ "stream_results => true)",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -434,13 +443,102 @@ public void testExpireSnapshotsProcedureWorksWithSqlComments() {
+ "/* And comments that span *multiple* \n"
+ " lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots("
+ " older_than => TIMESTAMP '%s',"
+ " table => '%s',"
+ " retain_last => 1)";
+ " table => '%s')";
List<Object[]> output = sql(callStatement, catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);

table.refresh();

Assert.assertEquals("Should be 1 snapshot remaining", 1, Iterables.size(table.snapshots()));
}

@Test
public void testExpireSnapshotsWithStatisticFiles() throws Exception {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
String statsFileLocation1 = statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
statsFileLocation1,
table.io());
table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), statisticsFile1).commit();

sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
String statsFileLocation2 = statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
statsFileLocation2,
table.io());
table.updateStatistics().setStatistics(statisticsFile2.snapshotId(), statisticsFile2).commit();

waitUntilAfter(table.currentSnapshot().timestampMillis());

Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);

table.refresh();
List<StatisticsFile> statsWithSnapshotId1 =
table.statisticsFiles().stream()
.filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
.collect(Collectors.toList());
Assertions.assertThat(statsWithSnapshotId1)
.as(
"Statistics file entry in TableMetadata should be deleted for the snapshot %s",
statisticsFile1.snapshotId())
.isEmpty();
Assertions.assertThat(table.statisticsFiles())
.as(
"Statistics file entry in TableMetadata should be present for the snapshot %s",
statisticsFile2.snapshotId())
.extracting(StatisticsFile::snapshotId)
.containsExactly(statisticsFile2.snapshotId());

Assertions.assertThat(new File(statsFileLocation1))
.as("Statistics file should not exist for snapshot %s", statisticsFile1.snapshotId())
.doesNotExist();

Assertions.assertThat(new File(statsFileLocation2))
.as("Statistics file should exist for snapshot %s", statisticsFile2.snapshotId())
.exists();
}

private StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
puffinWriter.add(
new Blob(
"some-blob-type",
ImmutableList.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
puffinWriter.finish();

return new GenericStatisticsFile(
snapshotId,
statsLocation,
puffinWriter.fileSize(),
puffinWriter.footerSize(),
puffinWriter.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList()));
}
}

private String statsFileLocation(String tableLocation) {
String statsFileName = "stats-file-" + UUID.randomUUID();
return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
Expand All @@ -43,6 +44,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
Expand Down Expand Up @@ -80,6 +82,7 @@ abstract class BaseSparkAction<ThisT> {

protected static final String MANIFEST = "Manifest";
protected static final String MANIFEST_LIST = "Manifest List";
protected static final String STATISTICS_FILES = "Statistics Files";
protected static final String OTHERS = "Others";

protected static final String FILE_PATH = "file_path";
Expand Down Expand Up @@ -200,6 +203,18 @@ protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}

protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
Predicate<StatisticsFile> predicate;
if (snapshotIds == null) {
predicate = statisticsFile -> true;
} else {
predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
}

List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
}

protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
return otherMetadataFileDS(table, false /* include all reachable old metadata locations */);
}
Expand Down Expand Up @@ -297,6 +312,7 @@ static class DeleteSummary {
private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
private final AtomicLong manifestsCount = new AtomicLong(0L);
private final AtomicLong manifestListsCount = new AtomicLong(0L);
private final AtomicLong statisticsFilesCount = new AtomicLong(0L);
private final AtomicLong otherFilesCount = new AtomicLong(0L);

public void deletedFiles(String type, int numFiles) {
Expand All @@ -315,6 +331,9 @@ public void deletedFiles(String type, int numFiles) {
} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
manifestListsCount.addAndGet(numFiles);

} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
statisticsFilesCount.addAndGet(numFiles);

} else if (OTHERS.equalsIgnoreCase(type)) {
otherFilesCount.addAndGet(numFiles);

Expand Down Expand Up @@ -344,6 +363,10 @@ public void deletedFile(String path, String type) {
manifestListsCount.incrementAndGet();
LOG.debug("Deleted manifest list: {}", path);

} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
statisticsFilesCount.incrementAndGet();
LOG.debug("Deleted statistics file: {}", path);

} else if (OTHERS.equalsIgnoreCase(type)) {
otherFilesCount.incrementAndGet();
LOG.debug("Deleted other metadata file: {}", path);
Expand Down Expand Up @@ -373,6 +396,10 @@ public long manifestListsCount() {
return manifestListsCount.get();
}

public long statisticsFilesCount() {
return statisticsFilesCount.get();
}

public long otherFilesCount() {
return otherFilesCount.get();
}
Expand All @@ -383,6 +410,7 @@ public long totalFilesCount() {
+ equalityDeleteFilesCount()
+ manifestsCount()
+ manifestListsCount()
+ statisticsFilesCount()
+ otherFilesCount();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.actions.ImmutableExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -245,7 +245,8 @@ private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds)
Table staticTable = newStaticTable(metadata, table.io());
return contentFileDS(staticTable, snapshotIds)
.union(manifestDS(staticTable, snapshotIds))
.union(manifestListDS(staticTable, snapshotIds));
.union(manifestListDS(staticTable, snapshotIds))
.union(statisticsFileDS(staticTable, snapshotIds));
}

private Set<Long> findExpiredSnapshotIds(
Expand Down Expand Up @@ -277,11 +278,13 @@ private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {

LOG.info("Deleted {} total files", summary.totalFilesCount());

return new BaseExpireSnapshotsActionResult(
summary.dataFilesCount(),
summary.positionDeleteFilesCount(),
summary.equalityDeleteFilesCount(),
summary.manifestsCount(),
summary.manifestListsCount());
return ImmutableExpireSnapshots.Result.builder()
.deletedDataFilesCount(summary.dataFilesCount())
.deletedPositionDeleteFilesCount(summary.positionDeleteFilesCount())
.deletedEqualityDeleteFilesCount(summary.equalityDeleteFilesCount())
.deletedManifestsCount(summary.manifestsCount())
.deletedManifestListsCount(summary.manifestListsCount())
.deletedStatisticsFilesCount(summary.statisticsFilesCount())
.build();
}
}
Loading