diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 7e7f2c6fe7c4..4cfccfbadccb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -38,12 +38,11 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
@@ -57,7 +56,9 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -267,17 +268,41 @@ public static List partitionDataFiles(Table table, Map
 
+  /**
+   * Maps each snapshot on the current snapshot's parent chain to the data files it added.
+   *
+   * <p>For a snapshot with a parent, the files come from an incremental append scan between the
+   * parent and that snapshot; for a snapshot without a parent, they come from a scan of that
+   * snapshot itself.
+   *
+   * @param table the table to inspect; it is refreshed before its snapshots are traversed
+   * @return a map from snapshot id to the data files planned for that snapshot
+   * @throws IOException if a planned file scan cannot be closed
+   */
   public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
     table.refresh();
+
     Map<Long, List<DataFile>> result = Maps.newHashMap();
-    List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
-    for (ManifestFile manifestFile : manifestFiles) {
-      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
-        List<DataFile> dataFiles = Lists.newArrayList(reader);
-        if (result.containsKey(manifestFile.snapshotId())) {
-          result.get(manifestFile.snapshotId()).addAll(dataFiles);
-        } else {
-          result.put(manifestFile.snapshotId(), dataFiles);
-        }
+    Snapshot current = table.currentSnapshot();
+    while (current != null) {
+      // TableScan refinements return a new scan instead of mutating the receiver, so the refined
+      // scan must be reassigned before planning.
+      TableScan tableScan = table.newScan();
+      if (current.parentId() != null) {
+        // Collect the data files that were added only in the current snapshot.
+        tableScan = tableScan.appendsBetween(current.parentId(), current.snapshotId());
+      } else {
+        // Collect the data files that were added in the oldest snapshot.
+        tableScan = tableScan.useSnapshot(current.snapshotId());
+      }
+      try (CloseableIterable<FileScanTask> scanTasks = tableScan.planFiles()) {
+        result.put(current.snapshotId(), ImmutableList.copyOf(Iterables.transform(scanTasks, FileScanTask::file)));
+      }
+
+      // Stop once the snapshot without a parent has been processed.
+      if (current.parentId() == null) {
+        break;
       }
+      // Iterate to the parent snapshot.
+      current = table.snapshot(current.parentId());
     }
     return result;
   }