diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 1b28f76d5971..bcabe38e79f1 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -138,8 +138,7 @@ public void testDryRun() throws IOException, InterruptedException { invalidFiles.removeAll(validFiles); Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -223,8 +222,7 @@ public void testAllValidFilesAreKept() throws IOException, InterruptedException df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid"); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -290,8 +288,7 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException, df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid"); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); Set deletedFiles = Sets.newHashSet(); Set deleteThreads = ConcurrentHashMap.newKeySet(); @@ -353,8 +350,7 @@ public void testWapFilesAreKept() throws InterruptedException { .collectAsList(); Assert.assertEquals("Should not return data from the staged snapshot", records, actualRecords); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -385,8 +381,7 @@ public void testMetadataFolderIsIntact() throws InterruptedException { df.write().mode("append").parquet(tableLocation + "/c2_trunc=AA/c3=AAAA"); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -421,11 +416,11 @@ public void testOlderThanTimestamp() throws InterruptedException { df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); long timestamp = System.currentTimeMillis(); - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA"); @@ -462,8 +457,7 @@ public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedExcept .mode("append") .save(tableLocation); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -503,8 +497,7 @@ public void testManyTopLevelPartitions() throws InterruptedException { .mode("append") .save(tableLocation); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -538,8 +531,7 @@ public void testManyLeafPartitions() throws InterruptedException { .mode("append") .save(tableLocation); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); @@ -736,8 +728,7 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte invalidFiles.removeIf(file -> file.contains(validFile)); Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get(); DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table) @@ -771,8 +762,7 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException df.write().mode("append").parquet(table.location() + "/data"); - // sleep for 1 second to unsure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); table.refresh(); @@ -891,7 +881,7 @@ public void testCompareToFileList() throws IOException, InterruptedException { Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); // sleep for 1 second to ensure files will be old enough - Thread.sleep(1000); + waitUntilAfter(System.currentTimeMillis()); SparkActions actions = SparkActions.get();