Merged: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java (24 additions, 12 deletions)

@@ -38,12 +38,11 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
Expand All @@ -57,7 +56,9 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -267,17 +268,28 @@ public static List<DataFile> partitionDataFiles(Table table, Map<String, Object>

   public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
     table.refresh();
 
     Map<Long, List<DataFile>> result = Maps.newHashMap();
-    List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
-    for (ManifestFile manifestFile : manifestFiles) {
-      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
-        List<DataFile> dataFiles = Lists.newArrayList(reader);
-        if (result.containsKey(manifestFile.snapshotId())) {
-          result.get(manifestFile.snapshotId()).addAll(dataFiles);
-        } else {
-          result.put(manifestFile.snapshotId(), dataFiles);
-        }
-      }
-    }
+    Snapshot current = table.currentSnapshot();
[Review thread on the changed lines]

Contributor: I think it would be better to modify the old code rather than traversing snapshots and planning scans. All you need to do is read the manifest entries and add the data files to the map using each entry's snapshot ID instead of the manifest's snapshot ID. You can use ManifestGroup.entries to get the entries.
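A minimal sketch of that direction, assuming the helper lives in the org.apache.iceberg package (ManifestEntry is package-private, as the replies below discuss); it goes through ManifestReader.entries() rather than ManifestGroup, which is the same idea via a different entry point:

    // Sketch only: key the map by each entry's snapshot ID, not the manifest's.
    // Assumes package org.apache.iceberg so that ManifestEntry is visible.
    Map<Long, List<DataFile>> result = Maps.newHashMap();
    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
        for (ManifestEntry<DataFile> entry : reader.entries()) {
          result.computeIfAbsent(entry.snapshotId(), id -> Lists.newArrayList())
              .add(entry.file().copy());
        }
      }
    }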

szehon-ho (Member), Oct 25, 2021: I think the ManifestEntry interface and its methods are package-protected here, hence they can't be used outside (as I mentioned over on the other thread, when I was also looking to fix the original code).

So I was wondering whether the interface could be exposed as public? I was curious about the original reason it was kept package-protected, since the "entries" metadata table is already publicly exposed to users via Spark, and it is already publicly documented in the spec.

rdblue (Contributor), Oct 25, 2021: The reason we don't expose the manifest entry is that it's confusing. We don't want users to read a manifest and assume that all of the entries represent files in the table, because we track deleted files in the same metadata. So users would need to know more about the spec, and we don't think it is likely to be used correctly.

I'm still open to the idea of making this public, but if we don't need it then I'd opt not to.
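To make the pitfall concrete, a hedged illustration under the same package-visibility assumption as the sketch above; Status here is the ManifestEntry.Status enum (EXISTING, ADDED, DELETED):

    // Entries with DELETED status record files removed from the table, so
    // treating every entry as a live data file would overcount.
    try (CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries()) {
      for (ManifestEntry<DataFile> entry : entries) {
        if (entry.status() == ManifestEntry.Status.DELETED) {
          continue; // a tombstone, not a file in the current table
        }
        // only ADDED and EXISTING entries refer to live files
      }
    }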

Member: I get what you mean. If you are curious, a real-life use case we designed around ManifestEntry was a bit similar to this.

We were trying to build a data-latency monitoring application that measures the maximum data latency per partition. We wanted to go through the snapshots, then search the reachable manifest entries for all ADDED data files matching a partition, and find the latest commit time among them.

We ended up trying to join the snapshots and all_entries metadata tables, but due to performance issues and bugs with metadata-table aggregation, we started looking at the Table and ManifestReader APIs to explore a set of known snapshots/manifest files directly, since we roughly knew the time frame in which the partition was expected to land at the latest. But without knowing the context of each DataFile, this solution is hard to get working (a DataFile in EXISTING state resulting from a metadata rewrite throws it off).

Not sure if there was an easier way to do it :)
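For illustration, a hypothetical sketch of that metadata-table join in Spark (the table name db.tbl and the existing SparkSession named spark are made up; in the entries tables, status 1 means ADDED per the spec):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Max commit time of ADDED files per partition, via snapshots x all_entries.
    Dataset<Row> latency = spark.sql(
        "SELECT e.data_file.partition AS part, MAX(s.committed_at) AS latest_commit "
            + "FROM db.tbl.all_entries e "
            + "JOIN db.tbl.snapshots s ON e.snapshot_id = s.snapshot_id "
            + "WHERE e.status = 1 "
            + "GROUP BY e.data_file.partition");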

+    while (current != null) {
+      TableScan tableScan = table.newScan();
+      if (current.parentId() != null) {
+        // Collect only the data files that were added in the current snapshot.
+        tableScan = tableScan.appendsBetween(current.parentId(), current.snapshotId());
+      } else {
+        // Collect the data files that were added in the oldest snapshot.
+        tableScan = tableScan.useSnapshot(current.snapshotId());
+      }
+      try (CloseableIterable<FileScanTask> scanTasks = tableScan.planFiles()) {
+        result.put(current.snapshotId(), ImmutableList.copyOf(Iterables.transform(scanTasks, FileScanTask::file)));
+      }
+
+      // Continue to traverse the parent snapshot, if one exists.
+      if (current.parentId() == null) {
+        break;
+      }
+      // Iterate to the parent snapshot.
+      current = table.snapshot(current.parentId());
+    }
     return result;
   }
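For context, a hedged usage sketch of the fixed helper in a test (assumes a table that received two separate append commits):

    // Hypothetical check: each snapshot ID now maps only to the files
    // committed in that snapshot, with no duplicates across snapshots.
    Map<Long, List<DataFile>> bySnapshot = SimpleDataUtil.snapshotToDataFiles(table);
    bySnapshot.forEach((snapshotId, files) ->
        System.out.printf("snapshot %d added %d file(s)%n", snapshotId, files.size()));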