Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ interface Result {
*/
long deletedDataFilesCount();

/**
* Returns the number of deleted delete files.
*/
long deletedDeleteFilesCount();

/**
* Returns the number of deleted manifests.
*/
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())
)))
);
))),
Types.NestedField.optional(14, "content", Types.IntegerType.get())
);

AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO
entry -> entry.file().path().toString());
}

/**
 * Returns the file paths of the live entries in a delete manifest.
 * <p>
 * Only the {@code file_path} column is selected when reading the manifest, and each live
 * entry's path is converted to a {@link String}.
 *
 * @param manifestFile the delete manifest to read
 * @param io a {@link FileIO} used to read the manifest
 * @return a closeable iterable over the paths of the live entries
 */
public static CloseableIterable<String> readDeleteFiles(ManifestFile manifestFile, FileIO io) {
return CloseableIterable.transform(
// NOTE(review): null is passed as the third argument of readDeleteManifest; presumably the
// partition specs are not needed when only file_path is projected -- confirm.
readDeleteManifest(manifestFile, io, null).select(ImmutableList.of("file_path")).liveEntries(),
entry -> entry.file().path().toString());
}

/**
* Returns a new {@link ManifestReader} for a {@link ManifestFile}.
* <p>
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public class ManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())
)))
);
))),
Types.NestedField.required(14, "content", Types.IntegerType.get())
);

private final PartitionSpec spec;

Expand Down Expand Up @@ -94,8 +95,9 @@ static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile man
manifest.addedFilesCount(),
manifest.existingFilesCount(),
manifest.deletedFilesCount(),
partitionSummariesToRows(spec, manifest.partitions())
);
partitionSummariesToRows(spec, manifest.partitions()),
manifest.content().id()
);
}

static List<StaticDataTask.Row> partitionSummariesToRows(PartitionSpec spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result {

private final long deletedDataFilesCount;
private final long deletedDeleteFilesCount;
private final long deletedManifestsCount;
private final long deletedManifestListsCount;

/**
 * Creates a result that does not track deleted delete files.
 * <p>
 * Retained so that callers of the original three-argument constructor keep compiling; the
 * deleted delete files count is reported as 0.
 *
 * @param deletedDataFilesCount number of data files deleted
 * @param deletedManifestsCount number of manifests deleted
 * @param deletedManifestListsCount number of manifest lists deleted
 */
public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
                                       long deletedManifestsCount,
                                       long deletedManifestListsCount) {
  this(deletedDataFilesCount, 0L, deletedManifestsCount, deletedManifestListsCount);
}

/**
 * Creates a result holding the counts of every kind of file deleted.
 *
 * @param deletedDataFilesCount number of data files deleted
 * @param deletedDeleteFilesCount number of delete files deleted
 * @param deletedManifestsCount number of manifests deleted
 * @param deletedManifestListsCount number of manifest lists deleted
 */
public BaseExpireSnapshotsActionResult(long deletedDataFilesCount,
                                       long deletedDeleteFilesCount,
                                       long deletedManifestsCount,
                                       long deletedManifestListsCount) {
  this.deletedDataFilesCount = deletedDataFilesCount;
  this.deletedDeleteFilesCount = deletedDeleteFilesCount;
  this.deletedManifestsCount = deletedManifestsCount;
  this.deletedManifestListsCount = deletedManifestListsCount;
}
Expand All @@ -38,6 +41,11 @@ public long deletedDataFilesCount() {
return deletedDataFilesCount;
}

// Returns the delete-file count exactly as it was supplied to the constructor.
@Override
public long deletedDeleteFilesCount() {
return deletedDeleteFilesCount;
}

@Override
public long deletedManifestsCount() {
return deletedManifestsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@
public class ExpireSnapshotsActionResult {

private final Long dataFilesDeleted;
private final Long deleteFilesDeleted;
private final Long manifestFilesDeleted;
private final Long manifestListsDeleted;

static ExpireSnapshotsActionResult wrap(ExpireSnapshots.Result result) {
return new ExpireSnapshotsActionResult(
result.deletedDataFilesCount(),
result.deletedDeleteFilesCount(),
result.deletedManifestsCount(),
result.deletedManifestListsCount());
}

public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) {
public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long deleteFilesDeleted,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for public class, should keep existing constructor and add new ones.

Long manifestFilesDeleted, Long manifestListsDeleted) {
this.dataFilesDeleted = dataFilesDeleted;
this.deleteFilesDeleted = deleteFilesDeleted;
this.manifestFilesDeleted = manifestFilesDeleted;
this.manifestListsDeleted = manifestListsDeleted;
}
Expand All @@ -43,6 +47,10 @@ public Long dataFilesDeleted() {
return dataFilesDeleted;
}

/**
 * Returns the number of delete files that were deleted, as supplied to the constructor.
 */
public Long deleteFilesDeleted() {
  return deleteFilesDeleted;
}

/**
 * Returns the number of delete files that were deleted.
 *
 * @deprecated the method name is misspelled; use {@link #deleteFilesDeleted()} instead.
 */
@Deprecated
public Long deleteFiledDeleted() {
  return deleteFilesDeleted();
}

public Long manifestFilesDeleted() {
return manifestFilesDeleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ManifestFileBean implements ManifestFile {
private String path = null;
private Long length = null;
private Integer partitionSpecId = null;
private Integer content = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use ManifestContent instead of Integer?

private Long addedSnapshotId = null;

public String getPath() {
Expand Down Expand Up @@ -61,6 +62,14 @@ public void setAddedSnapshotId(Long addedSnapshotId) {
this.addedSnapshotId = addedSnapshotId;
}

/**
 * Bean getter for the raw manifest content id; may be null when it was never set.
 */
public Integer getContent() {
return content;
}

/**
 * Bean setter for the raw manifest content id.
 *
 * @param content the content id to store; null is accepted and stored as is
 */
public void setContent(Integer content) {
this.content = content;
}

@Override
public String path() {
return path;
Expand All @@ -78,7 +87,8 @@ public int partitionSpecId() {

@Override
public ManifestContent content() {
return ManifestContent.DATA;
return (content != null && content == ManifestContent.DELETES.id()) ?
ManifestContent.DELETES : ManifestContent.DATA;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class BaseExpireSnapshotsSparkAction
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);

private static final String DATA_FILE = "Data File";
private static final String DELETE_FILE = "Delete File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";

Expand Down Expand Up @@ -201,8 +202,9 @@ private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, this.table.io());
return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE)
.union(appendTypeString(buildValidDeleteFileDF(staticTable), DELETE_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)));
}

/**
Expand All @@ -213,6 +215,7 @@ private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
*/
private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
AtomicLong dataFileCount = new AtomicLong(0L);
AtomicLong deleteFileCount = new AtomicLong(0L);
AtomicLong manifestCount = new AtomicLong(0L);
AtomicLong manifestListCount = new AtomicLong(0L);

Expand All @@ -231,7 +234,11 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
switch (type) {
case DATA_FILE:
dataFileCount.incrementAndGet();
LOG.trace("Deleted Data File: {}", file);
LOG.info("Deleted Data File: {}", file);
break;
case DELETE_FILE:
deleteFileCount.incrementAndGet();
LOG.info("Deleted Delete File:{}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
Expand All @@ -244,7 +251,9 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
}
});

LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
LOG.info("Deleted {} total files", dataFileCount.get() + deleteFileCount.get() +
manifestCount.get() + manifestListCount.get());
return new BaseExpireSnapshotsActionResult(dataFileCount.get(), deleteFileCount.get(),
manifestCount.get(), manifestListCount.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ public RemoveOrphanFiles.Result execute() {

private RemoveOrphanFiles.Result doExecute() {
Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
Dataset<Row> validDeleteFileDF = buildValidDeleteFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table, ops);
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> validFileDF = validDataFileDF.union(validDeleteFileDF).union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();

Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Snapshot;
Expand All @@ -45,6 +46,7 @@
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrameReader;
Expand Down Expand Up @@ -152,14 +154,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
protected Dataset<Row> buildValidDataFileDF(Table table) {
JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
return loadAllManifestFileBean(table).filter((FilterFunction<ManifestFileBean>) manifest ->
manifest.content() == ManifestContent.DATA)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should compare using ManifestContent.DATA.equals(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess == is also correct for enum

.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
}

Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
.selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
.repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
.as(Encoders.bean(ManifestFileBean.class));

return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
protected Dataset<Row> buildValidDeleteFileDF(Table table) {
JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
return loadAllManifestFileBean(table).filter((FilterFunction<ManifestFileBean>) manifest ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worthwhile to separate the loadAllManifestFileBean into 2 passes instead of one?

manifest.content() == ManifestContent.DELETES)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should compare using ManifestContent.DELETES.equals(...)

.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
}

protected Dataset<Row> buildManifestFileDF(Table table) {
Expand Down Expand Up @@ -190,6 +195,15 @@ protected Dataset<Row> buildValidMetadataFileDF(Table table, TableOperations ops
.orNoop()
.build();

/**
 * Loads the ALL_MANIFESTS metadata table of {@code table} as a dataset of
 * {@link ManifestFileBean}, keeping a single row per manifest path.
 * <p>
 * Columns are renamed where needed so that they match the bean's property names.
 */
private Dataset<ManifestFileBean> loadAllManifestFileBean(Table table) {
  Dataset<Row> manifestRows = loadMetadataTable(table, ALL_MANIFESTS).selectExpr(
      "path",
      "length",
      "partition_spec_id as partitionSpecId",
      "content",
      "added_snapshot_id as addedSnapshotId");

  Dataset<Row> distinctManifests = manifestRows.dropDuplicates("path");

  // avoid adaptive execution combining tasks
  int shufflePartitions = spark.sessionState().conf().numShufflePartitions();
  Dataset<Row> repartitioned = distinctManifests.repartition(shufflePartitions);

  return repartitioned.as(Encoders.bean(ManifestFileBean.class));
}

private Dataset<Row> loadCatalogMetadataTable(String tableName, MetadataTableType type) {
Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find Spark3Util class but Spark3 is in use");
return LOAD_CATALOG.asStatic().invoke(spark, tableName, type);
Expand Down Expand Up @@ -235,7 +249,13 @@ private static class ReadManifest implements FlatMapFunction<ManifestFileBean, S

@Override
public Iterator<String> call(ManifestFileBean manifest) {
return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
switch (manifest.content()) {
case DATA:
return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
case DELETES:
return new ClosingIterator<>(ManifestFiles.readDeleteFiles(manifest, io.getValue()).iterator());
}
throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
}
}
}
Loading