diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java index b782f0839bd2..15cff96ca521 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java @@ -20,6 +20,7 @@ package org.apache.iceberg.actions; import com.google.common.collect.Lists; +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -48,6 +49,9 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.util.SerializableConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +73,14 @@ public class RemoveOrphanFilesAction extends BaseAction> { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class); + private static final UserDefinedFunction filename = functions.udf((String path) -> { + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex == -1) { + return path; + } else { + return path.substring(lastIndex + 1); + } + }, DataTypes.StringType); private final SparkSession spark; private final JavaSparkContext sparkContext; @@ -141,7 +153,10 @@ public List execute() { Dataset validFileDF = validDataFileDF.union(validMetadataFileDF); Dataset actualFileDF = buildActualFileDF(); - Column joinCond = validFileDF.col("file_path").equalTo(actualFileDF.col("file_path")); + Column nameEqual = filename.apply(actualFileDF.col("file_path")) + .equalTo(filename.apply(validFileDF.col("file_path"))); + Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path")); + Column joinCond = nameEqual.and(actualContains); List orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti") .as(Encoders.STRING()) .collectAsList(); diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java index e0245b5bf65a..d3987814ccf8 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java @@ -85,11 +85,12 @@ public static void stopSpark() { @Rule public TemporaryFolder temp = new TemporaryFolder(); + private File tableDir = null; private String tableLocation = null; @Before public void setupTableLocation() throws Exception { - File tableDir = temp.newFolder(); + this.tableDir = temp.newFolder(); this.tableLocation = tableDir.toURI().toString(); } @@ -491,4 +492,54 @@ private List snapshotFiles(long snapshotId) { .as(Encoders.STRING()) .collectAsList(); } + + @Test + public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, InterruptedException { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableDir.getAbsolutePath()); + + List records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableDir.getAbsolutePath()); + + List validFiles = spark.read().format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + Assert.assertEquals("Should be 1 valid files", 1, validFiles.size()); + String validFile = validFiles.get(0); + + df.write().mode("append").parquet(tableLocation + "/data"); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map(file -> file.getPath().toString()) + .collect(Collectors.toList()); + Assert.assertEquals("Should be 2 files", 2, allFiles.size()); + + List invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeIf(file -> file.contains(validFile)); + Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); + + // sleep for 1 second to unsure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + List result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> { }) + .execute(); + Assert.assertEquals("Action should find 1 file", invalidFiles, result); + Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); + } }