diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d281..5c04c855149f 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ private void testSqlChangeLog(String tableName,
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a59c6..97506b90ba46 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public void setupTable() throws IOException {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d281..5c04c855149f 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ private void testSqlChangeLog(String tableName,
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a59c6..97506b90ba46 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public void setupTable() throws IOException {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
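The changes above and throughout this patch follow one pattern: the no-argument Snapshot accessors (allManifests(), dataManifests(), deleteManifests(), addedFiles()) are replaced by overloads that take a FileIO, so the caller supplies the IO used to read the snapshot's manifest list rather than relying on one held by the Snapshot itself. A minimal sketch of the call-site migration, assuming an org.apache.iceberg.Table handle is in scope (the helper class and method names are illustrative):

    import java.util.List;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    class SnapshotManifestExample {
      // Counts the manifests reachable from the table's current snapshot.
      static int manifestCount(Table table) {
        Snapshot current = table.currentSnapshot();
        // Before: current.allManifests()  (the deprecated no-arg form)
        // After:  pass the FileIO that can read the snapshot's manifest list.
        List<ManifestFile> manifests = current.allManifests(table.io());
        return manifests.size();
      }
    }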
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693e3a..dcf0a2d91e3e 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public void testRewriteLargeManifests() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
 
     sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes' '1')", tableName);
@@ -72,7 +72,7 @@ public void testRewriteLargeManifests() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -88,7 +88,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableIdent);
@@ -98,7 +98,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -110,7 +110,7 @@ public void testRewriteSmallManifestsWithoutCaching() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public void testRewriteSmallManifestsWithoutCaching() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -132,7 +132,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')", catalogName, tableIdent);
@@ -142,7 +142,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
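The rewrite_manifests procedure exercised above delegates to the manifest-rewrite action whose source changes next. A sketch of driving the same rewrite programmatically, assuming the Iceberg Spark runtime is on the classpath and an active SparkSession (this is an illustrative usage, not part of the patch):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.SparkActions;

    class RewriteManifestsExample {
      // Compacts the table's manifests; the action obtains the FileIO internally.
      static void rewriteAll(Table table) {
        RewriteManifests.Result result = SparkActions.get()
            .rewriteManifests(table)
            .execute();
        System.out.printf("rewrote %s, added %s%n",
            result.rewrittenManifests(), result.addedManifests());
      }
    }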
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca062..b1769f428d14 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -261,7 +261,7 @@ private List<ManifestFile> findMatchingManifests() {
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests().stream()
+    return currentSnapshot.dataManifests(fileIO).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index dd68022c6b8d..d72928b6b75d 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -111,7 +111,8 @@ public Offset latestOffset() {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false);
   }
 
   @Override
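In SparkMicroBatchStream, the latest offset is the snapshot ID plus a count of the files that snapshot added, so only the addedFiles call gains the FileIO argument. A sketch of the counting step under the new signature, using a plain loop instead of the relocated Guava Iterables the source uses:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    class AddedFilesExample {
      // Counts the data files added by the latest snapshot, i.e. the value
      // latestOffset() feeds into the StreamingOffset above.
      static long addedFileCount(Table table) {
        Snapshot latest = table.currentSnapshot();
        long count = 0;
        for (DataFile file : latest.addedFiles(table.io())) { // FileIO now explicit
          count++;
        }
        return count;
      }
    }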
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e95454e..9b8c9d2501db 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -570,13 +570,13 @@ public void testWithExpiringDanglingStageCommit() {
     expectedDeletes.add(snapshotA.manifestListLocation());
 
     // Files should be deleted of dangling staged snapshot
-    snapshotB.addedFiles().forEach(i -> {
+    snapshotB.addedFiles(table.io()).forEach(i -> {
       expectedDeletes.add(i.path().toString());
     });
 
     // ManifestList should be deleted too
     expectedDeletes.add(snapshotB.manifestListLocation());
-    snapshotB.dataManifests().forEach(file -> {
+    snapshotB.dataManifests(table.io()).forEach(file -> {
       // Only the manifest of B should be deleted.
       if (file.snapshotId() == snapshotB.snapshotId()) {
         expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public void testWithCherryPickTableSnapshot() {
 
     // Make sure no dataFiles are deleted for the B, C, D snapshot
     Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -700,7 +700,7 @@ public void testWithExpiringStagedThenCherrypick() {
 
     // Make sure no dataFiles are deleted for the staged snapshot
     Lists.newArrayList(snapshotB).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -714,7 +714,7 @@ public void testWithExpiringStagedThenCherrypick() {
 
     // Make sure no dataFiles are deleted for the staged and cherry-pick
     Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
        Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -762,7 +762,7 @@ public void testExpireOlderThanWithDelete() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -772,7 +772,7 @@ public void testExpireOlderThanWithDelete() {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newAppend()
         .appendFile(FILE_B)
@@ -798,9 +798,9 @@ public void testExpireOlderThanWithDelete() {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
-            secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
+            secondSnapshot.allManifests(table.io()).get(0).path(), // manifest contained only deletes, was dropped
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -821,7 +821,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -831,7 +831,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
         .appendFile(FILE_C)
@@ -857,7 +857,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -879,7 +879,7 @@ public void testExpireOlderThanWithRollback() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -888,8 +888,8 @@ public void testExpireOlderThanWithRollback() {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
@@ -928,7 +928,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -937,8 +937,8 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72223..9d460276525a 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public void testBinPackWithDeleteAllData() {
     Assert.assertEquals(
         "Data manifest should not have existing data file",
         0,
-        (long) table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
     Assert.assertEquals("Data manifest should have 1 delete data file",
         1L,
-        (long) table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
     Assert.assertEquals(
         "Delete manifest added row count should equal total count",
         total,
-        (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+        (long) table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
   }
 
   @Test
@@ -962,7 +962,8 @@ public void testAutoSortShuffleOutput() {
         .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
-    Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+    Assert.assertTrue("Should have written 40+ files",
+        Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
 
     table.refresh();
 
@@ -1223,7 +1224,7 @@ private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table ta
     NestedField field = table.schema().caseInsensitiveFindField(column);
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
 
-    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
+    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles(table.io()))
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c918..f30251e74001 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
     table.refresh();
 
     // table should reflect the changes, since the commit was successful
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public void testRewriteImportedManifests() throws IOException {
           .stagingLocation(temp.newFolder().toString())
          .execute();
 
-      Assert.assertEquals("Action should rewrite all manifests", snapshot.allManifests(), result.rewrittenManifests());
+      Assert.assertEquals("Action should rewrite all manifests",
+          snapshot.allManifests(table.io()),
+          result.rewrittenManifests());
       Assert.assertEquals("Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
 
     } finally {
@@ -375,7 +376,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
 
     // set the target manifest size to a small value to force splitting records into multiple files
@@ -395,7 +396,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 1391362823e1..f6d292f89f8b 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
     }
     Assert.assertEquals("Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext());
 
-    table.currentSnapshot().addedFiles().forEach(dataFile ->
+    table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
         Assert.assertTrue(
             String.format(
                 "File should have the parent directory %s, but has: %s.",
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82f13..55605288b808 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public void testSplitOptionsOverridesTableProperties() throws IOException {
         .mode("append")
         .save(tableLocation);
 
-    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
     Assert.assertEquals("Should have written 1 file", 1, files.size());
 
     long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept
.mode("append") .save(tableLocation); - List manifests = table.currentSnapshot().allManifests(); + List manifests = table.currentSnapshot().allManifests(table.io()); Assert.assertEquals("Must be 2 manifests", 2, manifests.size()); @@ -356,7 +356,7 @@ public void testDefaultMetadataSplitSize() throws IOException { HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); - tables.create(SCHEMA, spec, options, tableLocation); + Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation); List expectedRecords = Lists.newArrayList( new SimpleRecord(1, "a"), @@ -371,7 +371,7 @@ public void testDefaultMetadataSplitSize() throws IOException { int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB split size int expectedSplits = ((int) tables.load(tableLocation + "#entries") - .currentSnapshot().allManifests().get(0).length() + splitSize - 1) / splitSize; + .currentSnapshot().allManifests(icebergTable.io()).get(0).length() + splitSize - 1) / splitSize; Dataset metadataDf = spark.read() .format("iceberg") diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index ea6aaae5319b..d6f65af4d88b 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -148,9 +148,9 @@ public void testEntriesTable() throws Exception { Snapshot snapshot = table.currentSnapshot(); - Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size()); + Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests(table.io()).size()); - InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path()); + InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path()); List expected = Lists.newArrayList(); try (CloseableIterable rows = Avro.read(manifest).project(entriesTable.schema()).build()) { // each row must inherit snapshot_id and sequence_number @@ -206,7 +206,7 @@ public void testEntriesTableDataFilePrune() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List singleActual = rowsToJava(spark.read() .format("iceberg") @@ -233,7 +233,7 @@ public void testEntriesTableDataFilePruneMulti() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List multiActual = rowsToJava(spark.read() .format("iceberg") @@ -261,7 +261,7 @@ public void testFilesSelectMap() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List multiActual = rowsToJava(spark.read() .format("iceberg") @@ -308,7 +308,8 @@ public void testAllEntriesTable() throws Exception { .collectAsList(); List expected = Lists.newArrayList(); - for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), 
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ea6aaae5319b..d6f65af4d88b 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -148,9 +148,9 @@ public void testEntriesTable() throws Exception {
 
     Snapshot snapshot = table.currentSnapshot();
 
-    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size());
+    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests(table.io()).size());
 
-    InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path());
+    InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
     List<GenericData.Record> expected = Lists.newArrayList();
     try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
       // each row must inherit snapshot_id and sequence_number
@@ -206,7 +206,7 @@ public void testEntriesTableDataFilePrune() throws Exception {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> singleActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -233,7 +233,7 @@ public void testEntriesTableDataFilePruneMulti() throws Exception {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -261,7 +261,7 @@ public void testFilesSelectMap() throws Exception {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -308,7 +308,8 @@ public void testAllEntriesTable() throws Exception {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::allManifests))) {
+    for (ManifestFile manifest : Iterables.concat(
+        Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         // each row must inherit snapshot_id and sequence_number
@@ -384,7 +385,7 @@ public void testFilesTable() throws Exception {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -441,7 +442,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -530,7 +531,7 @@ public void testFilesUnpartitionedTable() throws Exception {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io()));
 
     // add a second file
     df2.select("id", "data").write()
@@ -547,7 +548,7 @@ public void testFilesUnpartitionedTable() throws Exception {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -644,7 +645,9 @@ public void testAllDataFilesTable() throws Exception {
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
+    Iterable<ManifestFile> dataManifests = Iterables.concat(Iterables.transform(table.snapshots(),
+        snapshot -> snapshot.dataManifests(table.io())));
+    for (ManifestFile manifest : dataManifests) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -897,7 +900,7 @@ public void testManifestsTable() {
         manifestTable.schema(), "manifests"));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder
             .set("content", manifest.content().id())
             .set("path", manifest.path())
@@ -967,7 +970,7 @@ public void testPruneManifestsTable() {
     GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder.set("partition_spec_id", manifest.partitionSpecId())
             .set("path", manifest.path())
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
@@ -999,11 +1002,11 @@ public void testAllManifestsTable() {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     List<Row> actual = spark.read()
         .format("iceberg")
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index f6ca5f7de9f4..cd1404766d46 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -156,7 +156,7 @@ private void checkSparkDataFile(Table table) throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
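The TestSparkDataWrite changes that follow pair the new allManifests(FileIO) call with ManifestFiles.read, which also takes the table's FileIO; together they enumerate every data file in the current snapshot. A sketch of that loop (like the tests, it does not close the manifest reader; production code should):

    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    class ManifestReadExample {
      // Collects every data file referenced by the current snapshot.
      static List<DataFile> dataFiles(Table table) {
        List<DataFile> files = Lists.newArrayList();
        for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
          for (DataFile file : ManifestFiles.read(manifest, table.io())) {
            files.add(file);
          }
        }
        return files;
      }
    }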
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 8bf57ba65ff1..5b158c518ae4 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -133,7 +133,7 @@ public void testBasicWrite() throws IOException {
     List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         // TODO: avro not support split
         if (!format.equals(FileFormat.AVRO)) {
@@ -372,7 +372,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
@@ -590,7 +590,7 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 1cbb4fef0986..510d7c40eecb 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -66,7 +66,7 @@ public void testRefreshCommand() {
 
     // Modify table outside of spark, it should be cached so Spark should see the same value after mutation
     Table table = validationCatalog.loadTable(tableIdent);
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
     table.newDelete().deleteFile(file).commit();
 
     List<Object[]> cachedActual = sql("SELECT * FROM %s", tableName);
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693e3a..dcf0a2d91e3e 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public void testRewriteLargeManifests() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
 
     sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes' '1')", tableName);
@@ -72,7 +72,7 @@ public void testRewriteLargeManifests() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -88,7 +88,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableIdent);
@@ -98,7 +98,7 @@ public void testRewriteSmallManifestsWithSnapshotIdInheritance() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -110,7 +110,7 @@ public void testRewriteSmallManifestsWithoutCaching() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public void testRewriteSmallManifestsWithoutCaching() {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
have 1 manifests", 1, table.currentSnapshot().allManifests().size()); + Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size()); } @Test @@ -132,7 +132,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() { Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size()); + Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size()); List output = sql( "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')", catalogName, tableIdent); @@ -142,7 +142,7 @@ public void testRewriteManifestsCaseInsensitiveArgs() { table.refresh(); - Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size()); + Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size()); } @Test diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java index c446d42ca062..b1769f428d14 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java @@ -261,7 +261,7 @@ private List findMatchingManifests() { return ImmutableList.of(); } - return currentSnapshot.dataManifests().stream() + return currentSnapshot.dataManifests(fileIO).stream() .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest)) .collect(Collectors.toList()); } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index dd68022c6b8d..d72928b6b75d 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -111,7 +111,8 @@ public Offset latestOffset() { } Snapshot latestSnapshot = table.currentSnapshot(); - return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false); + return new StreamingOffset( + latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false); } @Override diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java index c4445e95454e..9b8c9d2501db 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java @@ -570,13 +570,13 @@ public void testWithExpiringDanglingStageCommit() { expectedDeletes.add(snapshotA.manifestListLocation()); // Files should be deleted of dangling staged snapshot - snapshotB.addedFiles().forEach(i -> { + snapshotB.addedFiles(table.io()).forEach(i -> { expectedDeletes.add(i.path().toString()); }); // ManifestList should be deleted too expectedDeletes.add(snapshotB.manifestListLocation()); - snapshotB.dataManifests().forEach(file -> { + snapshotB.dataManifests(table.io()).forEach(file -> { // Only the manifest of B should be deleted. 
       if (file.snapshotId() == snapshotB.snapshotId()) {
         expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public void testWithCherryPickTableSnapshot() {
 
     // Make sure no dataFiles are deleted for the B, C, D snapshot
     Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -700,7 +700,7 @@ public void testWithExpiringStagedThenCherrypick() {
 
     // Make sure no dataFiles are deleted for the staged snapshot
     Lists.newArrayList(snapshotB).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -714,7 +714,7 @@ public void testWithExpiringStagedThenCherrypick() {
 
     // Make sure no dataFiles are deleted for the staged and cherry-pick
     Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -762,7 +762,7 @@ public void testExpireOlderThanWithDelete() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -772,7 +772,7 @@ public void testExpireOlderThanWithDelete() {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newAppend()
         .appendFile(FILE_B)
@@ -798,9 +798,9 @@ public void testExpireOlderThanWithDelete() {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
-            secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
+            secondSnapshot.allManifests(table.io()).get(0).path(), // manifest contained only deletes, was dropped
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -821,7 +821,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -831,7 +831,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
         .appendFile(FILE_C)
@@ -857,7 +857,7 @@ public void testExpireOlderThanWithDeleteInMergedManifests() {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -879,7 +879,7 @@ public void testExpireOlderThanWithRollback() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -888,8 +888,8 @@ public void testExpireOlderThanWithRollback() {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
@@ -928,7 +928,7 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -937,8 +937,8 @@ public void testExpireOlderThanWithRollbackAndMergedManifests() {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72223..9d460276525a 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public void testBinPackWithDeleteAllData() {
     Assert.assertEquals(
         "Data manifest should not have existing data file",
         0,
-        (long) table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
     Assert.assertEquals("Data manifest should have 1 delete data file",
         1L,
-        (long) table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
     Assert.assertEquals(
         "Delete manifest added row count should equal total count",
         total,
-        (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+        (long) table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
   }
 
   @Test
@@ -962,7 +962,8 @@ public void testAutoSortShuffleOutput() {
         .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
-    Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+    Assert.assertTrue("Should have written 40+ files",
+        Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
 
     table.refresh();
 
@@ -1223,7 +1224,7 @@ private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table ta
     NestedField field = table.schema().caseInsensitiveFindField(column);
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
 
-    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
+    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles(table.io()))
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c918..f30251e74001 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public void testRewriteSmallManifestsNonPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public void testRewriteManifestsWithCommitStateUnknownException() {
     table.refresh();
 
     // table should reflect the changes, since the commit was successful
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public void testRewriteSmallManifestsPartitionedTable() {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public void testRewriteImportedManifests() throws IOException {
           .stagingLocation(temp.newFolder().toString())
           .execute();
 
-      Assert.assertEquals("Action should rewrite all manifests", snapshot.allManifests(), result.rewrittenManifests());
+      Assert.assertEquals("Action should rewrite all manifests",
+          snapshot.allManifests(table.io()),
+          result.rewrittenManifests());
       Assert.assertEquals("Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
 
     } finally {
@@ -375,7 +376,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
 
     // set the target manifest size to a small value to force splitting records into multiple files
@@ -395,7 +396,7 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 1391362823e1..f6d292f89f8b 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
     }
     Assert.assertEquals("Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext());
 
-    table.currentSnapshot().addedFiles().forEach(dataFile ->
+    table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
         Assert.assertTrue(
             String.format(
                 "File should have the parent directory %s, but has: %s.",
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82f13..55605288b808 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public void testSplitOptionsOverridesTableProperties() throws IOException {
         .mode("append")
         .save(tableLocation);
 
-    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
     Assert.assertEquals("Should have written 1 file", 1, files.size());
 
     long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public void testMetadataSplitSizeOptionOverrideTableProperties() throws IOExcept
.mode("append") .save(tableLocation); - List manifests = table.currentSnapshot().allManifests(); + List manifests = table.currentSnapshot().allManifests(table.io()); Assert.assertEquals("Must be 2 manifests", 2, manifests.size()); @@ -356,7 +356,7 @@ public void testDefaultMetadataSplitSize() throws IOException { HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.unpartitioned(); Map options = Maps.newHashMap(); - tables.create(SCHEMA, spec, options, tableLocation); + Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation); List expectedRecords = Lists.newArrayList( new SimpleRecord(1, "a"), @@ -371,7 +371,7 @@ public void testDefaultMetadataSplitSize() throws IOException { int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB split size int expectedSplits = ((int) tables.load(tableLocation + "#entries") - .currentSnapshot().allManifests().get(0).length() + splitSize - 1) / splitSize; + .currentSnapshot().allManifests(icebergTable.io()).get(0).length() + splitSize - 1) / splitSize; Dataset metadataDf = spark.read() .format("iceberg") diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index ea6aaae5319b..d6f65af4d88b 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -148,9 +148,9 @@ public void testEntriesTable() throws Exception { Snapshot snapshot = table.currentSnapshot(); - Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size()); + Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests(table.io()).size()); - InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path()); + InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path()); List expected = Lists.newArrayList(); try (CloseableIterable rows = Avro.read(manifest).project(entriesTable.schema()).build()) { // each row must inherit snapshot_id and sequence_number @@ -206,7 +206,7 @@ public void testEntriesTableDataFilePrune() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List singleActual = rowsToJava(spark.read() .format("iceberg") @@ -233,7 +233,7 @@ public void testEntriesTableDataFilePruneMulti() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List multiActual = rowsToJava(spark.read() .format("iceberg") @@ -261,7 +261,7 @@ public void testFilesSelectMap() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); List multiActual = rowsToJava(spark.read() .format("iceberg") @@ -308,7 +308,8 @@ public void testAllEntriesTable() throws Exception { .collectAsList(); List expected = Lists.newArrayList(); - for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), 
Snapshot::allManifests))) { + for (ManifestFile manifest : Iterables.concat( + Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())))) { InputFile in = table.io().newInputFile(manifest.path()); try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { // each row must inherit snapshot_id and sequence_number @@ -384,7 +385,7 @@ public void testFilesTable() throws Exception { .collectAsList(); List expected = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().dataManifests()) { + for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { InputFile in = table.io().newInputFile(manifest.path()); try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { for (GenericData.Record record : rows) { @@ -441,7 +442,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception { .collectAsList(); List expected = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().dataManifests()) { + for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { InputFile in = table.io().newInputFile(manifest.path()); try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { for (GenericData.Record record : rows) { @@ -530,7 +531,7 @@ public void testFilesUnpartitionedTable() throws Exception { .save(loadLocation(tableIdentifier)); table.refresh(); - DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles()); + DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io())); // add a second file df2.select("id", "data").write() @@ -547,7 +548,7 @@ public void testFilesUnpartitionedTable() throws Exception { .collectAsList(); List expected = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().dataManifests()) { + for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) { InputFile in = table.io().newInputFile(manifest.path()); try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { for (GenericData.Record record : rows) { @@ -644,7 +645,9 @@ public void testAllDataFilesTable() throws Exception { actual.sort(Comparator.comparing(o -> o.getString(1))); List expected = Lists.newArrayList(); - for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) { + Iterable dataManifests = Iterables.concat(Iterables.transform(table.snapshots(), + snapshot -> snapshot.dataManifests(table.io()))); + for (ManifestFile manifest : dataManifests) { InputFile in = table.io().newInputFile(manifest.path()); try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { for (GenericData.Record record : rows) { @@ -897,7 +900,7 @@ public void testManifestsTable() { manifestTable.schema(), "manifests")); GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary")); - List expected = Lists.transform(table.currentSnapshot().allManifests(), manifest -> + List expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest -> builder .set("content", manifest.content().id()) .set("path", manifest.path()) @@ -967,7 +970,7 @@ public void testPruneManifestsTable() { GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct())); GenericRecordBuilder 
summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert( projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary")); - List expected = Lists.transform(table.currentSnapshot().allManifests(), manifest -> + List expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest -> builder.set("partition_spec_id", manifest.partitionSpecId()) .set("path", manifest.path()) .set("partition_summaries", Lists.transform(manifest.partitions(), partition -> @@ -999,11 +1002,11 @@ public void testAllManifestsTable() { .mode("append") .save(loadLocation(tableIdentifier)); - manifests.addAll(table.currentSnapshot().allManifests()); + manifests.addAll(table.currentSnapshot().allManifests(table.io())); table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); - manifests.addAll(table.currentSnapshot().allManifests()); + manifests.addAll(table.currentSnapshot().allManifests(table.io())); List actual = spark.read() .format("iceberg") diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java index f6ca5f7de9f4..cd1404766d46 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java @@ -156,7 +156,7 @@ private void checkSparkDataFile(Table table) throws IOException { table.refresh(); - List manifests = table.currentSnapshot().allManifests(); + List manifests = table.currentSnapshot().allManifests(table.io()); Assert.assertEquals("Should have 1 manifest", 1, manifests.size()); List dataFiles = Lists.newArrayList(); diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java index 8bf57ba65ff1..5b158c518ae4 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java @@ -133,7 +133,7 @@ public void testBasicWrite() throws IOException { List actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); Assert.assertEquals("Result rows should match", expected, actual); - for (ManifestFile manifest : table.currentSnapshot().allManifests()) { + for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) { for (DataFile file : ManifestFiles.read(manifest, table.io())) { // TODO: avro not support split if (!format.equals(FileFormat.AVRO)) { @@ -372,7 +372,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws Assert.assertEquals("Result rows should match", expected, actual); List files = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().allManifests()) { + for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) { for (DataFile file : ManifestFiles.read(manifest, table.io())) { files.add(file); } @@ -590,7 +590,7 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti Assert.assertEquals("Result rows should match", expected, actual); List files = Lists.newArrayList(); - for (ManifestFile manifest : table.currentSnapshot().allManifests()) { + for (ManifestFile manifest : 
table.currentSnapshot().allManifests(table.io())) { for (DataFile file : ManifestFiles.read(manifest, table.io())) { files.add(file); } diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java index 3952019fb058..187dd4470a05 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java @@ -61,7 +61,7 @@ public void testRefreshCommand() { // Modify table outside of spark, it should be cached so Spark should see the same value after mutation Table table = validationCatalog.loadTable(tableIdent); - DataFile file = table.currentSnapshot().addedFiles().iterator().next(); + DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next(); table.newDelete().deleteFile(file).commit(); List cachedActual = sql("SELECT * FROM %s", tableName);
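
Taken together, these hunks apply one mechanical migration: the `Snapshot` accessors that read manifest metadata (`allManifests`, `dataManifests`, `addedFiles`) now take a `FileIO` argument, supplied by the caller, rather than the snapshot resolving IO on its own. The following is a minimal caller-side sketch of that pattern, assuming a loaded `Table`; the class and method names are illustrative and not part of the patch:

```java
import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class SnapshotIoMigration {

  // Old style, as removed on the '-' lines of this diff:
  //   List<ManifestFile> manifests = snapshot.allManifests();

  // New style, as added on the '+' lines: pass the owning table's FileIO
  // so the snapshot can load its manifest list through it.
  public static List<ManifestFile> currentManifests(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    return snapshot.allManifests(table.io());
  }

  // addedFiles (and dataManifests) follow the same pattern in this diff.
  public static Iterable<DataFile> currentAddedFiles(Table table) {
    return table.currentSnapshot().addedFiles(table.io());
  }
}
```

Note the knock-on effect visible in `testDefaultMetadataSplitSize`: because the new accessors need a `FileIO`, the test must keep a reference to the created `Table` (`Table icebergTable = tables.create(...)`) instead of discarding the return value.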