Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.iceberg.hadoop;

import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
* A {@link PathFilter} that filters out hidden paths. A path is considered to
* be hidden when the path name starts with a period ('.') or an underscore ('_').
*/
public class HiddenPathFilter implements PathFilter {
public class HiddenPathFilter implements PathFilter, Serializable {

private static final HiddenPathFilter INSTANCE = new HiddenPathFilter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,30 @@

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -230,9 +237,10 @@ private Dataset<Row> buildActualFileDF() {
List<String> matchingFiles = Lists.newArrayList();

Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());

// list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles);
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);

JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

Expand All @@ -244,15 +252,17 @@ private Dataset<Row> buildActualFileDF() {
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp));
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(
listDirsRecursively(conf, olderThanTimestamp, pathFilter)
);

JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
}

private static void listDirRecursively(
String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles) {
int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) {

// stop listing whenever we reach the max depth
if (maxDepth <= 0) {
Expand All @@ -266,7 +276,7 @@ private static void listDirRecursively(

List<String> subDirs = Lists.newArrayList();

for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) {
for (FileStatus file : fs.listStatus(path, pathFilter)) {
if (file.isDirectory()) {
subDirs.add(file.getPath().toString());
} else if (file.isFile() && predicate.test(file)) {
Expand All @@ -281,7 +291,8 @@ private static void listDirRecursively(
}

for (String subDir : subDirs) {
listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
listDirRecursively(
subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles);
}
} catch (IOException e) {
throw new RuntimeIOException(e);
Expand All @@ -290,7 +301,8 @@ private static void listDirRecursively(

private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
Broadcast<SerializableConfiguration> conf,
long olderThanTimestamp) {
long olderThanTimestamp,
PathFilter pathFilter) {

return dirs -> {
List<String> subDirs = Lists.newArrayList();
Expand All @@ -302,7 +314,8 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
int maxDirectSubDirs = Integer.MAX_VALUE;

dirs.forEachRemaining(dir -> {
listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, files);
listDirRecursively(
dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files);
});

if (!subDirs.isEmpty()) {
Expand All @@ -312,4 +325,40 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
return files.iterator();
};
}

/**
* A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
* as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
* indicate a hidden path.
*/
@VisibleForTesting
static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

private final Set<String> hiddenPathPartitionNames;

PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
this.hiddenPathPartitionNames = hiddenPathPartitionNames;
}

@Override
public boolean accept(Path path) {
boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
}

static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
if (specs == null) {
return HiddenPathFilter.get();
}

Set<String> partitionNames = specs.values().stream()
.map(PartitionSpec::fields)
.flatMap(List::stream)
.filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith("."))
.map(partitionField -> partitionField.name() + "=")
.collect(Collectors.toSet());

return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -553,6 +556,139 @@ public void testManyLeafPartitions() throws InterruptedException {
Assert.assertEquals("Rows must match", records, actualRecords);
}

@Test
public void testHiddenPartitionPaths() throws InterruptedException {
  // Partition on a column whose name starts with '_' so that the partition
  // directories (e.g. _c2_trunc=AA) look like hidden paths.
  Schema tableSchema = new Schema(
      optional(1, "c1", Types.IntegerType.get()),
      optional(2, "_c2", Types.StringType.get()),
      optional(3, "c3", Types.StringType.get())
  );
  PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
      .truncate("_c2", 2)
      .identity("c3")
      .build();
  Table table = TABLES.create(tableSchema, partitionSpec, Maps.newHashMap(), tableLocation);

  StructType sparkSchema = new StructType()
      .add("c1", DataTypes.IntegerType)
      .add("_c2", DataTypes.StringType)
      .add("c3", DataTypes.StringType);
  List<Row> rows = Lists.newArrayList(
      RowFactory.create(1, "AAAAAAAAAA", "AAAA")
  );
  Dataset<Row> data = spark.createDataFrame(rows, sparkSchema).coalesce(1);

  // One committed append ...
  data.select("c1", "_c2", "c3")
      .write()
      .format("iceberg")
      .mode("append")
      .save(tableLocation);

  // ... plus two files written directly into a "hidden-looking" partition directory.
  data.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");
  data.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA");

  waitUntilAfter(System.currentTimeMillis());

  SparkActions sparkActions = SparkActions.get();

  DeleteOrphanFiles.Result deleteResult = sparkActions.deleteOrphanFiles(table)
      .olderThan(System.currentTimeMillis())
      .execute();

  // The two directly-written files are orphans and must be found despite the '_' prefix.
  Assert.assertEquals("Should delete 2 files", 2, Iterables.size(deleteResult.orphanFileLocations()));
}

@Test
public void testHiddenPartitionPathsWithPartitionEvolution() throws InterruptedException {
  // Both _c1 and _c2 start with '_'; _c1 only becomes a partition field after evolution.
  Schema tableSchema = new Schema(
      optional(1, "_c1", Types.IntegerType.get()),
      optional(2, "_c2", Types.StringType.get()),
      optional(3, "c3", Types.StringType.get())
  );
  PartitionSpec initialSpec = PartitionSpec.builderFor(tableSchema)
      .truncate("_c2", 2)
      .build();
  Table table = TABLES.create(tableSchema, initialSpec, Maps.newHashMap(), tableLocation);

  StructType sparkSchema = new StructType()
      .add("_c1", DataTypes.IntegerType)
      .add("_c2", DataTypes.StringType)
      .add("c3", DataTypes.StringType);
  List<Row> rows = Lists.newArrayList(
      RowFactory.create(1, "AAAAAAAAAA", "AAAA")
  );
  Dataset<Row> data = spark.createDataFrame(rows, sparkSchema).coalesce(1);

  data.select("_c1", "_c2", "c3")
      .write()
      .format("iceberg")
      .mode("append")
      .save(tableLocation);

  // Orphan written under the original spec's layout.
  data.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA");

  // Evolve the spec so _c1 becomes a partition field as well.
  table.updateSpec()
      .addField("_c1")
      .commit();

  // Orphan written under the evolved spec's layout.
  data.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/_c1=1");

  waitUntilAfter(System.currentTimeMillis());

  SparkActions sparkActions = SparkActions.get();

  DeleteOrphanFiles.Result deleteResult = sparkActions.deleteOrphanFiles(table)
      .olderThan(System.currentTimeMillis())
      .execute();

  // Orphans from both the old and the new partition layout must be found.
  Assert.assertEquals("Should delete 2 files", 2, Iterables.size(deleteResult.orphanFileLocations()));
}

@Test
public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws InterruptedException, IOException {
  // "_c2_trunc" (no '=' suffix) is a genuinely hidden directory, even though it shares
  // a prefix with the partition field name; files inside it must be left alone.
  Schema tableSchema = new Schema(
      optional(1, "c1", Types.IntegerType.get()),
      optional(2, "_c2", Types.StringType.get()),
      optional(3, "c3", Types.StringType.get())
  );
  PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
      .truncate("_c2", 2)
      .identity("c3")
      .build();
  Table table = TABLES.create(tableSchema, partitionSpec, Maps.newHashMap(), tableLocation);

  StructType sparkSchema = new StructType()
      .add("c1", DataTypes.IntegerType)
      .add("_c2", DataTypes.StringType)
      .add("c3", DataTypes.StringType);
  List<Row> rows = Lists.newArrayList(
      RowFactory.create(1, "AAAAAAAAAA", "AAAA")
  );
  Dataset<Row> data = spark.createDataFrame(rows, sparkSchema).coalesce(1);

  data.select("c1", "_c2", "c3")
      .write()
      .format("iceberg")
      .mode("append")
      .save(tableLocation);

  // Drop a file into a hidden directory that merely starts with the partition name.
  Path dataDir = new Path(tableLocation + "/data");
  FileSystem fileSystem = dataDir.getFileSystem(spark.sessionState().newHadoopConf());
  Path hiddenFile = new Path(dataDir, "_c2_trunc/file.txt");
  fileSystem.createNewFile(hiddenFile);

  waitUntilAfter(System.currentTimeMillis());

  SparkActions sparkActions = SparkActions.get();

  DeleteOrphanFiles.Result deleteResult = sparkActions.deleteOrphanFiles(table)
      .olderThan(System.currentTimeMillis())
      .execute();

  Assert.assertEquals("Should delete 0 files", 0, Iterables.size(deleteResult.orphanFileLocations()));
  // The hidden file must survive the orphan cleanup.
  Assert.assertTrue(fileSystem.exists(hiddenFile));
}

private List<String> snapshotFiles(long snapshotId) {
return spark.read().format("iceberg")
.option("snapshot-id", snapshotId)
Expand Down Expand Up @@ -821,4 +957,12 @@ public void testCompareToFileList() throws IOException, InterruptedException {
Assert.assertEquals(
"Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
}

/**
 * Busy-waits until the wall clock is strictly past {@code timestampMillis}.
 * A spin loop (rather than sleep) keeps millisecond precision for timestamp-based tests.
 *
 * @param timestampMillis epoch millisecond to wait past
 * @return the first observed time strictly greater than {@code timestampMillis}
 */
protected long waitUntilAfter(long timestampMillis) {
  long now;
  do {
    now = System.currentTimeMillis();
  } while (now <= timestampMillis);
  return now;
}
}