[WIP] V4 Manifest Read Support #14533
base: main
Introduces the foundational types for V4 manifest format support: - TrackedFile interface as unified representation for all V4 entry types - DeletionVector and ManifestStats interfaces - GenericTrackedFile implementation and test
| * Manifest deletion vector entry (V4+ only) - marks entries in a manifest as deleted without | ||
| * rewriting the manifest. | ||
| */ | ||
| MANIFEST_DV(5); |
I prefer the option of having the DV located in a field of the data or delete manifest record. That way we don't have to wait to find the DV before processing a manifest file. Not sure what others think here, but since the DV metadata/content is likely going to be different between the Metadata DV (inline) and Data DV (stored in Puffin), I don't see much value in trying to reuse metadata fields for it.
That was my preference too, and I advocated for it in our community discussion but this is what we settled on. Our current v4 proposal specifically uses MANIFEST_DV as a separate content type that references manifests via the referenced_file field. We can certainly change it, but want to hear from others.
I just checked with Amogh and I don't think that this has been firmly decided yet. From the implementation here, I think we should embed the DV in manifest-specific metadata.
| * <p>When present, the deletion vector is stored inline in the manifest rather than in a separate | ||
| * Puffin file. | ||
| */ | ||
| ByteBuffer inlineContent(); |
I mentioned this in my comment below, but I don't think there's much value in combining the inline MDV metadata and fields to track data DVs stored in Puffin. These aren't overlapping, so I'd keep them separate.
Data and metadata DVs seemed like similar concepts to me. But I don't mind changing it.
| 100, | ||
| "location", | ||
| Types.StringType.get(), | ||
| "Location of the file. Optional if content_type is 5 and deletion_vector.inline_content is not null"); |
Not using a separate entry for inline would make this required, right?
Yes.
| 104, | ||
| "file_size_in_bytes", | ||
| Types.LongType.get(), | ||
| "Total file size in bytes. Must be defined if location is defined"); |
Same here.
| * <p>Contains status, snapshot ID, sequence numbers, and first-row-id. Optional - may be null if | ||
| * tracking info is inherited. | ||
| */ | ||
| TrackingInfo trackingInfo(); |
I think that this also requires a field to be defined, so we have a record of the ID used for it.
Yep. Will add it as soon as we define it. Right now it's TBD in the proposal.
Let's just assign it now and update the proposal. I also have example assignments in my exploration if you want to be consistent:
fn tracked_file_write_schema(
    table_schema: &Schema,
    stats_modes: Option<&HashMap<FieldId, StatsMode>>,
) -> Schema {
    let default = HashMap::from_iter((0..=32).into_iter().map(|id| (id, DEFAULT_STATS_MODE)));
    let modes: &HashMap<FieldId, StatsMode> = stats_modes.unwrap_or_else(|| &default);
    StructType::new([
        required("tracking_info", tracking_info_schema(), 147), // TODO: assigned ID
        required("content", DataType::INTEGER, 134), // now required!
        required("location", DataType::STRING, 100),
        required("file_format", DataType::STRING, 101),
        required("record_count", DataType::LONG, 103),
        required("file_size_in_bytes", DataType::LONG, 104),
        required(
            "content_stats",
            content_stats_schema(table_schema, modes),
            146,
        ), // TODO: ID is from stats proposal
        optional("key_metadata", DataType::BINARY, 131),
        optional("split_offsets", ArrayType::new(DataType::LONG, false), 132), // TODO: missing element ID (133)
        optional("content_slice", slice_schema(), 148), // TODO: assigned ID
        optional("referenced_file", DataType::STRING, 143),
        optional("manifest_stats", manifest_stats_schema(), 149), // TODO: assigned ID
        optional("min_sequence_number", DataType::LONG, 516),
    ])
}
What is content_slice?
| * @throws IllegalStateException if content_type is not DATA | ||
| * @throws UnsupportedOperationException if ContentStats not yet implemented | ||
| */ | ||
| DataFile asDataFile(PartitionSpec spec); |
I'm not sure that we want to pass in the spec, since the record contains an ID. Wouldn't it be better to pass in a map of specs by ID when reading manifests so that this is already known when adapting to DataFile?
Yes, I thought about it, but was not sure which would be cleaner.
For this API, I think it is cleaner to call asDataFile() without any arguments.
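A minimal sketch of the no-argument variant discussed above, assuming the reader is handed a map of specs by ID and passes it to each row it produces. Every type here (`PartitionSpec`, `DataFile`, `TrackedFile`, `ManifestReader`) is a simplified, hypothetical stand-in, not the actual Iceberg class.

```java
import java.util.HashMap;
import java.util.Map;

final class SpecLookupSketch {
  /** Stand-in for a partition spec; only the ID matters for this sketch. */
  static final class PartitionSpec {
    final int specId;

    PartitionSpec(int specId) {
      this.specId = specId;
    }
  }

  /** Stand-in for the adapted data file, which carries a resolved spec. */
  static final class DataFile {
    final String location;
    final PartitionSpec spec;

    DataFile(String location, PartitionSpec spec) {
      this.location = location;
      this.spec = spec;
    }
  }

  /** Stand-in for a tracked file row that already knows how to resolve its spec. */
  static final class TrackedFile {
    private final String location;
    private final int specId;
    private final Map<Integer, PartitionSpec> specsById; // supplied by the reader

    TrackedFile(String location, int specId, Map<Integer, PartitionSpec> specsById) {
      this.location = location;
      this.specId = specId;
      this.specsById = specsById;
    }

    /** No argument needed: the spec is looked up from the ID stored in the record. */
    DataFile asDataFile() {
      PartitionSpec spec = specsById.get(specId);
      if (spec == null) {
        throw new IllegalStateException("Unknown partition spec ID: " + specId);
      }

      return new DataFile(location, spec);
    }
  }

  /** Hypothetical reader that receives specsById once and hands it to every row. */
  static final class ManifestReader {
    private final Map<Integer, PartitionSpec> specsById;

    ManifestReader(Map<Integer, PartitionSpec> specsById) {
      this.specsById = specsById;
    }

    TrackedFile row(String location, int specId) {
      return new TrackedFile(location, specId, specsById);
    }
  }

  public static void main(String[] args) {
    Map<Integer, PartitionSpec> specs = new HashMap<>();
    specs.put(0, new PartitionSpec(0));
    specs.put(1, new PartitionSpec(1));
    DataFile file = new ManifestReader(specs).row("a.parquet", 1).asDataFile();
    System.out.println(file.location + " uses spec " + file.spec.specId);
  }
}
```

With this shape, a caller never has to find the right spec itself; a missing spec surfaces as an error at adaptation time.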
| DeleteFile asDeleteFile(PartitionSpec spec); | ||
|
|
||
| /** Set the status for this tracked file entry. */ | ||
| void setStatus(TrackingInfo.Status status); |
This API should not expose any setter methods. The implementation used can, if needed, for things like inherited metadata. But the API interface itself should not force implementations to be mutable. In general, we want to think of the API interfaces as immutable.
I had it as immutable initially, and added mutability while implementing inheritable metadata. That means that in places which need mutability, we will need to downcast it, which is fine I think.
I see. I didn't realize that we had added the setter methods to the ManifestEntry interface. Looks like that was probably allowed because ManifestEntry is in core and not exposed in the public API. Instead, DataFile and DeleteFile are in the API.
Here, I think we need to be more careful. This is very likely going to be in the API alongside DataFile, so it should be an immutable interface. To avoid downcasting, the manifest reader should configure its Parquet reader to produce the concrete class, GenericTrackedFile. That class can be mutable so the reader can pass instances to InheritableTrackedMetadata and then the reader should return those instances as TrackedFile (because it is CloseableIterable<TrackedFile>). That shouldn't require casting.
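The arrangement described in this comment can be sketched roughly as follows. The types are simplified stand-ins (a two-method `TrackedFile` interface and a hypothetical `TrackedFileStruct` concrete class), and the static `read` method is an assumed placeholder for the real manifest reader; it only illustrates that the reader can mutate the concrete type and return it as the read-only interface without a cast.

```java
import java.util.ArrayList;
import java.util.List;

final class ImmutableApiSketch {
  /** Read-only view; stands in for the API-level TrackedFile interface. */
  interface TrackedFile {
    String location();

    Long snapshotId();
  }

  /** Mutable concrete class; stands in for the class the reader instantiates. */
  static final class TrackedFileStruct implements TrackedFile {
    private final String location;
    private Long snapshotId; // null until inherited from the manifest

    TrackedFileStruct(String location, Long snapshotId) {
      this.location = location;
      this.snapshotId = snapshotId;
    }

    // The setter exists only on the concrete class, never on the interface.
    void setSnapshotId(Long newSnapshotId) {
      this.snapshotId = newSnapshotId;
    }

    @Override
    public String location() {
      return location;
    }

    @Override
    public Long snapshotId() {
      return snapshotId;
    }
  }

  /** Hypothetical reader: it holds the concrete type, so inheritance needs no downcast. */
  static List<TrackedFile> read(List<TrackedFileStruct> rows, long manifestSnapshotId) {
    List<TrackedFile> result = new ArrayList<>();
    for (TrackedFileStruct row : rows) {
      if (row.snapshotId() == null) {
        row.setSnapshotId(manifestSnapshotId);
      }

      result.add(row); // widened to the read-only interface
    }

    return result;
  }

  public static void main(String[] args) {
    List<TrackedFileStruct> rows = new ArrayList<>();
    rows.add(new TrackedFileStruct("a.parquet", null));
    rows.add(new TrackedFileStruct("b.parquet", 7L));
    for (TrackedFile file : read(rows, 42L)) {
      System.out.println(file.location() + " -> " + file.snapshotId());
    }
  }
}
```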
| */ | ||
| public interface TrackingInfo { | ||
| /** Status of an entry in a tracked file */ | ||
| enum Status { |
Isn't this enum already defined somewhere?
It is in ManifestEntry.Status, but it is in core. For v4, we need it in the API.
Okay, this is fine for now. We may want to move it later, but we'll see what makes sense.
| @Override | ||
| public CloseableIterable<FileScanTask> doPlanFiles() { | ||
| Snapshot snapshot = snapshot(); | ||
|
|
Nit: Avoid unnecessary whitespace changes. They cause conflicts.
| : 2; | ||
|
|
||
| if (formatVersion >= 4) { | ||
| return planV4Files(snapshot, io); |
Rather than modifying data table scan right now, let's leave this out. We don't need to plug anything into table scans at this point, since that is just a configuration API.
| * | ||
| * <p>Use this method to copy data without stats when collecting files. | ||
| */ | ||
| F copyWithoutStats(); |
Is it intentional that we removed copyWithStats(Set<Integer> requestedColumnIds) from ContentFile?
It will be supported. I left it out for now because the stats were still being defined by @nastra.
| implements TrackedFile<GenericTrackedFile>, | ||
| IndexedRecord, | ||
| StructLike, | ||
| SpecificData.SchemaConstructable, |
There's no need for Avro interfaces since we no longer use Avro generics to read metadata. You can remove IndexedRecord because it is replaced by StructLike and remove SpecificData.SchemaConstructable because that's handled by reflection in the readers.
| import org.apache.iceberg.util.ByteBuffers; | ||
|
|
||
| /** Generic implementation of {@link TrackedFile} for V4 manifests. */ | ||
| public class GenericTrackedFile extends SupportsIndexProjection |
The prefix Generic was used to identify Avro generic classes that are compatible with IndexedRecord. Now that we have replaced IndexedRecord with StructLike, I'd recommend renaming this and other classes to TrackedFileStruct.
Also, I don't think that this class should be public. Let's keep everything package-private right now.
| ManifestStats.ADDED_ROWS_COUNT.asOptional(), | ||
| ManifestStats.EXISTING_ROWS_COUNT.asOptional(), | ||
| ManifestStats.DELETED_ROWS_COUNT.asOptional(), | ||
| ManifestStats.MIN_SEQUENCE_NUMBER.asOptional()); |
Each of the sub-structs should be its own struct type (like ManifestStatsStruct and TrackingInfoStruct) that is read and written the same way that this class is. The TrackedFile schema should embed each sub struct as a field.
| private Object contentStats = null; | ||
|
|
||
| // Cached schema for Avro | ||
| private transient Schema avroSchema = null; |
No need for this.
| ManifestStats.MIN_SEQUENCE_NUMBER.asOptional()); | ||
|
|
||
| /** Used by Avro reflection to instantiate this class when reading manifest files. */ | ||
| public GenericTrackedFile(Schema avroSchema) { |
You can remove this along with the other Avro artifacts.
| * @param partitionSpecId the partition spec ID | ||
| * @param recordCount the number of records | ||
| */ | ||
| public GenericTrackedFile( |
I don't think that this is a "full constructor". I'm also not sure that we need this constructor at all. It depends on whether we create TrackedFile instances directly and I doubt that we will. I think it is more likely that we will use a wrapper on the write path to write DataFile as a TrackedFile (similar to the V3Metadata wrapper classes). In that case, this class only needs to be used in the read path.
Let's remove this constructor for now, unless you need a complete one for test purposes (in which case, it should be package-private).
| * @param toCopy a tracked file to copy | ||
| * @param copyStats whether to copy stats | ||
| */ | ||
| private GenericTrackedFile(GenericTrackedFile toCopy, boolean copyStats) { |
For v4, we can remove the paths that use a copyStats boolean. Instead, we should use a set of field IDs to identify the fields to copy, much like the newer copy API.
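As a rough illustration of copying by field ID rather than by a boolean flag, the sketch below keeps only the stats for the requested fields. `FileWithStats` and its `valueCounts` map are hypothetical simplifications; the method name mirrors the `copyWithStats(Set<Integer> requestedColumnIds)` signature mentioned elsewhere in this review, but the body is an assumption, not the Iceberg implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class CopyByFieldIdSketch {
  /** Stand-in for a file record with one per-column stat. */
  static final class FileWithStats {
    final String location;
    final Map<Integer, Long> valueCounts; // field ID -> value count

    FileWithStats(String location, Map<Integer, Long> valueCounts) {
      this.location = location;
      this.valueCounts = new HashMap<>(valueCounts); // defensive copy
    }

    /** Copies the record, retaining stats only for the requested field IDs. */
    FileWithStats copyWithStats(Set<Integer> fieldIds) {
      Map<Integer, Long> kept = new HashMap<>();
      for (Map.Entry<Integer, Long> entry : valueCounts.entrySet()) {
        if (fieldIds.contains(entry.getKey())) {
          kept.put(entry.getKey(), entry.getValue());
        }
      }

      return new FileWithStats(location, kept);
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> counts = new HashMap<>();
    counts.put(1, 100L);
    counts.put(2, 90L);
    FileWithStats file = new FileWithStats("a.parquet", counts);
    System.out.println(file.copyWithStats(Set.of(2)).valueCounts);
  }
}
```

An empty set covers what `copyStats = false` used to express, so a single code path handles both cases.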
| private String referencedFile = null; | ||
|
|
||
| // Manifest stats (for manifest entries) | ||
| private Integer addedFilesCount = null; |
These fields are required in v4, so they should be primitives in ManifestStatsStruct.
|
|
||
| // Deletion vector fields | ||
| private Long deletionVectorOffset = null; | ||
| private Long deletionVectorSizeInBytes = null; |
These should be required in the DV struct, since the struct itself is optional.
| } | ||
|
|
||
| @Override | ||
| public TrackingInfo trackingInfo() { |
Structs contained within TrackedFile should be actual structs.
| } | ||
|
|
||
| @Override | ||
| public void put(int i, Object v) { |
Not needed, same as getSchema.
| * represent data files, delete files, or manifest references. TODO: implement caching. | ||
| */ | ||
| class V4ManifestReader extends CloseableGroup implements CloseableIterable<TrackedFile<?>> { | ||
| static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*"); |
The original ManifestReader allows callers to project columns by calling select(String... columns), but this isn't a very useful API because the majority of the time that a manifest is being projected, it is by the Iceberg library for a specific purpose. That's why the arguments to select are usually "*", BaseScan.SCAN_WITH_STATS_COLUMNS, or BaseScan.SCAN_COLUMNS. And now that we need to project columnar stats based on fields used in filters, the last two constants are no longer useful.
I think that the v4 implementation should provide more useful projection capabilities:
1. By default, all columns are projected, but without needing to check for "*"
2. A filter-based stats projection that is automatic, based on the filters pushed down to this reader
3. A custom stats projection like projectStats(Set<Integer> fieldIds) that sets the content stats projection (to be unioned with the fields from #2)
4. A project(Schema) method that can perform more fine-grained projection when the reader is used in metadata table scans and other custom scan situations (like ManifestFiles.readPaths).
Removing select from the API allows us to simplify the logic in this reader. We don't need to keep lists of fields to pass in or detect when both select and project are called.
| private TrackedFile<?> manifestDV = null; | ||
|
|
||
| protected V4ManifestReader( | ||
| InputFile file, InheritableTrackedMetadata inheritableMetadata, Long manifestFirstRowId) { |
This should still accept specsById to set the spec for each data or delete file. We don't want to require callers to do something like this that requires the caller to correctly find a spec:
trackedFile.asDataFile(specsById.get(trackedFile.specId()));
| return this; | ||
| } | ||
|
|
||
| public CloseableIterable<TrackedFile<?>> entries() { |
In v4, we are inverting the relationship between an entry and a file. Now, a tracked file contains its "entry" metadata as tracking info. The purpose of that change is to avoid needing additional methods to get the "entries" versus getting the "files".
The reader can now be simplified. It needs two read capabilities: allFiles and liveFiles (which filters out removed/deleted files). The CloseableIterable interface should use liveFiles because that's the simplest generic contract: when a manifest is read you don't return deleted files unless they are requested.
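A small sketch of that contract, under the assumption that each row exposes its status directly (in the actual design the status would live inside the tracking info). `Reader`, `TrackedFile`, and `Status` are simplified stand-ins, and plain `Iterable` is used in place of `CloseableIterable` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class LiveFilesSketch {
  enum Status {
    EXISTING,
    ADDED,
    DELETED
  }

  /** Stand-in for a tracked file whose tracking info carries a status. */
  static final class TrackedFile {
    final String location;
    final Status status;

    TrackedFile(String location, Status status) {
      this.location = location;
      this.status = status;
    }
  }

  /** Hypothetical reader with the two read capabilities named in the review. */
  static final class Reader implements Iterable<TrackedFile> {
    private final List<TrackedFile> rows;

    Reader(List<TrackedFile> rows) {
      this.rows = new ArrayList<>(rows);
    }

    /** Every row in the manifest, including deleted ones. */
    List<TrackedFile> allFiles() {
      return new ArrayList<>(rows);
    }

    /** Only rows that are not marked DELETED. */
    List<TrackedFile> liveFiles() {
      List<TrackedFile> live = new ArrayList<>();
      for (TrackedFile file : rows) {
        if (file.status != Status.DELETED) {
          live.add(file);
        }
      }

      return live;
    }

    /** Default iteration returns live files; deleted files must be requested explicitly. */
    @Override
    public Iterator<TrackedFile> iterator() {
      return liveFiles().iterator();
    }
  }

  public static void main(String[] args) {
    List<TrackedFile> rows = new ArrayList<>();
    rows.add(new TrackedFile("a.parquet", Status.ADDED));
    rows.add(new TrackedFile("b.parquet", Status.DELETED));
    for (TrackedFile file : new Reader(rows)) {
      System.out.println(file.location);
    }
  }
}
```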
| private CloseableIterable<TrackedFile<?>> open(Collection<String> cols) { | ||
| Schema projection = buildProjection(cols); | ||
|
|
||
| FileFormat format = FileFormat.fromFileName(file.location()); |
This happens in the old reader because there was no reason to plumb the format through. In v4, this should be passed into the manifest reader instead.
| addCloseable(entries); | ||
|
|
||
| CloseableIterable<TrackedFile<?>> transformed = | ||
| CloseableIterable.transform(entries, inheritableMetadata::apply); |
All filtering should happen before modifying rows.
Actually, this is not true. Row ID assignment needs to happen first, then rows can be filtered.
|
|
||
| transformed = CloseableIterable.transform(transformed, rowIdAssigner(manifestFirstRowId)); | ||
|
|
||
| transformed = assignPositions(transformed); |
It isn't safe to assign positions like this because this can't account for row group skipping. Instead, this should project the row position using the metadata column (adding MetadataColumns.ROW_POSITION to the schema) and update the set method of StructLike to handle the position.
|
|
||
| @Override | ||
| public TrackedFile<?> apply(TrackedFile<?> entry) { | ||
| entry.setPos(position++); |
We don't use the result of the ++ operator in Iceberg because it is confusing. It is much more clear to increment and then use the value or to use the value and then increment:
entry.setPos(position);
position++;
|
|
||
| private static Function<TrackedFile<?>, TrackedFile<?>> rowIdAssigner(Long firstRowId) { | ||
| if (firstRowId == null) { | ||
| return entry -> entry; |
This changes the behavior of row ID assignment in a subtle but incorrect way. When the ID passed in is NULL, the behavior is to explicitly set all first_row_id values to NULL. This requirement was added for updating from v2 to v3.
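A hedged sketch of the behavior this comment asks for: when the manifest's first row ID is null, every entry's first_row_id is explicitly cleared instead of being passed through. `DataFileEntry` is a hypothetical stand-in, and the non-null branch (assign the next ID to entries that have none, then advance by the record count) is an assumption about the inheritance rule, included only to make the contrast visible.

```java
import java.util.function.UnaryOperator;

final class RowIdAssignerSketch {
  /** Stand-in for a data file entry read from a manifest. */
  static final class DataFileEntry {
    final long recordCount;
    Long firstRowId;

    DataFileEntry(long recordCount, Long firstRowId) {
      this.recordCount = recordCount;
      this.firstRowId = firstRowId;
    }
  }

  static UnaryOperator<DataFileEntry> rowIdAssigner(Long manifestFirstRowId) {
    if (manifestFirstRowId == null) {
      // Not an identity function: explicitly null out first_row_id on every entry.
      return entry -> {
        entry.firstRowId = null;
        return entry;
      };
    }

    return new UnaryOperator<DataFileEntry>() {
      private long nextRowId = manifestFirstRowId;

      @Override
      public DataFileEntry apply(DataFileEntry entry) {
        // Assumed rule: only entries without an ID inherit one.
        if (entry.firstRowId == null) {
          entry.firstRowId = nextRowId;
          nextRowId += entry.recordCount;
        }

        return entry;
      }
    };
  }

  public static void main(String[] args) {
    UnaryOperator<DataFileEntry> clear = rowIdAssigner(null);
    System.out.println(clear.apply(new DataFileEntry(10, 5L)).firstRowId);
  }
}
```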
|
|
||
| @Override | ||
| public TrackedFile<?> apply(TrackedFile<?> entry) { | ||
| if (entry.contentType() == FileContent.DATA) { |
I think the logic here is mostly correct, but can be improved. I agree that first_row_id is only assigned if the file is a data file. It should also only be assigned in v4 if this is a leaf manifest -- so we probably don't even want to run an ID assigner on the root manifest because the root will be written with explicit first_row_id values, just like the manifest list is in v3.
The main issue that I see here is that this only assigns first_row_id if the tracking info is non-null. There should be no case where tracking info is null. For any file, the only time tracking info should be null is when it is not yet written to a manifest or is to be written to a manifest (and so tracking info will be suppressed/overwritten). There's no case where it should be null in this class because this is reading from a manifest.
The only exception to that (the paragraph just above) is when reading with a custom projection schema -- meaning that the file is being read with a projection through a metadata table. For example:
SELECT location, tracking_info.first_row_id FROM db.table.data_files
In cases like this, we need to ensure that the necessary information to produce first_row_id is present. In the current reader, we always project first_row_id and I would suggest always tracking the entirety of tracking_info so that we know problems from the wrong first_row_id will not be possible. We can always project more fields than requested, so I'd just make sure all fields of tracking_info are projected any time this reader is used.
If tracking_info is assumed to always be present, then this should not check whether it is here. Instead, this should assume it is non-null and throw an NPE if that assumption is violated.
| try { | ||
| bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); | ||
| } catch (IOException e) { | ||
| throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap from manifest DV"); |
Prefer the built-in UncheckedIOException to RuntimeIOException.
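For reference, a minimal sketch of wrapping a checked `IOException` in the JDK's `java.io.UncheckedIOException`. Reading a single int stands in for the Roaring bitmap deserialization in the diff; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class UncheckedIoSketch {
  /** Reads one big-endian int, as a stand-in for deserializing a bitmap. */
  static int readHeader(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      return in.readInt();
    } catch (IOException e) {
      // Note the argument order: message first, then the IOException cause.
      throw new UncheckedIOException("Failed to deserialize manifest DV", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readHeader(new byte[] {0, 0, 0, 5}));
  }
}
```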
|
|
||
| ByteBuffer buffer = dvInfo.inlineContent(); | ||
| byte[] bytes = new byte[buffer.remaining()]; | ||
| buffer.asReadOnlyBuffer().get(bytes); |
Use ByteBuffers.toByteArray instead of copying directly.
| return this; | ||
| } | ||
|
|
||
| public V4ManifestReader withDeletionVector(TrackedFile<?> dv) { |
This is fine for now, but if the DV is tracked by the same entry as the manifest itself, then this should be passed in through the constructor.
| entries, | ||
| entry -> { | ||
| TrackingInfo tracking = entry.trackingInfo(); | ||
| return tracking == null || tracking.status() != TrackingInfo.Status.DELETED; |
I think that tracking should never be null here as well.
| entry -> { | ||
| Long pos = entry.pos(); | ||
| // positions are 0-based and should not exceed Integer.MAX_VALUE | ||
| return pos == null || !deletedPositions.contains(pos.intValue()); |
I think that it is a bad practice to assume a row should be filtered (or some other action) based on a missing value like this, similar to the missing tracking_info cases below.
The problem is that this will cause silent data loss if pos is incorrectly projected as null because of a bug in another part of the codebase. This filter depends on having a valid pos so it should fail if pos is null rather than choosing to skip data.
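One way to express the fail-fast alternative, as a sketch: the filter throws when the position is missing instead of deciding what to do with the row. `Entry` is a hypothetical stand-in, and a plain `Set<Long>` replaces the Roaring bitmap from the diff to keep the example self-contained.

```java
import java.util.Set;
import java.util.function.Predicate;

final class StrictPositionFilterSketch {
  /** Stand-in for a manifest entry with an optional row position. */
  static final class Entry {
    final String location;
    final Long pos;

    Entry(String location, Long pos) {
      this.location = location;
      this.pos = pos;
    }
  }

  /** Keeps entries whose position is not deleted; a missing position is an error. */
  static Predicate<Entry> notDeleted(Set<Long> deletedPositions) {
    return entry -> {
      if (entry.pos == null) {
        throw new IllegalStateException("Missing position for entry: " + entry.location);
      }

      return !deletedPositions.contains(entry.pos);
    };
  }

  public static void main(String[] args) {
    Predicate<Entry> filter = notDeleted(Set.of(1L));
    System.out.println(filter.test(new Entry("a.parquet", 0L)));
    System.out.println(filter.test(new Entry("b.parquet", 1L)));
  }
}
```

A projection bug that drops the position then shows up as an exception at read time rather than as rows that are silently kept or dropped.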
| } catch (IOException e) { | ||
| throw new RuntimeIOException(e, "Failed to deserialize Roaring bitmap from manifest DV"); | ||
| } | ||
| return bitmap; |
Style: In Iceberg, control flow blocks should be separated from the statements following them by an empty newline.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
Thanks for the feedback, Ryan. I will send a new revision.
WIP PR for s.apache.org/iceberg-single-file-commit
Implemented so far: