Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public void testDryRun() throws IOException, InterruptedException {
invalidFiles.removeAll(validFiles);
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will just immediately return. The waitUntilAfter function loops while the passed-in timestamp is less than System.currentTimeMillis.

Other places I see waitUntilAfter used tend to wait until table.currentSnapshot().timestampMillis(). But this does seem to be how waitUntilAfter is already used elsewhere in this file, so I'm ok with this.

If we notice this being flaky, then we might want to change it to using the table's current snapshot timestamp instead.


SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -223,8 +222,7 @@ public void testAllValidFilesAreKept() throws IOException, InterruptedException
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -290,8 +288,7 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException,
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

Set<String> deletedFiles = Sets.newHashSet();
Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
Expand Down Expand Up @@ -353,8 +350,7 @@ public void testWapFilesAreKept() throws InterruptedException {
.collectAsList();
Assert.assertEquals("Should not return data from the staged snapshot", records, actualRecords);

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -385,8 +381,7 @@ public void testMetadataFolderIsIntact() throws InterruptedException {

df.write().mode("append").parquet(tableLocation + "/c2_trunc=AA/c3=AAAA");

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -421,11 +416,11 @@ public void testOlderThanTimestamp() throws InterruptedException {
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");

Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

long timestamp = System.currentTimeMillis();

Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

df.write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");

Expand Down Expand Up @@ -462,8 +457,7 @@ public void testRemoveUnreachableMetadataVersionFiles() throws InterruptedExcept
.mode("append")
.save(tableLocation);

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -503,8 +497,7 @@ public void testManyTopLevelPartitions() throws InterruptedException {
.mode("append")
.save(tableLocation);

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -538,8 +531,7 @@ public void testManyLeafPartitions() throws InterruptedException {
.mode("append")
.save(tableLocation);

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down Expand Up @@ -736,8 +728,7 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
invalidFiles.removeIf(file -> file.contains(validFile));
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();
DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table)
Expand Down Expand Up @@ -771,8 +762,7 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException

df.write().mode("append").parquet(table.location() + "/data");

// sleep for 1 second to unsure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

table.refresh();

Expand Down Expand Up @@ -891,7 +881,7 @@ public void testCompareToFileList() throws IOException, InterruptedException {
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to ensure files will be old enough
Thread.sleep(1000);
waitUntilAfter(System.currentTimeMillis());

SparkActions actions = SparkActions.get();

Expand Down