Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -224,4 +235,62 @@ public void testConcurrentExpireSnapshotsWithInvalidInput() {
catalogName, tableIdent, -1));

}

@Test
public void testExpireDeleteFiles() throws Exception {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg TBLPROPERTIES" +
"('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);

sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
Copy link
Contributor

@aokolnychyi aokolnychyi Feb 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid we don't know the number of files that this insert will produce. That's why ID = 1 may end up in a separate file (unlikely but possible). If we write just a single file with ID = 1, the DELETE operation below will be a metadata operation and the test will fail.

I think it would be safer to use a typed Dataset and SimpleRecord. That way, we can call coalesce(1) before writing to make sure we produce only 1 file and the subsequent DELETE operation will produce a delete file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, good catch, done.

sql("DELETE FROM %s WHERE id=1", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know it may be easier to always just use the SparkUtil loadSparkTable class now. Wouldn't ever have to refresh

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

table.refresh();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't need to refresh here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


Assert.assertEquals("Should have 1 delete manifest", 1, deleteManifests(table).size());
Assert.assertEquals("Should have 1 delete file", 1, deleteFiles(table).size());
Path deleteManifestPath = new Path(deleteManifests(table).iterator().next().path());
Path deleteFilePath = new Path(String.valueOf(deleteFiles(table).iterator().next().path()));

sql("CALL %s.system.rewrite_data_files(table => '%s', options => map" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it would be easier to read if the args were on separate lines like in a few other places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"('delete-file-threshold','1', 'use-starting-sequence-number', 'false'))",
catalogName, tableIdent);

table.refresh();

sql("INSERT INTO TABLE %s VALUES (5, 'e')", tableName); // this txn moves the file to the DELETED state
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference

table.refresh();
Assert.assertEquals("Should have no delete manifests", 0, deleteManifests(table).size());
Assert.assertEquals("Should have no delete files", 0, deleteFiles(table).size());

FileSystem localFs = FileSystem.getLocal(new Configuration());
Assert.assertTrue("Delete manifest should still exist", localFs.exists(deleteManifestPath));
Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));

Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
sql("CALL %s.system.expire_snapshots(" +
"older_than => TIMESTAMP '%s'," +
"table => '%s'," +
"retain_last => 1)",
catalogName, currentTimestamp, tableIdent);

Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
}

private Set<ManifestFile> deleteManifests(Table table) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this may be simplified a bit?

  private List<ManifestFile> deleteManifests(Table table) {
    return table.currentSnapshot().deleteManifests();
  }

  private Set<DeleteFile> deleteFiles(Table table) {
    Set<DeleteFile> deleteFiles = Sets.newHashSet();

    for (FileScanTask task : table.newScan().planFiles()) {
      deleteFiles.addAll(task.deletes());
    }

    return deleteFiles;
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

List<ManifestFile> manifests = table.currentSnapshot().allManifests();
return manifests.stream().filter(mf -> mf.content().equals(ManifestContent.DELETES))
.collect(Collectors.toSet());
}

/**
 * Returns all delete files referenced by the current snapshot, read from
 * its delete manifests.
 */
private Set<DeleteFile> deleteFiles(Table table) {
  // Enum constants are singletons, so compare with == rather than equals().
  return table.currentSnapshot().allManifests().stream()
      .filter(mf -> mf.content() == ManifestContent.DELETES)
      // Read each delete manifest and flatten the entries into one stream.
      .flatMap(mf -> StreamSupport.stream(
          ManifestFiles.readDeleteManifest(mf, table.io(), table.specs()).spliterator(), false))
      .collect(Collectors.toSet());
}
}