Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
justification: "Accept all changes prior to introducing API compatibility checks"
- code: "java.method.addedToInterface"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am okay with this given the other changes, the upcoming 1.0 release, and the fact that it is mostly Iceberg itself that provides implementations of this interface. Even if someone has custom actions, they probably reuse the provided result implementation.

If other folks are concerned, we could give the new methods default implementations.

new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()"
justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()"
justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)"
justification: "Accept all changes prior to introducing API compatibility checks"
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/ManifestContent.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public enum ManifestContent {
public int id() {
return id;
}

/**
 * Returns the manifest content type that corresponds to a numeric id.
 *
 * @param id a manifest content id
 * @return {@code DATA} for id 0, {@code DELETES} for id 1
 * @throws IllegalArgumentException if the id is neither 0 nor 1
 */
public static ManifestContent fromId(int id) {
  if (id == 0) {
    return DATA;
  }

  if (id == 1) {
    return DELETES;
  }

  // any other id has no matching constant
  throw new IllegalArgumentException("Unknown manifest content: " + id);
}
}
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ interface Result {
*/
long deletedDataFilesCount();

/**
* Returns the number of deleted equality delete files.
*/
long deletedEqualityDeleteFilesCount();

/**
* Returns the number of deleted position delete files.
*/
long deletedPositionDeleteFilesCount();

/**
* Returns the number of deleted manifests.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,29 @@
public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {

private final long deletedDataFilesCount;
private final long deletedPosDeleteFilesCount;
private final long deletedEqDeleteFilesCount;
private final long deletedManifestsCount;
private final long deletedManifestListsCount;

/**
 * Creates a result that reports zero deleted position delete files and zero deleted
 * equality delete files.
 *
 * @param deletedDataFilesCount number of deleted data files
 * @param deletedManifestsCount number of deleted manifests
 * @param deletedManifestListsCount number of deleted manifest lists
 */
public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
                                       long deletedManifestsCount,
                                       long deletedManifestListsCount) {
  // argument order of the five-argument constructor: data, position deletes, equality deletes,
  // manifests, manifest lists
  this(deletedDataFilesCount, 0L, 0L, deletedManifestsCount, deletedManifestListsCount);
}

/**
 * Creates a result carrying a separate count for every kind of deleted file.
 *
 * @param deletedDataFilesCount number of deleted data files
 * @param deletedPosDeleteFilesCount number of deleted position delete files
 * @param deletedEqDeleteFilesCount number of deleted equality delete files
 * @param deletedManifestsCount number of deleted manifests
 * @param deletedManifestListsCount number of deleted manifest lists
 */
public BaseExpireSnapshotsActionResult(
    long deletedDataFilesCount,
    long deletedPosDeleteFilesCount,
    long deletedEqDeleteFilesCount,
    long deletedManifestsCount,
    long deletedManifestListsCount) {
  // metadata file counts
  this.deletedManifestListsCount = deletedManifestListsCount;
  this.deletedManifestsCount = deletedManifestsCount;

  // content file counts
  this.deletedEqDeleteFilesCount = deletedEqDeleteFilesCount;
  this.deletedPosDeleteFilesCount = deletedPosDeleteFilesCount;
  this.deletedDataFilesCount = deletedDataFilesCount;
}
Expand All @@ -38,6 +54,16 @@ public long deletedDataFilesCount() {
return deletedDataFilesCount;
}

// position delete file count supplied at construction time
@Override
public long deletedPositionDeleteFilesCount() {
  return this.deletedPosDeleteFilesCount;
}

// equality delete file count supplied at construction time
@Override
public long deletedEqualityDeleteFilesCount() {
  return this.deletedEqDeleteFilesCount;
}

@Override
public long deletedManifestsCount() {
return deletedManifestsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testExpireSnapshotsInEmptyTable() {
List<Object[]> output = sql(
"CALL %s.system.expire_snapshots('%s')",
catalogName, tableIdent);
assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L)), output);
assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 0L)), output);
}

@Test
Expand All @@ -92,7 +92,7 @@ public void testExpireSnapshotsUsingPositionalArgs() {
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
catalogName, tableIdent, secondSnapshotTimestamp);
assertEquals("Procedure output must match",
ImmutableList.of(row(0L, 0L, 1L)),
ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
output1);

table.refresh();
Expand All @@ -118,7 +118,7 @@ public void testExpireSnapshotsUsingPositionalArgs() {
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
catalogName, tableIdent, currentTimestamp);
assertEquals("Procedure output must match",
ImmutableList.of(row(2L, 2L, 1L)),
ImmutableList.of(row(2L, 0L, 0L, 2L, 1L)),
output);
}

Expand All @@ -144,7 +144,7 @@ public void testExpireSnapshotUsingNamedArgs() {
"retain_last => 1)",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match",
ImmutableList.of(row(0L, 0L, 1L)),
ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
output);
}

Expand Down Expand Up @@ -213,7 +213,8 @@ public void testConcurrentExpireSnapshots() {
"max_concurrent_deletes => %s," +
"retain_last => 1)",
catalogName, currentTimestamp, tableIdent, 4);
assertEquals("Expiring snapshots concurrently should succeed", ImmutableList.of(row(0L, 0L, 3L)), output);
assertEquals("Expiring snapshots concurrently should succeed",
ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)), output);
}

@Test
Expand Down Expand Up @@ -274,12 +275,14 @@ public void testExpireDeleteFiles() throws Exception {
Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));

Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
sql("CALL %s.system.expire_snapshots(" +
List<Object[]> output = sql("CALL %s.system.expire_snapshots(" +
"older_than => TIMESTAMP '%s'," +
"table => '%s'," +
"retain_last => 1)",
catalogName, currentTimestamp, tableIdent);

assertEquals("Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)",
ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)), output);
Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
}
Expand All @@ -306,7 +309,7 @@ public void testExpireSnapshotWithStreamResultsEnabled() {
"retain_last => 1, " +
"stream_results => true)",
catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 1L)), output);
assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)), output);
}

@Test
Expand Down Expand Up @@ -337,7 +340,7 @@ public void testExpireSnapshotsProcedureWorksWithSqlComments() {
List<Object[]> output = sql(
callStatement, catalogName, currentTimestamp, tableIdent);
assertEquals("Procedure output must match",
ImmutableList.of(row(0L, 0L, 1L)),
ImmutableList.of(row(0L, 0L, 0L, 0L, 1L)),
output);

table.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -215,7 +216,7 @@ private ExpireSnapshots.Result doExecute() {

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, table.io());
return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
return buildValidContentFileWithTypeDF(staticTable)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
}
Expand All @@ -228,6 +229,8 @@ private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
*/
private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong posDeleteFileCount = new AtomicLong(0L);
AtomicLong eqDeleteFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);

Expand All @@ -243,23 +246,31 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
deleteFunc.accept(file);
switch (type) {
case CONTENT_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Content File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
break;
case MANIFEST_LIST:
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
break;

if (FileContent.DATA.name().equalsIgnoreCase(type)) {
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
} else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
posDeleteFileCount.incrementAndGet();
LOG.trace("Deleted Positional Delete File: {}", file);
} else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
eqDeleteFileCount.incrementAndGet();
LOG.trace("Deleted Equality Delete File: {}", file);
} else if (MANIFEST.equals(type)) {
manifestCount.incrementAndGet();
LOG.debug("Deleted Manifest: {}", file);
} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
manifestListCount.incrementAndGet();
LOG.debug("Deleted Manifest List: {}", file);
} else {
throw new ValidationException("Illegal file type: %s", type);
}
});

LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + eqDeleteFileCount.get();
LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + manifestListCount.get());

return new BaseExpireSnapshotsActionResult(dataFileCount.get(), posDeleteFileCount.get(),
eqDeleteFileCount.get(), manifestCount.get(), manifestListCount.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,26 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
Expand All @@ -48,6 +53,7 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.spark.sql.functions.col;
Expand Down Expand Up @@ -122,18 +128,29 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
return new BaseTable(ops, metadataFileLocation);
}

// builds a DF of delete and data file locations by reading all manifests
protected Dataset<Row> buildValidContentFileDF(Table table) {
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
// builds a DF of delete and data file paths and types by reading all manifests
protected Dataset<Row> buildValidContentFileWithTypeDF(Table table) {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));

Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
.selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
.selectExpr(
"content",
"path",
"length",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
.as(Encoders.bean(ManifestFileBean.class));

return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
return allManifests
.flatMap(new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.toDF(FILE_PATH, FILE_TYPE);
}

/**
 * Builds a DF of delete and data file paths by reading all manifests.
 *
 * <p>Only the {@code FILE_PATH} column of the path-and-type DF is kept.
 */
protected Dataset<Row> buildValidContentFileDF(Table table) {
  Dataset<Row> pathsWithType = buildValidContentFileWithTypeDF(table);
  return pathsWithType.select(FILE_PATH);
}

protected Dataset<Row> buildManifestFileDF(Table table) {
Expand Down Expand Up @@ -176,16 +193,39 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
return SparkTableUtil.loadMetadataTable(spark, table, type);
}

private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
private final Broadcast<FileIO> io;
private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
private final Broadcast<Table> table;

ReadManifest(Broadcast<FileIO> io) {
this.io = io;
ReadManifest(Broadcast<Table> table) {
this.table = table;
}

@Override
public Iterator<String> call(ManifestFileBean manifest) {
return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
return new ClosingIterator<>(entries(manifest));
}

/**
 * Opens a manifest and returns one (file path, content type) pair per content file in it.
 *
 * <p>Only the file path and content columns are projected. Data manifests and delete manifests
 * are opened with their respective {@code ManifestFiles} readers.
 *
 * @param manifest the manifest to read
 * @return a closeable iterator of (path, type) tuples
 * @throws IllegalArgumentException if the manifest content is neither DATA nor DELETES
 */
public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
  // resolve the broadcast table once and take both the IO and the specs from it
  Table broadcastTable = table.getValue();
  FileIO fileIO = broadcastTable.io();
  Map<Integer, PartitionSpec> partitionSpecs = broadcastTable.specs();

  ImmutableList<String> columns = ImmutableList.of(
      DataFile.FILE_PATH.name(),
      DataFile.CONTENT.name());

  switch (manifest.content()) {
    case DATA:
      return CloseableIterator.transform(
          ManifestFiles.read(manifest, fileIO, partitionSpecs).select(columns).iterator(),
          ReadManifest::contentFileWithType);

    case DELETES:
      return CloseableIterator.transform(
          ManifestFiles.readDeleteManifest(manifest, fileIO, partitionSpecs).select(columns).iterator(),
          ReadManifest::contentFileWithType);

    default:
      throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
  }
}

/**
 * Converts a content file into a (path, content type) tuple, both rendered as strings.
 */
static Tuple2<String, String> contentFileWithType(ContentFile<?> file) {
  String location = file.path().toString();
  String contentType = file.content().toString();
  return new Tuple2<>(location, contentType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ManifestFileBean implements ManifestFile {
private Long length = null;
private Integer partitionSpecId = null;
private Long addedSnapshotId = null;
private Integer content = null;

public String getPath() {
return path;
Expand Down Expand Up @@ -62,6 +63,14 @@ public void setAddedSnapshotId(Long addedSnapshotId) {
this.addedSnapshotId = addedSnapshotId;
}

/**
 * Returns the manifest content id held by this bean, or {@code null} if it was never set.
 */
public Integer getContent() {
  return this.content;
}

/**
 * Sets the manifest content id held by this bean.
 *
 * @param content the content id; {@code content()} passes this value to
 *     {@code ManifestContent.fromId}
 */
public void setContent(Integer content) {
this.content = content;
}

@Override
public String path() {
return path;
Expand All @@ -79,7 +88,7 @@ public int partitionSpecId() {

@Override
public ManifestContent content() {
return ManifestContent.DATA;
return ManifestContent.fromId(content);
}

@Override
Expand Down
Loading