Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.util.function.Consumer;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.immutables.value.Value;

/**
* An action that expires snapshots in a table.
*
* <p>Similar to {@link org.apache.iceberg.ExpireSnapshots} but may use a query engine to distribute
* parts of the work.
*/
@Value.Enclosing
public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots.Result> {
/**
* Expires a specific {@link Snapshot} identified by id.
Expand Down Expand Up @@ -98,6 +100,7 @@ public interface ExpireSnapshots extends Action<ExpireSnapshots, ExpireSnapshots
ExpireSnapshots executeDeleteWith(ExecutorService executorService);

/** The action result that contains a summary of the execution. */
@Value.Immutable
interface Result {
/** Returns the number of deleted data files. */
long deletedDataFilesCount();
Expand All @@ -113,5 +116,11 @@ interface Result {

/** Returns the number of deleted manifest lists. */
long deletedManifestListsCount();

/** Returns the number of deleted statistics files. */
@Value.Default
default long deletedStatisticsFilesCount() {
  // Defaults to 0 so Result implementations written before statistics-file
  // tracking was added remain valid; @Value.Default lets the generated
  // immutable implementation omit the field as well.
  return 0L;
}
}
}
22 changes: 17 additions & 5 deletions core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
Expand Down Expand Up @@ -137,11 +139,21 @@ public static List<String> manifestListLocations(Table table, Set<Long> snapshot
* @return the location of statistics files
*/
public static List<String> statisticsFilesLocations(Table table) {
List<String> statisticsFilesLocations = Lists.newArrayList();
for (StatisticsFile statisticsFile : table.statisticsFiles()) {
statisticsFilesLocations.add(statisticsFile.path());
}
return statisticsFilesLocations(table, statisticsFile -> true);
}

return statisticsFilesLocations;
/**
 * Returns the locations of a table's statistics files that match the given predicate.
 *
 * @param table table whose statistics files should be listed
 * @param predicate filter applied to each statistics file
 * @return the locations of the matching statistics files
 */
public static List<String> statisticsFilesLocations(
    Table table, Predicate<StatisticsFile> predicate) {
  List<String> locations = Lists.newArrayList();
  for (StatisticsFile statisticsFile : table.statisticsFiles()) {
    if (predicate.test(statisticsFile)) {
      locations.add(statisticsFile.path());
    }
  }
  return locations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.actions;

/** @deprecated will be removed in 1.3.0. */
@Deprecated
public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {

private final long deletedDataFilesCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,40 @@

import static org.apache.iceberg.TableProperties.GC_ENABLED;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -63,7 +75,8 @@ public void testExpireSnapshotsInEmptyTable() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);

List<Object[]> output = sql("CALL %s.system.expire_snapshots('%s')", catalogName, tableIdent);
assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L)), output);
assertEquals(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added one more entry for deletedStatisticsFilesCount to the expected output of all the existing cases.

"Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -91,7 +104,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
catalogName, tableIdent, secondSnapshotTimestamp);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output1);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output1);

table.refresh();

Expand All @@ -117,7 +131,8 @@ public void testExpireSnapshotsUsingPositionalArgs() {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
catalogName, tableIdent, currentTimestamp);
assertEquals("Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 1L, 0L)), output);
}

@Test
Expand All @@ -137,12 +152,10 @@ public void testExpireSnapshotUsingNamedArgs() {

List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1)",
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -231,12 +244,11 @@ public void testConcurrentExpireSnapshots() {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "max_concurrent_deletes => %s,"
+ "retain_last => 1)",
+ "max_concurrent_deletes => %s)",
catalogName, currentTimestamp, tableIdent, 4);
assertEquals(
"Expiring snapshots concurrently should succeed",
ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)),
ImmutableList.of(row(0L, 0L, 0L, 0L, 3L, 0L)),
output);
}

Expand Down Expand Up @@ -283,7 +295,7 @@ public void testExpireDeleteFiles() throws Exception {
.append();
sql("DELETE FROM %s WHERE id=1", tableName);

Table table = Spark3Util.loadIcebergTable(spark, tableName);
Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals(
"Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
Expand Down Expand Up @@ -318,15 +330,12 @@ public void testExpireDeleteFiles() throws Exception {
Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1)",
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);

assertEquals(
"Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)",
ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)),
ImmutableList.of(row(1L, 1L, 0L, 4L, 4L, 0L)),
output);
Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
Expand All @@ -352,10 +361,10 @@ public void testExpireSnapshotWithStreamResultsEnabled() {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
+ "retain_last => 1, "
+ "stream_results => true)",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);
}

@Test
Expand Down Expand Up @@ -434,13 +443,102 @@ public void testExpireSnapshotsProcedureWorksWithSqlComments() {
+ "/* And comments that span *multiple* \n"
+ " lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots("
+ " older_than => TIMESTAMP '%s',"
+ " table => '%s',"
+ " retain_last => 1)";
+ " table => '%s')";
List<Object[]> output = sql(callStatement, catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
assertEquals(
"Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L, 0L)), output);

table.refresh();

Assert.assertEquals("Should be 1 snapshot remaining", 1, Iterables.size(table.snapshots()));
}

@Test
public void testExpireSnapshotsWithStatisticFiles() throws Exception {
  // Verifies that expiring snapshots also removes the statistics files of the
  // expired snapshots — both the TableMetadata entries and the physical files —
  // while leaving the statistics of surviving snapshots untouched.
  sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
  sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  // Attach a statistics file to the first snapshot (will be expired below).
  String statsFileLocation1 = statsFileLocation(table.location());
  StatisticsFile statisticsFile1 =
      writeStatsFile(
          table.currentSnapshot().snapshotId(),
          table.currentSnapshot().sequenceNumber(),
          statsFileLocation1,
          table.io());
  table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), statisticsFile1).commit();

  // Second insert creates a second snapshot; attach statistics to it too
  // (this snapshot stays current and must keep its statistics).
  sql("INSERT INTO %s SELECT 20, 'def'", tableName);
  table.refresh();
  String statsFileLocation2 = statsFileLocation(table.location());
  StatisticsFile statisticsFile2 =
      writeStatsFile(
          table.currentSnapshot().snapshotId(),
          table.currentSnapshot().sequenceNumber(),
          statsFileLocation2,
          table.io());
  table.updateStatistics().setStatistics(statisticsFile2.snapshotId(), statisticsFile2).commit();

  waitUntilAfter(table.currentSnapshot().timestampMillis());

  // Expire everything older than now; only the current snapshot survives.
  Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
  List<Object[]> output =
      sql(
          "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
          catalogName, currentTimestamp, tableIdent);
  // Column index 5 is the new deletedStatisticsFilesCount in the procedure output.
  Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);

  table.refresh();
  // Metadata entry for the expired snapshot's statistics must be gone ...
  List<StatisticsFile> statsWithSnapshotId1 =
      table.statisticsFiles().stream()
          .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
          .collect(Collectors.toList());
  Assertions.assertThat(statsWithSnapshotId1)
      .as(
          "Statistics file entry in TableMetadata should be deleted for the snapshot %s",
          statisticsFile1.snapshotId())
      .isEmpty();
  // ... while the surviving snapshot's entry remains the only one.
  Assertions.assertThat(table.statisticsFiles())
      .as(
          "Statistics file entry in TableMetadata should be present for the snapshot %s",
          statisticsFile2.snapshotId())
      .extracting(StatisticsFile::snapshotId)
      .containsExactly(statisticsFile2.snapshotId());

  // Physical file of the expired snapshot is deleted from disk ...
  Assertions.assertThat(new File(statsFileLocation1))
      .as("Statistics file should not exist for snapshot %s", statisticsFile1.snapshotId())
      .doesNotExist();

  // ... and the surviving snapshot's file is untouched.
  Assertions.assertThat(new File(statsFileLocation2))
      .as("Statistics file should exist for snapshot %s", statisticsFile2.snapshotId())
      .exists();
}

/**
 * Writes a Puffin statistics file containing a single test blob and returns the
 * corresponding {@link StatisticsFile} metadata for committing to the table.
 *
 * @param snapshotId snapshot the statistics belong to
 * @param snapshotSequenceNumber sequence number of that snapshot
 * @param statsLocation file location to write the Puffin file to
 * @param fileIO FileIO used to create the output file
 * @return statistics file metadata describing the written Puffin file
 * @throws IOException if writing the Puffin file fails
 */
private StatisticsFile writeStatsFile(
    long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
    throws IOException {
  try (PuffinWriter writer = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
    Blob testBlob =
        new Blob(
            "some-blob-type",
            ImmutableList.of(1),
            snapshotId,
            snapshotSequenceNumber,
            ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)));
    writer.add(testBlob);
    // finish() must be called before reading file/footer sizes and blob metadata.
    writer.finish();

    return new GenericStatisticsFile(
        snapshotId,
        statsLocation,
        writer.fileSize(),
        writer.footerSize(),
        writer.writtenBlobsMetadata().stream()
            .map(GenericBlobMetadata::from)
            .collect(ImmutableList.toImmutableList()));
  }
}

private String statsFileLocation(String tableLocation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it appears that a few other tests use

String statsFileName = "stats-file-" + UUID.randomUUID();
    File statsLocation =
        (new URI(table.location()).isAbsolute()
                ? new File(new URI(table.location()))
                : new File(table.location()))
            .toPath()
            .resolve("data")
            .resolve(statsFileName)
            .toFile();

so it might make sense to use a similar approach here (rather than replacing file: in the string

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have got a comment to not use File and Uri in the core impl PR for this logic.
#6090 (comment)

Hence, I handled it like this. In the core test case table.location() returns a path without file:. But this module returns with file:. Many other places in this module we are replacing file: in table location.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks for clarifying

String statsFileName = "stats-file-" + UUID.randomUUID();
return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
}
}
Loading