Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.actions;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -48,6 +49,9 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -69,6 +73,14 @@
public class RemoveOrphanFilesAction extends BaseAction<List<String>> {

private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
private static final UserDefinedFunction filename = functions.udf((String path) -> {
  // Returns the last component (the file name) of a Spark file_path string.
  // Spark/Hadoop file paths are URI-style and always use '/' as the separator,
  // so split on '/' rather than File.separator: on Windows the separator is
  // '\', lastIndexOf would return -1, and the whole path would be returned,
  // defeating the filename-based join condition in execute().
  int lastIndex = path.lastIndexOf('/');
  if (lastIndex == -1) {
    return path;
  } else {
    return path.substring(lastIndex + 1);
  }
}, DataTypes.StringType);

private final SparkSession spark;
private final JavaSparkContext sparkContext;
Expand Down Expand Up @@ -141,7 +153,10 @@ public List<String> execute() {
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();

Column joinCond = validFileDF.col("file_path").equalTo(actualFileDF.col("file_path"));
Column nameEqual = filename.apply(actualFileDF.col("file_path"))
.equalTo(filename.apply(validFileDF.col("file_path")));
Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
Column joinCond = nameEqual.and(actualContains);
List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
.as(Encoders.STRING())
.collectAsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ public static void stopSpark() {

@Rule
public TemporaryFolder temp = new TemporaryFolder();
private File tableDir = null;
private String tableLocation = null;

@Before
public void setupTableLocation() throws Exception {
  // Give each test a fresh table directory and remember both the File handle
  // and its URI form, since individual tests need one or the other.
  File folder = temp.newFolder();
  this.tableDir = folder;
  this.tableLocation = folder.toURI().toString();
}

Expand Down Expand Up @@ -491,4 +492,54 @@ private List<String> snapshotFiles(long snapshotId) {
.as(Encoders.STRING())
.collectAsList();
}

@Test
public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, InterruptedException {
  // Create the table rooted at a raw absolute path (no URI scheme) while the
  // reads below use the URI form, so valid paths recorded in metadata differ
  // in form from the listed paths and the filename-based matching is exercised.
  Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableDir.getAbsolutePath());

  List<ThreeColumnRecord> rows = Lists.newArrayList(
      new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
  );
  Dataset<Row> inputDf = spark.createDataFrame(rows, ThreeColumnRecord.class).coalesce(1);

  // Append one data file through Iceberg; this file is tracked by the table.
  inputDf.select("c1", "c2", "c3")
      .write()
      .format("iceberg")
      .mode("append")
      .save(tableDir.getAbsolutePath());

  // Read back the set of files the table currently references.
  List<String> referencedFiles = spark.read().format("iceberg")
      .load(tableLocation + "#files")
      .select("file_path")
      .as(Encoders.STRING())
      .collectAsList();
  Assert.assertEquals("Should be 1 valid files", 1, referencedFiles.size());
  String trackedFile = referencedFiles.get(0);

  // Drop a plain Parquet file into the data directory, bypassing Iceberg,
  // to create an orphan.
  inputDf.write().mode("append").parquet(tableLocation + "/data");

  Path dataDir = new Path(tableLocation + "/data");
  FileSystem fs = dataDir.getFileSystem(spark.sessionState().newHadoopConf());
  List<String> dataDirFiles = Arrays.stream(fs.listStatus(dataDir, HiddenPathFilter.get()))
      .filter(FileStatus::isFile)
      .map(file -> file.getPath().toString())
      .collect(Collectors.toList());
  Assert.assertEquals("Should be 2 files", 2, dataDirFiles.size());

  // Everything in the data dir that is not the tracked file is an orphan.
  List<String> orphans = Lists.newArrayList(dataDirFiles);
  orphans.removeIf(file -> file.contains(trackedFile));
  Assert.assertEquals("Should be 1 invalid file", 1, orphans.size());

  // Sleep for 1 second to ensure the files are old enough to be collected.
  Thread.sleep(1000);

  Actions actions = Actions.forTable(table);
  // deleteWith is a no-op so we can assert the orphan still exists afterwards.
  List<String> removed = actions.removeOrphanFiles()
      .olderThan(System.currentTimeMillis())
      .deleteWith(s -> { })
      .execute();
  Assert.assertEquals("Action should find 1 file", orphans, removed);
  Assert.assertTrue("Invalid file should be present", fs.exists(new Path(orphans.get(0))));
}
}