Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -31,12 +35,15 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
Expand Down Expand Up @@ -191,9 +198,10 @@ private Dataset<Row> buildActualFileDF() {
List<String> matchingFiles = Lists.newArrayList();

Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
PathFilter pathFilter = PartitionAwareHiddenPathFilter.build(table.specs());

// list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles);
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);

JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

Expand All @@ -205,15 +213,16 @@ private Dataset<Row> buildActualFileDF() {
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp));
JavaRDD<String> matchingLeafFileRDD =
Comment thread
ulmako marked this conversation as resolved.
Outdated
subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp, pathFilter));

JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
}

private static void listDirRecursively(
String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles) {
int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) {

// stop listing whenever we reach the max depth
if (maxDepth <= 0) {
Expand All @@ -227,7 +236,7 @@ private static void listDirRecursively(

List<String> subDirs = Lists.newArrayList();

for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) {
for (FileStatus file : fs.listStatus(path, pathFilter)) {
if (file.isDirectory()) {
subDirs.add(file.getPath().toString());
} else if (file.isFile() && predicate.test(file)) {
Expand All @@ -242,7 +251,8 @@ private static void listDirRecursively(
}

for (String subDir : subDirs) {
listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter,
Comment thread
ulmako marked this conversation as resolved.
Outdated
matchingFiles);
}
} catch (IOException e) {
throw new RuntimeIOException(e);
Expand All @@ -251,7 +261,8 @@ private static void listDirRecursively(

private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
Broadcast<SerializableConfiguration> conf,
long olderThanTimestamp) {
long olderThanTimestamp,
PathFilter pathFilter) {

return dirs -> {
List<String> subDirs = Lists.newArrayList();
Expand All @@ -263,7 +274,7 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
int maxDirectSubDirs = Integer.MAX_VALUE;

dirs.forEachRemaining(dir -> {
listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, files);
listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files);
});

if (!subDirs.isEmpty()) {
Expand All @@ -273,4 +284,35 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
return files.iterator();
};
}

@VisibleForTesting
static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
Comment thread
ulmako marked this conversation as resolved.

private final Set<String> hiddenPathPartitionNames;

PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
this.hiddenPathPartitionNames = hiddenPathPartitionNames;
}

@Override
public boolean accept(Path path) {
boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
}

static PathFilter build(Map<Integer, PartitionSpec> specs) {
Comment thread
ulmako marked this conversation as resolved.
Outdated
Comment thread
ulmako marked this conversation as resolved.
Outdated
Set<String> partitionNames = new HashSet<>();
Comment thread
ulmako marked this conversation as resolved.
Outdated

specs.values().stream()
.map(PartitionSpec::fields)
.flatMap(List::stream)
.forEach(partitionField -> {
if (partitionField.name().startsWith("_") || partitionField.name().startsWith(".")) {
partitionNames.add(partitionField.name());
}
});

return (partitionNames.isEmpty()) ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -551,6 +554,48 @@ public void testManyLeafPartitions() throws InterruptedException {
Assert.assertEquals("Rows must match", records, actualRecords);
}

@Test
public void testHiddenPartitionPaths() throws InterruptedException {
Schema schema = new Schema(
optional(1, "c1", Types.IntegerType.get()),
optional(2, "_c2", Types.StringType.get()),
optional(3, "c3", Types.StringType.get())
Comment thread
ulmako marked this conversation as resolved.
Outdated
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
.truncate("_c2", 2)
.identity("c3")
.build();
Table table = TABLES.create(schema, spec, Maps.newHashMap(), tableLocation);

StructType structType = new StructType()
.add("c1", DataTypes.IntegerType)
.add("_c2", DataTypes.StringType)
.add("c3", DataTypes.StringType);
List<Row> records = Lists.newArrayList(
RowFactory.create(1, "AAAAAAAAAA", "AAAA")
);
Dataset<Row> df = spark.createDataFrame(records, structType).coalesce(1);

df.select("c1", "_c2", "c3")
.write()
.format("iceberg")
.mode("append")
.save(tableLocation);

df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");
df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");

Thread.sleep(1000);
Comment thread
ulmako marked this conversation as resolved.
Outdated

SparkActions actions = SparkActions.get();

DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis())
.execute();

Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations()));
}
Comment thread
ulmako marked this conversation as resolved.

private List<String> snapshotFiles(long snapshotId) {
return spark.read().format("iceberg")
.option("snapshot-id", snapshotId)
Expand Down