Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ private TableProperties() {
public static final String ORC_BATCH_SIZE = "read.orc.vectorization.batch-size";
public static final int ORC_BATCH_SIZE_DEFAULT = 5000;

// Table property read by BaseDeleteOrphanFilesSparkAction when it is constructed. When true, the action
// lists directories with fs.listStatus(path) instead of fs.listStatus(path, HiddenPathFilter.get()),
// i.e. without applying HiddenPathFilter. The default (false) keeps the filtered listing.
public static final String INCLUDE_HIDDEN_PATHS = "read.fs.include-hidden-paths";
public static final boolean INCLUDE_HIDDEN_PATHS_DEFAULT = false;
Comment thread
ulmako marked this conversation as resolved.
Outdated
Comment thread
ulmako marked this conversation as resolved.
Outdated

public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@

import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.INCLUDE_HIDDEN_PATHS;
import static org.apache.iceberg.TableProperties.INCLUDE_HIDDEN_PATHS_DEFAULT;

/**
* An action that removes orphan metadata, data and delete files by listing a given location and comparing
Expand Down Expand Up @@ -92,6 +94,7 @@ public class BaseDeleteOrphanFilesSparkAction
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
private final boolean includeHiddenPaths;

private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
Expand All @@ -107,10 +110,12 @@ public void accept(String file) {
public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);

this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
this.table = table;
this.location = table.location();
includeHiddenPaths = PropertyUtil.propertyAsBoolean(table.properties(), INCLUDE_HIDDEN_PATHS,
INCLUDE_HIDDEN_PATHS_DEFAULT);
Comment thread
ulmako marked this conversation as resolved.
Outdated
location = table.location();
Comment thread
ulmako marked this conversation as resolved.
Outdated

ValidationException.check(
PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
Expand All @@ -124,25 +129,25 @@ protected DeleteOrphanFiles self() {

/**
 * Stores the given executor service in the {@code deleteExecutorService} field.
 *
 * @param executorService the executor service to store; presumably used to run deletes (the use site is
 *                        not visible in this hunk, confirm against the rest of the class)
 * @return this action, so that configuration calls can be chained
 */
@Override
public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
// NOTE(review): the two assignments below appear to be the removed/added sides of the same diff line
// rendered together; both store the same value in the same field.
this.deleteExecutorService = executorService;
Comment thread
ulmako marked this conversation as resolved.
deleteExecutorService = executorService;
return this;
}

/**
 * Overrides the location that is listed when looking for files. The constructor initializes the
 * {@code location} field to {@code table.location()}; this replaces it.
 *
 * @param newLocation the directory to list instead of the table location
 * @return this action, so that configuration calls can be chained
 */
@Override
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
// NOTE(review): the two assignments below appear to be the removed/added sides of the same diff line
// rendered together; both store the same value in the same field.
this.location = newLocation;
location = newLocation;
return this;
}

/**
 * Sets the modification-time cutoff used when listing files: only files whose modification time is
 * strictly less than this timestamp match the listing predicate. The field is initialized to the
 * current time minus 3 days.
 *
 * @param newOlderThanTimestamp cutoff in milliseconds (same unit as {@code System.currentTimeMillis()})
 * @return this action, so that configuration calls can be chained
 */
@Override
public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
// NOTE(review): the two assignments below appear to be the removed/added sides of the same diff line
// rendered together; both store the same value in the same field.
this.olderThanTimestamp = newOlderThanTimestamp;
olderThanTimestamp = newOlderThanTimestamp;
return this;
}

/**
 * Stores the given consumer in the {@code deleteFunc} field, replacing the current one.
 *
 * @param newDeleteFunc consumer that receives a file path string; presumably invoked once per file to
 *                      delete (the call site is not visible in this hunk, confirm against the class)
 * @return this action, so that configuration calls can be chained
 */
@Override
public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
// NOTE(review): the two assignments below appear to be the removed/added sides of the same diff line
// rendered together; both store the same value in the same field.
this.deleteFunc = newDeleteFunc;
deleteFunc = newDeleteFunc;
return this;
}

Expand Down Expand Up @@ -193,7 +198,8 @@ private Dataset<Row> buildActualFileDF() {
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

// list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles);
listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, matchingFiles,
includeHiddenPaths);

JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

Expand All @@ -205,15 +211,17 @@ private Dataset<Row> buildActualFileDF() {
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);

Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp));
JavaRDD<String> matchingLeafFileRDD =
Comment thread
ulmako marked this conversation as resolved.
Outdated
subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp, includeHiddenPaths));

JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
}

private static void listDirRecursively(
String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles) {
int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles,
boolean includeHiddenPaths) {

// stop listing whenever we reach the max depth
if (maxDepth <= 0) {
Expand All @@ -227,7 +235,10 @@ private static void listDirRecursively(

List<String> subDirs = Lists.newArrayList();

for (FileStatus file : fs.listStatus(path, HiddenPathFilter.get())) {
FileStatus[] fileStatus =
(includeHiddenPaths) ? fs.listStatus(path) : fs.listStatus(path, HiddenPathFilter.get());
Comment thread
ulmako marked this conversation as resolved.
Outdated

for (FileStatus file : fileStatus) {
if (file.isDirectory()) {
subDirs.add(file.getPath().toString());
} else if (file.isFile() && predicate.test(file)) {
Expand All @@ -242,7 +253,8 @@ private static void listDirRecursively(
}

for (String subDir : subDirs) {
listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles,
includeHiddenPaths);
}
} catch (IOException e) {
throw new RuntimeIOException(e);
Expand All @@ -251,7 +263,8 @@ private static void listDirRecursively(

private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
Broadcast<SerializableConfiguration> conf,
long olderThanTimestamp) {
long olderThanTimestamp,
boolean includeHiddenPaths) {

return dirs -> {
List<String> subDirs = Lists.newArrayList();
Expand All @@ -263,7 +276,15 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
int maxDirectSubDirs = Integer.MAX_VALUE;

dirs.forEachRemaining(dir -> {
listDirRecursively(dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, files);
listDirRecursively(
dir,
predicate,
conf.value().value(),
maxDepth,
maxDirectSubDirs,
subDirs,
files,
includeHiddenPaths);
Comment thread
ulmako marked this conversation as resolved.
Outdated
});

if (!subDirs.isEmpty()) {
Expand Down