diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
index 0e02f4bec964..286ab115d14f 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
@@ -22,6 +22,7 @@
 import java.util.function.Consumer;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.io.SupportsBulkOperations;
+import org.immutables.value.Value;
 
 /**
  * An action that expires snapshots in a table.
@@ -29,6 +30,7 @@
  *
  * <p>Similar to {@link org.apache.iceberg.ExpireSnapshots} but may use a query engine to distribute
  * parts of the work.
  */
+@Value.Enclosing
 public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots.Result> {
   /**
    * Expires a specific {@link Snapshot} identified by id.
@@ -98,6 +100,7 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
 
   /** The action result that contains a summary of the execution. */
+  @Value.Immutable
   interface Result {
     /** Returns the number of deleted data files. */
     long deletedDataFilesCount();
@@ ... @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
     /** Returns the number of deleted manifest lists. */
     long deletedManifestListsCount();
+
+    /** Returns the number of deleted statistics files. */
+    @Value.Default
+    default long deletedStatisticsFilesCount() {
+      return 0L;
+    }
   }
 }
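For reference, a minimal sketch (not part of the patch) of how the Immutables-generated result type is constructed. @Value.Enclosing on the interface plus @Value.Immutable on the nested Result make the annotation processor emit ImmutableExpireSnapshots.Result with a builder whose setters mirror the accessor names; the @Value.Default of 0L for deletedStatisticsFilesCount is an assumption reconstructed from the builder usage later in this patch.

    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.actions.ImmutableExpireSnapshots;

    class ResultSketch {
      static ExpireSnapshots.Result empty() {
        return ImmutableExpireSnapshots.Result.builder()
            .deletedDataFilesCount(0L)
            .deletedPositionDeleteFilesCount(0L)
            .deletedEqualityDeleteFilesCount(0L)
            .deletedManifestsCount(0L)
            .deletedManifestListsCount(0L)
            // deletedStatisticsFilesCount is assumed to be @Value.Default,
            // so it can be omitted and falls back to 0L.
            .build();
      }
    }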
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ ... @@ public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
    * @return the location of statistics files
    */
   public static List<String> statisticsFilesLocations(Table table) {
-    List<String> statisticsFilesLocations = Lists.newArrayList();
-    for (StatisticsFile statisticsFile : table.statisticsFiles()) {
-      statisticsFilesLocations.add(statisticsFile.path());
-    }
+    return statisticsFilesLocations(table, statisticsFile -> true);
+  }
 
-    return statisticsFilesLocations;
+  /**
+   * Returns locations of statistics files for a table matching the given predicate.
+   *
+   * @param table table for which statistics files need to be listed
+   * @param predicate predicate for filtering the statistics files
+   * @return the location of statistics files
+   */
+  public static List<String> statisticsFilesLocations(
+      Table table, Predicate<StatisticsFile> predicate) {
+    return table.statisticsFiles().stream()
+        .filter(predicate)
+        .map(StatisticsFile::path)
+        .collect(Collectors.toList());
   }
 }
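A minimal usage sketch of the new predicate overload (the helper class and the expiredIds set are illustrative, not part of the patch):

    import java.util.List;
    import java.util.Set;
    import org.apache.iceberg.ReachableFileUtil;
    import org.apache.iceberg.Table;

    class StatsLocationsSketch {
      // Lists only the statistics files attached to the given snapshots.
      static List<String> expiredStatsLocations(Table table, Set<Long> expiredIds) {
        return ReachableFileUtil.statisticsFilesLocations(
            table, statsFile -> expiredIds.contains(statsFile.snapshotId()));
      }
    }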
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
index 93fd8431b34e..d750bcdbc9f0 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.actions;
 
+/** @deprecated will be removed in 1.3.0. */
+@Deprecated
 public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {
 
   private final long deletedDataFilesCount;
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index e7f648ed6f7d..efb3d43668f1 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -20,28 +20,40 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,7 +75,8 @@ public void testExpireSnapshotsInEmptyTable() {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
 
     List<Object[]> output = sql("CALL %s.system.expire_snapshots('%s')", catalogName, tableIdent);
-    assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L)), output);
+    assertEquals(
+        "Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L, 0L)), output);
   }
 
   @Test
@@ -91,7 +104,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
     List<Object[]> output1 =
         sql(
            "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
            catalogName, tableIdent, secondSnapshotTimestamp);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output1);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output1);
 
     table.refresh();
@@ -117,7 +131,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
         sql(
             "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
             catalogName, tableIdent, currentTimestamp);
-    assertEquals("Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L, 0L)), output);
   }
 
@@ -137,12 +152,10 @@ public void testExpireSnapshotUsingNamedArgs() {
 
     List<Object[]> output =
         sql(
-            "CALL %s.system.expire_snapshots("
-                + "older_than => TIMESTAMP '%s',"
-                + "table => '%s',"
-                + "retain_last => 1)",
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
             catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
   }
 
   @Test
@@ -231,12 +244,11 @@ public void testConcurrentExpireSnapshots() {
             "CALL %s.system.expire_snapshots("
                 + "older_than => TIMESTAMP '%s',"
                 + "table => '%s',"
-                + "max_concurrent_deletes => %s,"
-                + "retain_last => 1)",
+                + "max_concurrent_deletes => %s)",
             catalogName, currentTimestamp, tableIdent, 4);
     assertEquals(
         "Expiring snapshots concurrently should succeed",
-        ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)),
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 3L, 0L)),
         output);
   }
 
@@ -283,7 +295,7 @@ public void testExpireDeleteFiles() throws Exception {
         .append();
 
     sql("DELETE FROM %s WHERE id=1", tableName);
-    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
     Assert.assertEquals(
         "Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
 
@@ -318,15 +330,12 @@ public void testExpireDeleteFiles() throws Exception {
     Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
     List<Object[]> output =
         sql(
-            "CALL %s.system.expire_snapshots("
-                + "older_than => TIMESTAMP '%s',"
-                + "table => '%s',"
-                + "retain_last => 1)",
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
             catalogName, currentTimestamp, tableIdent);
     assertEquals(
         "Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)",
-        ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)),
+        ImmutableList.of(row(1L, 1L, 0L, 4L, 4L, 0L)),
         output);
     Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
     Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
   }
 
@@ -352,10 +361,10 @@ public void testExpireSnapshotWithStreamResultsEnabled() {
             "CALL %s.system.expire_snapshots("
                 + "older_than => TIMESTAMP '%s',"
                 + "table => '%s',"
-                + "retain_last => 1, "
                 + "stream_results => true)",
             catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
   }
 
   @Test
@@ -434,13 +443,102 @@ public void testExpireSnapshotsProcedureWorksWithSqlComments() {
             + "/* And comments that span *multiple* \n"
             + " lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots("
             + " older_than => TIMESTAMP '%s',"
-            + " table => '%s',"
-            + " retain_last => 1)";
+            + " table => '%s')";
 
     List<Object[]> output = sql(callStatement, catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
 
     table.refresh();
 
     Assert.assertEquals("Should be 1 snapshot remaining", 1, Iterables.size(table.snapshots()));
   }
+
+  @Test
+  public void testExpireSnapshotsWithStatisticFiles() throws Exception {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    String statsFileLocation1 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile1 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation1,
+            table.io());
+    table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), statisticsFile1).commit();
+
+    sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+    table.refresh();
+    String statsFileLocation2 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile2 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation2,
+            table.io());
+    table.updateStatistics().setStatistics(statisticsFile2.snapshotId(), statisticsFile2).commit();
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
+            catalogName, currentTimestamp, tableIdent);
+    Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);
+
+    table.refresh();
+    List<StatisticsFile> statsWithSnapshotId1 =
+        table.statisticsFiles().stream()
+            .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
+            .collect(Collectors.toList());
+    Assertions.assertThat(statsWithSnapshotId1)
+        .as(
+            "Statistics file entry in TableMetadata should be deleted for the snapshot %s",
+            statisticsFile1.snapshotId())
+        .isEmpty();
+    Assertions.assertThat(table.statisticsFiles())
+        .as(
+            "Statistics file entry in TableMetadata should be present for the snapshot %s",
+            statisticsFile2.snapshotId())
+        .extracting(StatisticsFile::snapshotId)
+        .containsExactly(statisticsFile2.snapshotId());
+
+    Assertions.assertThat(new File(statsFileLocation1))
+        .as("Statistics file should not exist for snapshot %s", statisticsFile1.snapshotId())
+        .doesNotExist();
+
+    Assertions.assertThat(new File(statsFileLocation2))
+        .as("Statistics file should exist for snapshot %s", statisticsFile2.snapshotId())
+        .exists();
+  }
+
+  private StatisticsFile writeStatsFile(
+      long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
+      throws IOException {
+    try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+
+      return new GenericStatisticsFile(
+          snapshotId,
+          statsLocation,
+          puffinWriter.fileSize(),
+          puffinWriter.footerSize(),
+          puffinWriter.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList()));
+    }
+  }
+
+  private String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 1b285e8caca8..3c007c6214c2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
@@ -43,6 +44,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -80,6 +82,7 @@ abstract class BaseSparkAction<ThisT> {
 
   protected static final String MANIFEST = "Manifest";
   protected static final String MANIFEST_LIST = "Manifest List";
+  protected static final String STATISTICS_FILES = "Statistics Files";
   protected static final String OTHERS = "Others";
 
   protected static final String FILE_PATH = "file_path";
@@ -200,6 +203,18 @@ protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
     return toFileInfoDS(manifestLists, MANIFEST_LIST);
   }
 
+  protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
+    Predicate<StatisticsFile> predicate;
+    if (snapshotIds == null) {
+      predicate = statisticsFile -> true;
+    } else {
+      predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
+    }
+
+    List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
+    return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
+  }
+
   protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
     return otherMetadataFileDS(table, false /* include all reachable old metadata locations */);
   }
@@ -297,6 +312,7 @@ static class DeleteSummary {
     private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
     private final AtomicLong manifestsCount = new AtomicLong(0L);
     private final AtomicLong manifestListsCount = new AtomicLong(0L);
+    private final AtomicLong statisticsFilesCount = new AtomicLong(0L);
     private final AtomicLong otherFilesCount = new AtomicLong(0L);
 
     public void deletedFiles(String type, int numFiles) {
@@ -315,6 +331,9 @@ public void deletedFiles(String type, int numFiles) {
       } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
         manifestListsCount.addAndGet(numFiles);
 
+      } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+        statisticsFilesCount.addAndGet(numFiles);
+
       } else if (OTHERS.equalsIgnoreCase(type)) {
         otherFilesCount.addAndGet(numFiles);
 
@@ -344,6 +363,10 @@ public void deletedFile(String path, String type) {
         manifestListsCount.incrementAndGet();
         LOG.debug("Deleted manifest list: {}", path);
 
+      } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+        statisticsFilesCount.incrementAndGet();
+        LOG.debug("Deleted statistics file: {}", path);
+
       } else if (OTHERS.equalsIgnoreCase(type)) {
         otherFilesCount.incrementAndGet();
         LOG.debug("Deleted other metadata file: {}", path);
@@ -373,6 +396,10 @@ public long manifestListsCount() {
       return manifestListsCount.get();
     }
 
+    public long statisticsFilesCount() {
+      return statisticsFilesCount.get();
+    }
+
     public long otherFilesCount() {
       return otherFilesCount.get();
     }
@@ -383,6 +410,7 @@ public long totalFilesCount() {
           + equalityDeleteFilesCount()
           + manifestsCount()
           + manifestListsCount()
+          + statisticsFilesCount()
           + otherFilesCount();
     }
   }
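The null-handling in statisticsFileDS encodes a small but important convention: a null snapshot-id set means every statistics file is a deletion candidate, while a non-null set restricts candidates to files attached to those snapshots. A minimal standalone sketch of that selection rule (the class and method names here are illustrative only):

    import java.util.Set;
    import java.util.function.Predicate;
    import org.apache.iceberg.StatisticsFile;

    class StatsPredicateSketch {
      // null snapshot ids = "all statistics files match";
      // otherwise only files attached to one of the given snapshots match.
      static Predicate<StatisticsFile> forSnapshots(Set<Long> snapshotIds) {
        return snapshotIds == null
            ? statsFile -> true
            : statsFile -> snapshotIds.contains(statsFile.snapshotId());
      }
    }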
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 95e153a9a5a6..a1db08663ef4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -32,8 +32,8 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
+import org.apache.iceberg.actions.ImmutableExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -245,7 +245,8 @@ private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
     Table staticTable = newStaticTable(metadata, table.io());
     return contentFileDS(staticTable, snapshotIds)
         .union(manifestDS(staticTable, snapshotIds))
-        .union(manifestListDS(staticTable, snapshotIds));
+        .union(manifestListDS(staticTable, snapshotIds))
+        .union(statisticsFileDS(staticTable, snapshotIds));
   }
 
   private Set<Long> findExpiredSnapshotIds(
@@ -277,11 +278,13 @@ private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
 
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
-    return new BaseExpireSnapshotsActionResult(
-        summary.dataFilesCount(),
-        summary.positionDeleteFilesCount(),
-        summary.equalityDeleteFilesCount(),
-        summary.manifestsCount(),
-        summary.manifestListsCount());
+    return ImmutableExpireSnapshots.Result.builder()
+        .deletedDataFilesCount(summary.dataFilesCount())
+        .deletedPositionDeleteFilesCount(summary.positionDeleteFilesCount())
+        .deletedEqualityDeleteFilesCount(summary.equalityDeleteFilesCount())
+        .deletedManifestsCount(summary.manifestsCount())
+        .deletedManifestListsCount(summary.manifestListsCount())
+        .deletedStatisticsFilesCount(summary.statisticsFilesCount())
+        .build();
   }
 }
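For reference, a hedged sketch of how the new counter surfaces through the action API. SparkActions.get() and expireOlderThan(long) are the pre-existing entry points; the table variable is assumed to be already loaded, and this is a usage sketch rather than part of the patch:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    class ExpireActionSketch {
      static long expireAndCountStats(Table table) {
        ExpireSnapshots.Result result =
            SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(System.currentTimeMillis())
                .execute();
        // New in this patch: statistics files removed along with their snapshots.
        return result.deletedStatisticsFilesCount();
      }
    }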
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index a66310f49389..9d2fc7e467cf 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -67,7 +67,9 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
             new StructField(
                 "deleted_manifest_files_count", DataTypes.LongType, true, Metadata.empty()),
             new StructField(
-                "deleted_manifest_lists_count", DataTypes.LongType, true, Metadata.empty())
+                "deleted_manifest_lists_count", DataTypes.LongType, true, Metadata.empty()),
+            new StructField(
+                "deleted_statistics_files_count", DataTypes.LongType, true, Metadata.empty())
           });
 
   public static ProcedureBuilder builder() {
@@ -159,7 +161,8 @@ private InternalRow[] toOutputRows(ExpireSnapshots.Result result) {
             result.deletedPositionDeleteFilesCount(),
             result.deletedEqualityDeleteFilesCount(),
             result.deletedManifestsCount(),
-            result.deletedManifestListsCount());
+            result.deletedManifestListsCount(),
+            result.deletedStatisticsFilesCount());
     return new InternalRow[] {row};
   }
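Finally, a minimal sketch of the procedure call surface after this change; my_catalog and db.tbl are placeholder names, and deleted_statistics_files_count is the new sixth output column:

    import java.util.List;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class ProcedureCallSketch {
      static long deletedStatsFiles(SparkSession spark) {
        List<Row> rows =
            spark
                .sql(
                    "CALL my_catalog.system.expire_snapshots("
                        + "table => 'db.tbl', "
                        + "older_than => TIMESTAMP '2023-01-01 00:00:00')")
                .collectAsList();
        // Sixth column added by this patch: deleted_statistics_files_count.
        return rows.get(0).getLong(5);
      }
    }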