Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private void testSqlChangeLog(String tableName,
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void setupTable() throws IOException {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private void testSqlChangeLog(String tableName,
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void setupTable() throws IOException {
private List<Snapshot> findValidSnapshots(Table table) {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
validSnapshots.add(snapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testRewriteLargeManifests() {

Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());

sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes' '1')", tableName);

Expand All @@ -72,7 +72,7 @@ public void testRewriteLargeManifests() {

table.refresh();

Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
Expand All @@ -88,7 +88,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {

Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests(table.io()).size());

List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableIdent);
Expand All @@ -98,7 +98,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {

table.refresh();

Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
Expand All @@ -110,7 +110,7 @@ public void testRewriteSmallManifestsWithoutCaching() {

Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());

List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')", catalogName, tableIdent);
Expand All @@ -120,7 +120,7 @@ public void testRewriteSmallManifestsWithoutCaching() {

table.refresh();

Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
Expand All @@ -132,7 +132,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() {

Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());

List<Object[]> output = sql(
"CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')", catalogName, tableIdent);
Expand All @@ -142,7 +142,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() {

table.refresh();

Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private List<ManifestFile> findMatchingManifests() {
return ImmutableList.of();
}

return currentSnapshot.dataManifests().stream()
return currentSnapshot.dataManifests(fileIO).stream()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this usage, there's a class-level fileIO that is wrapped to avoid Kryo serialization issues, so we use that instead of table.io().

.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public Offset latestOffset() {
}

Snapshot latestSnapshot = table.currentSnapshot();
return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
return new StreamingOffset(
latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,13 +570,13 @@ public void testWithExpiringDanglingStageCommit() {
expectedDeletes.add(snapshotA.manifestListLocation());

// Files should be deleted of dangling staged snapshot
snapshotB.addedFiles().forEach(i -> {
snapshotB.addedFiles(table.io()).forEach(i -> {
expectedDeletes.add(i.path().toString());
});

// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
snapshotB.dataManifests().forEach(file -> {
snapshotB.dataManifests(table.io()).forEach(file -> {
// Only the manifest of B should be deleted.
if (file.snapshotId() == snapshotB.snapshotId()) {
expectedDeletes.add(file.path());
Expand Down Expand Up @@ -645,7 +645,7 @@ public void testWithCherryPickTableSnapshot() {

// Make sure no dataFiles are deleted for the B, C, D snapshot
Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
i.addedFiles().forEach(item -> {
i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand Down Expand Up @@ -700,7 +700,7 @@ public void testWithExpiringStagedThenCherrypick() {

// Make sure no dataFiles are deleted for the staged snapshot
Lists.newArrayList(snapshotB).forEach(i -> {
i.addedFiles().forEach(item -> {
i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand All @@ -714,7 +714,7 @@ public void testWithExpiringStagedThenCherrypick() {

// Make sure no dataFiles are deleted for the staged and cherry-pick
Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
i.addedFiles().forEach(item -> {
i.addedFiles(table.io()).forEach(item -> {
Assert.assertFalse(deletedFiles.contains(item.path().toString()));
});
});
Expand Down Expand Up @@ -762,7 +762,7 @@ public void testExpireOlderThanWithDelete() {

Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
1, firstSnapshot.allManifests(table.io()).size());

rightAfterSnapshot();

Expand All @@ -772,7 +772,7 @@ public void testExpireOlderThanWithDelete() {

Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create replace manifest with a rewritten manifest",
1, secondSnapshot.allManifests().size());
1, secondSnapshot.allManifests(table.io()).size());

table.newAppend()
.appendFile(FILE_B)
Expand All @@ -798,9 +798,9 @@ public void testExpireOlderThanWithDelete() {
Assert.assertEquals("Should remove expired manifest lists and deleted data file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
secondSnapshot.allManifests(table.io()).get(0).path(), // manifest contained only deletes, was dropped
FILE_A.path()), // deleted
deletedFiles);

Expand All @@ -821,7 +821,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {

Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
1, firstSnapshot.allManifests(table.io()).size());

rightAfterSnapshot();

Expand All @@ -831,7 +831,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {

Snapshot secondSnapshot = table.currentSnapshot();
Assert.assertEquals("Should replace manifest with a rewritten manifest",
1, secondSnapshot.allManifests().size());
1, secondSnapshot.allManifests(table.io()).size());

table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
.appendFile(FILE_C)
Expand All @@ -857,7 +857,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
Assert.assertEquals("Should remove expired manifest lists and deleted data file",
Sets.newHashSet(
firstSnapshot.manifestListLocation(), // snapshot expired
firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
secondSnapshot.manifestListLocation(), // snapshot expired
FILE_A.path()), // deleted
deletedFiles);
Expand All @@ -879,7 +879,7 @@ public void testExpireOlderThanWithRollback() {

Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
1, firstSnapshot.allManifests(table.io()).size());

rightAfterSnapshot();

Expand All @@ -888,8 +888,8 @@ public void testExpireOlderThanWithRollback() {
.commit();

Snapshot secondSnapshot = table.currentSnapshot();
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());

table.manageSnapshots()
Expand Down Expand Up @@ -928,7 +928,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {

Snapshot firstSnapshot = table.currentSnapshot();
Assert.assertEquals("Should create one manifest",
1, firstSnapshot.allManifests().size());
1, firstSnapshot.allManifests(table.io()).size());

rightAfterSnapshot();

Expand All @@ -937,8 +937,8 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
.commit();

Snapshot secondSnapshot = table.currentSnapshot();
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());

table.manageSnapshots()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,14 @@ public void testBinPackWithDeleteAllData() {
Assert.assertEquals(
"Data manifest should not have existing data file",
0,
(long) table.currentSnapshot().dataManifests().get(0).existingFilesCount());
(long) table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
Assert.assertEquals("Data manifest should have 1 delete data file",
1L,
(long) table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
(long) table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
Assert.assertEquals(
"Delete manifest added row count should equal total count",
total,
(long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
(long) table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
}

@Test
Expand Down Expand Up @@ -962,7 +962,8 @@ public void testAutoSortShuffleOutput() {
.execute();

Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
Assert.assertTrue("Should have written 40+ files",
Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);

table.refresh();

Expand Down Expand Up @@ -1223,7 +1224,7 @@ private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table ta
NestedField field = table.schema().caseInsensitiveFindField(column);
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();

Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles(table.io()))
.collect(Collectors.groupingBy(DataFile::partition));

Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
Expand Down
Loading