diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java b/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java index bd17d058dd9b..248d3d3679db 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HiddenPathFilter.java @@ -19,6 +19,7 @@ package org.apache.iceberg.hadoop; +import java.io.Serializable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -26,7 +27,7 @@ * A {@link PathFilter} that filters out hidden paths. A path is considered to * be hidden when the path name starts with a period ('.') or an underscore ('_'). */ -public class HiddenPathFilter implements PathFilter { +public class HiddenPathFilter implements PathFilter, Serializable { private static final HiddenPathFilter INSTANCE = new HiddenPathFilter(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java index 9b8011856839..dc58a05d4d0d 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java @@ -21,23 +21,30 @@ import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.sql.Timestamp; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import 
org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult; import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -230,9 +237,10 @@ private Dataset buildActualFileDF() { List matchingFiles = Lists.newArrayList(); Predicate predicate = file -> file.getModificationTime() < olderThanTimestamp; + PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs()); // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver - listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles); + listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles); JavaRDD matchingFileRDD = sparkContext().parallelize(matchingFiles, 1); @@ -244,7 +252,9 @@ private Dataset buildActualFileDF() { JavaRDD subDirRDD = sparkContext().parallelize(subDirs, parallelism); Broadcast conf = sparkContext().broadcast(hadoopConf); - JavaRDD matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp)); + JavaRDD matchingLeafFileRDD = subDirRDD.mapPartitions( + listDirsRecursively(conf, olderThanTimestamp, pathFilter) + ); JavaRDD completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD); return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH); @@ -252,7 +262,7 @@ private Dataset buildActualFileDF() { private static void listDirRecursively( String dir, Predicate predicate, 
Configuration conf, int maxDepth, - int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles) { + int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) { // stop listing whenever we reach the max depth if (maxDepth <= 0) { @@ -266,7 +276,7 @@ private static void listDirRecursively( List<String> subDirs = Lists.newArrayList(); - for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) { + for (FileStatus file : fs.listStatus(path, pathFilter)) { if (file.isDirectory()) { subDirs.add(file.getPath().toString()); } else if (file.isFile() && predicate.test(file)) { @@ -281,7 +291,8 @@ private static void listDirRecursively( } for (String subDir : subDirs) { - listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles); + listDirRecursively( + subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles); } } catch (IOException e) { throw new RuntimeIOException(e); @@ -290,7 +301,8 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively( Broadcast<SerializableConfiguration> conf, - long olderThanTimestamp) { + long olderThanTimestamp, + PathFilter pathFilter) { return dirs -> { List<String> subDirs = Lists.newArrayList(); @@ -302,7 +314,8 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively( int maxDirectSubDirs = Integer.MAX_VALUE; dirs.forEachRemaining(dir -> { - listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, files); + listDirRecursively( + dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files); }); if (!subDirs.isEmpty()) { @@ -312,4 +325,40 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively( return files.iterator(); }; } + + /** + * A {@link PathFilter} that filters out hidden paths, but does not filter out paths that would be marked + * as hidden by {@link HiddenPathFilter} due to a partition field that starts with one
of the characters that + * indicate a hidden path. + */ + @VisibleForTesting + static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable { + + private final Set<String> hiddenPathPartitionNames; + + PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) { + this.hiddenPathPartitionNames = hiddenPathPartitionNames; + } + + @Override + public boolean accept(Path path) { + boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith); + return isHiddenPartitionPath || HiddenPathFilter.get().accept(path); + } + + static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) { + if (specs == null) { + return HiddenPathFilter.get(); + } + + Set<String> partitionNames = specs.values().stream() + .map(PartitionSpec::fields) + .flatMap(List::stream) + .filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith(".")) + .map(partitionField -> partitionField.name() + "=") + .collect(Collectors.toSet()); + + return partitionNames.isEmpty() ? 
HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames); + } + } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 39427e66b4dd..1b28f76d5971 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -60,6 +60,9 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -553,6 +556,139 @@ public void testManyLeafPartitions() throws InterruptedException { Assert.assertEquals("Rows must match", records, actualRecords); } + @Test + public void testHiddenPartitionPaths() throws InterruptedException { + Schema schema = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "_c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + PartitionSpec spec = PartitionSpec.builderFor(schema) + .truncate("_c2", 2) + .identity("c3") + .build(); + Table table = TABLES.create(schema, spec, Maps.newHashMap(), tableLocation); + + StructType structType = new StructType() + .add("c1", DataTypes.IntegerType) + .add("_c2", DataTypes.StringType) + .add("c3", DataTypes.StringType); + List records = Lists.newArrayList( + RowFactory.create(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, structType).coalesce(1); + + df.select("c1", "_c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA"); + 
df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/c3=AAAA"); + + waitUntilAfter(System.currentTimeMillis()); + + SparkActions actions = SparkActions.get(); + + DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations())); + } + + @Test + public void testHiddenPartitionPathsWithPartitionEvolution() throws InterruptedException { + Schema schema = new Schema( + optional(1, "_c1", Types.IntegerType.get()), + optional(2, "_c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + PartitionSpec spec = PartitionSpec.builderFor(schema) + .truncate("_c2", 2) + .build(); + Table table = TABLES.create(schema, spec, Maps.newHashMap(), tableLocation); + + StructType structType = new StructType() + .add("_c1", DataTypes.IntegerType) + .add("_c2", DataTypes.StringType) + .add("c3", DataTypes.StringType); + List records = Lists.newArrayList( + RowFactory.create(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, structType).coalesce(1); + + df.select("_c1", "_c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA"); + + table.updateSpec() + .addField("_c1") + .commit(); + + df.write().mode("append").parquet(tableLocation + "/data/_c2_trunc=AA/_c1=1"); + + waitUntilAfter(System.currentTimeMillis()); + + SparkActions actions = SparkActions.get(); + + DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 2 files", 2, Iterables.size(result.orphanFileLocations())); + } + + @Test + public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws InterruptedException, IOException { + Schema schema = new Schema( + optional(1, "c1", 
Types.IntegerType.get()), + optional(2, "_c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + PartitionSpec spec = PartitionSpec.builderFor(schema) + .truncate("_c2", 2) + .identity("c3") + .build(); + Table table = TABLES.create(schema, spec, Maps.newHashMap(), tableLocation); + + StructType structType = new StructType() + .add("c1", DataTypes.IntegerType) + .add("_c2", DataTypes.StringType) + .add("c3", DataTypes.StringType); + List records = Lists.newArrayList( + RowFactory.create(1, "AAAAAAAAAA", "AAAA") + ); + Dataset df = spark.createDataFrame(records, structType).coalesce(1); + + df.select("c1", "_c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + Path pathToFileInHiddenFolder = new Path(dataPath, "_c2_trunc/file.txt"); + fs.createNewFile(pathToFileInHiddenFolder); + + waitUntilAfter(System.currentTimeMillis()); + + SparkActions actions = SparkActions.get(); + + DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis()) + .execute(); + + Assert.assertEquals("Should delete 0 files", 0, Iterables.size(result.orphanFileLocations())); + Assert.assertTrue(fs.exists(pathToFileInHiddenFolder)); + } + private List snapshotFiles(long snapshotId) { return spark.read().format("iceberg") .option("snapshot-id", snapshotId) @@ -821,4 +957,12 @@ public void testCompareToFileList() throws IOException, InterruptedException { Assert.assertEquals( "Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations()); } + + protected long waitUntilAfter(long timestampMillis) { + long current = System.currentTimeMillis(); + while (current <= timestampMillis) { + current = System.currentTimeMillis(); + } + return current; + } }