Inherit snapshot ids for manifest entries #675
| Preconditions.checkArgument(manifest.snapshotId() == null, "Snapshot id must be assigned during commit"); | ||
| // TODO: avoid reading manifests to simply get stats | ||
| try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { |
I think we need to collect more metadata while writing manifests so that we don't have to read manifests simply to get stats. Clearly, reading them kills all the benefits of inheriting the snapshot id.
Do you mean collecting more metadata so that the summary can be built without reading the passed manifest from the file system?
Yes, we can keep the summary in ManifestFile instead. For now, this is fine.
Actually, we can use the manifest's summary stats for top-level properties. Partition-level properties are optional so we should just not include them.
@chenjunjiedada, yes, I would like to avoid reading passed manifests for better performance.
@rdblue, could you elaborate on what you mean by top-level and partition-level properties? Do you mean changed-partition-count is optional while added-records, added-data-files and others aren't?
Ideally, we would produce all of the summary stats, but the most important ones are total-data-files, total-records, and the added- or deleted- properties that are used to produce totals. I think it's okay to not write the changed-partition-count metrics if they require scanning the appended manifest.
I think my response was confusing because we keep additional summary information about each partition in our version. I can move that upstream if everyone wants it, but it can make the metadata files quite large. Without a use case for doing this upstream, I didn't think it was a good idea to make everyone's metadata significantly larger.
Do you have a few examples of what stats you collect per partition? One of our customers was asking about some stats about what partitions were modified, for example.
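One way to read the suggestion in this thread is that the top-level totals could be derived from counts carried with each manifest, without opening the manifest itself. The following is a minimal sketch under that assumption: `ManifestCounts` and `summarize` are hypothetical names, not Iceberg's `ManifestFile` API, and only the property names `added-data-files` and `added-records` come from the discussion above. Partition-level metrics such as `changed-partition-count` are simply left out.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SummarySketch {
  // Hypothetical stand-in for counts kept alongside a manifest; not Iceberg's ManifestFile.
  static final class ManifestCounts {
    final int addedFiles;
    final long addedRecords;

    ManifestCounts(int addedFiles, long addedRecords) {
      this.addedFiles = addedFiles;
      this.addedRecords = addedRecords;
    }
  }

  // Builds top-level summary properties from per-manifest counts only.
  // changed-partition-count is omitted because it would require scanning entries.
  static Map<String, String> summarize(List<ManifestCounts> manifests) {
    int files = 0;
    long records = 0L;
    for (ManifestCounts manifest : manifests) {
      files += manifest.addedFiles;
      records += manifest.addedRecords;
    }
    Map<String, String> summary = new LinkedHashMap<>();
    summary.put("added-data-files", Integer.toString(files));
    summary.put("added-records", Long.toString(records));
    return summary;
  }

  public static void main(String[] args) {
    List<ManifestCounts> appended =
        Arrays.asList(new ManifestCounts(2, 100L), new ManifestCounts(3, 250L));
    System.out.println(summarize(appended));
  }
}
```

The trade-off is the one raised above: the counts have to be collected when the manifest is written, otherwise the commit path still has to read it.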
| conf: SerializableConfiguration, | ||
| spec: PartitionSpec, | ||
| - basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files => | ||
| + basePath: String): Iterator[SparkDataFile] => Iterator[ManifestFile] = { files => |
We need to use the real ManifestFile with all stats if we want to skip reading/writing manifests on the driver.
As long as this works, I'm all for it. Can we do the same with DataFile?
I think that should work. I've created #763.
What about making DataFile and ManifestFile extend Serializable?
I'm not sure whether it's a good idea to make an interface Serializable. I'll have to think about that one.
| List<PartitionFieldSummary> summaries() { | ||
| - return Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary); | ||
| + return Arrays.stream(fields).map(PartitionFieldStats::toSummary).collect(Collectors.toList()); |
This transformation has to be eager as PartitionFieldStats is not serializable.
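To illustrate why laziness matters here: Guava's `Lists.transform` returns a view backed by the source list, so serializing the view drags the source elements along. The sketch below reproduces that effect without Guava, using a hand-rolled `LazyView` in its place. All class names (`FieldStats`, `FieldSummary`, `LazyView`) are hypothetical stand-ins, not the Iceberg classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class EagerVsLazySketch {
  // Hypothetical stats holder that is deliberately NOT Serializable.
  static final class FieldStats {
    final String min;
    FieldStats(String min) { this.min = min; }
    FieldSummary toSummary() { return new FieldSummary(min); }
  }

  // Hypothetical summary that is Serializable.
  static final class FieldSummary implements Serializable {
    final String min;
    FieldSummary(String min) { this.min = min; }
  }

  // Lazy view: keeps the source list and converts on access,
  // so serializing the view also tries to serialize the source elements.
  static final class LazyView extends AbstractList<FieldSummary> implements Serializable {
    private final List<FieldStats> source;
    LazyView(List<FieldStats> source) { this.source = source; }
    @Override public FieldSummary get(int index) { return source.get(index).toSummary(); }
    @Override public int size() { return source.size(); }
  }

  // Eager copy: only FieldSummary objects remain reachable from the result.
  static List<FieldSummary> eager(List<FieldStats> source) {
    return new ArrayList<>(source.stream().map(FieldStats::toSummary).collect(Collectors.toList()));
  }

  static boolean canSerialize(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    List<FieldStats> stats = Arrays.asList(new FieldStats("a"), new FieldStats("b"));
    System.out.println("lazy view serializable:  " + canSerialize(new LazyView(stats)));
    System.out.println("eager copy serializable: " + canSerialize(eager(stats)));
  }
}
```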
| * @param file an InputFile | ||
| * @return a manifest reader | ||
| */ | ||
| public static ManifestReader read(InputFile file) { |
@rdblue, am I correct that using read(InputFile file) is not safe only when we update the table schema and use some partition filters (as ManifestReader will build a partition spec based on the old schema stored in the metadata of the manifest)?
And why is it actually safe to use it without filters? Won't we have the wrong partition in DataFile?
Yes, that's correct. The issue is when the manifest is read using the schema at the time the manifest was written. If a column used by a partition transform is renamed, then expression binding can fail.
It's okay to use it without filters because partition tuples are accessed by position, not by name. So an expression will be bound to the current partition schema.
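The distinction between name-based binding and positional access can be sketched as follows. This is a simplified model, not Iceberg's expression binder: a partition schema is reduced to a list of field names, `bind` and `valueAt` are hypothetical helpers, and the field names `ts_day` and `event_day` are made up for the example.

```java
import java.util.Arrays;
import java.util.List;

final class PositionalBindingSketch {
  // Resolves a field name to its position in a partition schema (here, a list of names).
  static int bind(String fieldName, List<String> partitionFieldNames) {
    int position = partitionFieldNames.indexOf(fieldName);
    if (position < 0) {
      throw new IllegalArgumentException("Cannot find field: " + fieldName);
    }
    return position;
  }

  // Partition tuples carry values only, so a bound position is enough to read them.
  static Object valueAt(Object[] partitionTuple, int position) {
    return partitionTuple[position];
  }

  public static void main(String[] args) {
    List<String> schemaWhenWritten = Arrays.asList("ts_day"); // as stored with the manifest
    List<String> currentSchema = Arrays.asList("event_day");  // after a column rename
    Object[] tuple = {"2020-01-01"};

    // Binding against the current schema works, and the tuple is read by position.
    int position = bind("event_day", currentSchema);
    System.out.println(valueAt(tuple, position));

    // Binding the same filter against the schema stored with the manifest fails.
    try {
      bind("event_day", schemaWhenWritten);
    } catch (IllegalArgumentException e) {
      System.out.println("binding against the old schema failed: " + e.getMessage());
    }
  }
}
```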
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Outdated
| // keep reference of the first appended manifest, so that we can avoid merging first bin(s) | ||
| // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge | ||
| if (firstAppendedManifest == null) { |
+1 for moving this out of the try/catch.
| deleteFile(newManifest.path()); | ||
| } | ||
| for (ManifestFile manifest : appendManifests) { |
Do we need to clean up appendManifestsWithMetadata when they are left uncommitted, or do we leave that to the caller as you comment below?
The appended manifests become part of the table, so there is no need to delete them.
Do you mean even manifests that are not committed are part of the table? Or all appended manifests must be committed successfully so we don't have to care for this?
@chenjunjiedada, yes, all appended manifests must be successfully committed. They are not generated by Iceberg anymore. Instead, they are produced by users.
One case we need to consider is when we add a manifest using merge append and it gets combined with other manifests/files in apply. In that case, the added manifest will never be part of the table and can become orphan.
It seems reasonable for MergingSnapshotProducer to clean up the external manifest if the commit is successful but that manifest is not part of the table metadata. Otherwise, the caller will have to detect which of the appended manifests were merged and delete only those to ensure we don't have orphan manifests. This is different from what we have right now, because all manifests are currently copied before they are appended to the metadata. What do you think, guys?
Sounds good to me.
|
I've updated this PR with some recent progress. I am working on tests. In addition, there are a couple of open points to discuss. |
|
Looks like this conflicts with the recent update to rewrite manifests. Can you update, @aokolnychyi? |
53abdbd to
e960b64
|
This PR is ready for a closer look. |
chenjunjiedada
left a comment
LGTM
|
I ran a few basic benchmarks locally (e.g. add/read 100 manifests that are 275KB in size and contain 10000 entries). With this PR: Before this PR: Right now, we still read manifests to get stats. I think we can address that in a follow-up PR. The important part is that there is no performance degradation on read. |
|
@aokolnychyi, the implementation looks correct to me. My only remaining concern is our forward-compatibility guarantee that older readers will continue to be able to read tables written by future versions within the format version in metadata. Technically, this breaks that guarantee for tables that are appended to using manifests. I think that means we should test that older readers can read tables written with this that don't have appended manifests (I think Avro will throw a runtime error if it encounters a null snapshot ID, but will allow attempting to read). We should also probably add a feature flag to turn on this breaking behavior -- that way you can opt into using manifest files without a snapshot ID, knowing that it will break older readers. What do you think? |
|
I started a thread on the dev list to discuss breaking changes to the format and how to handle them. I proposed the feature flag approach. |
|
The feature flag approach makes sense to me in this case. Then what about exposing a table property to enable metadata inheritance (false by default) and rewriting appended manifests if that property is false? That will complicate the logic for cleanup but it will keep the format forward-compatible by default. |
That's what I was thinking. I don't think we should add a flag that turns on or off inheritance, though. When reading, we must always add the inherited data. Otherwise we can corrupt other places in metadata. I'd prefer a specific flag to allow writing manifests without snapshot IDs. If it is allowed, then we can append manifests and otherwise we have to rewrite them. |
|
Sorry, that’s what I meant by enabling inheritance as well. |
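The read-side rule agreed on above (inherited data is always applied, regardless of any write-side flag) can be sketched like this. `Entry` and `inherit` are hypothetical names for illustration and do not mirror the actual `ManifestReader` code.

```java
final class InheritOnReadSketch {
  // Hypothetical manifest entry; snapshotId is null when the writer left it unassigned.
  static final class Entry {
    final String path;
    final Long snapshotId;

    Entry(String path, Long snapshotId) {
      this.path = path;
      this.snapshotId = snapshotId;
    }
  }

  // Always applied on read: an entry without its own snapshot id takes the id of the
  // snapshot that added the manifest; an entry with an id is returned unchanged.
  static Entry inherit(Entry entry, long manifestSnapshotId) {
    if (entry.snapshotId != null) {
      return entry;
    }
    return new Entry(entry.path, manifestSnapshotId);
  }

  public static void main(String[] args) {
    Entry unassigned = new Entry("data/a.parquet", null);
    Entry assigned = new Entry("data/b.parquet", 3L);
    System.out.println(inherit(unassigned, 7L).snapshotId);
    System.out.println(inherit(assigned, 7L).snapshotId);
  }
}
```

Keeping this step unconditional is what lets the flag govern only whether manifests without snapshot IDs may be written.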
| * @param specsById a Map from spec ID to partition spec | ||
| * @return a manifest reader | ||
| */ | ||
| public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) { |
Can we extract this API into a separate PR and merge it first?
I have implemented the feature flag locally. As soon as #738 is in, I’ll update this PR and I think we will merge it quickly as well. If this is already blocking your work, I can extract the API changes in ManifestReader. Let me know.
It's in! Sorry for not getting it in sooner since it was blocking this one.
@aokolnychyi, thanks for the update. I have merged some of this patch locally and it works well for now. Please take your time and go ahead with this PR. I may need some time to finish mine since we are in the holiday season :)
99cb763 to
18e0d98
|
@chenjunjiedada @rdblue, I've updated the PR and got rid of reading manifests if we can inherit the snapshot id. Also, there is a feature flag now. Let me know what you think. |
| protected void cleanUncommitted(Set<ManifestFile> committed) { | ||
| cleanUncommitted(newManifests, committed); | ||
| cleanUncommitted(addedManifests, committed); | ||
| if (!snapshotIdInheritanceEnabled) { |
Is this correct? I thought that added manifests are owned by the table only if inheritance is allowed. That would mean that added manifests are only removed if inheritance is allowed, right?
If inheritance is enabled, addedManifests contains original manifests that should not be removed no matter what the operation outcome is. If the commit fails, the caller can retry. If the commit succeeds, the manifests are part of the metadata now.
If inheritance is not enabled, addedManifests will contain manifest copies. Those must be always cleaned up as the caller doesn't have access to them.
I tried to summarize that in the description to AppendFiles.
By default, the manifest will be rewritten to assign all entries this update's snapshot ID.
In that case, it is always the responsibility of the caller to manage the lifecycle of
the original manifest.

If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest
should never be deleted manually if the commit succeeds as it will become part of the table
metadata and will be cleaned up on expiry. If the manifest gets merged with others while
preparing a new snapshot, it will be deleted automatically if this operation is successful.
If the commit fails, the manifest will never be deleted and it is up to the caller whether
to delete or reuse it.
Like in the other cleanup method, should this check whether committed is empty?
Okay, so I think the difference between this and the logic in MergingSnapshotProducer is that this won't compact and rewrite the committed manifests. That means that if the commit succeeded, then all the added manifests are part of the table (so we know they will all be in the committed list).
Here's a quick summary:
- Inheritance is enabled
  - Commit succeeded - do not delete, the files are owned and part of the table
  - Commit failed - do not delete, the files are not owned
- Inheritance is not enabled - added manifests are rewritten and are owned
  - Commit succeeded - run normal manifest cleanup (rely on the committed set)
  - Commit failed - run normal manifest cleanup (committed set will be empty)
Yes, the summary is correct. In MergingSnapshotProducer, though, we can merge the appended manifest while preparing a new snapshot and it will never be part of the table metadata. That's why we have an extra loop through added manifests that were not rewritten.
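The four cases in the summary above, for the producer that does not merge manifests, can be written down as a small decision function. This is a sketch with manifests reduced to path strings; `manifestsToDelete` is a hypothetical helper, not the `cleanUncommitted` method from the diff, and it leaves out the extra MergingSnapshotProducer case where an appended manifest is merged away.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CleanupDecisionSketch {
  // Returns the added manifests that the operation should delete.
  static List<String> manifestsToDelete(
      boolean inheritanceEnabled, List<String> addedManifests, Set<String> committed) {
    if (inheritanceEnabled) {
      // Originals are either part of the table (commit succeeded) or still owned by
      // the caller (commit failed); in both cases the operation deletes nothing.
      return Collections.emptyList();
    }
    // Without inheritance the added manifests are rewritten copies owned by the operation:
    // delete every copy that did not end up in the committed set. After a failed commit
    // the committed set is empty, so all copies are removed.
    List<String> toDelete = new ArrayList<>();
    for (String path : addedManifests) {
      if (!committed.contains(path)) {
        toDelete.add(path);
      }
    }
    return toDelete;
  }

  public static void main(String[] args) {
    List<String> added = Arrays.asList("m1.avro", "m2.avro");
    Set<String> nothingCommitted = Collections.emptySet();
    Set<String> allCommitted = new HashSet<>(added);

    System.out.println(manifestsToDelete(true, added, nothingCommitted));
    System.out.println(manifestsToDelete(false, added, allCommitted));
    System.out.println(manifestsToDelete(false, added, nothingCommitted));
  }
}
```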
core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Outdated
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Outdated
| appendedManifest = copiedManifest; | ||
| } | ||
| // keep reference of the first appended manifest, so that we can avoid merging first bin(s) |
I'll need to double-check this logic after I introduced separate lists for rewritten manifests.
It should work fine.
Actually, I just realized that the appended manifests are added to metadata before the rewritten append manifests. That means that this should actually be the first appended manifest or the first rewritten if all were rewritten. The only case we have to worry about is when the first manifest is rewritten, but a manifest with a null snapshot ID is added later.
It's probably okay to move on since it would be extremely rare and the only problem would be that a bin might get merged when it otherwise wouldn't have been. Not a big problem.
Yes, I had that in mind but decided to go for simplicity as the use case would be extremely rare and there will be no correctness/performance issue.
core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
Outdated
| validateFilesCounts(); | ||
| // TODO: add sequence numbers here | ||
| Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform( |
This place requires attention. When we introduce sequence numbers, we will have to iterate through all new manifests. For now, iterating through newManifests and rewrittenAddedManifests is redundant. However, I expect we will add sequence numbers soon, and then we will simply need to change the closure.
| private final org.apache.avro.Schema schema; | ||
| private Status status = Status.EXISTING; | ||
| - private long snapshotId = 0L; | ||
| + private Long snapshotId = null; |
Just to make sure: have you checked that older readers can read files produced with optional instead of required snapshotIds?
Yes, I tested this locally.
| firstAppendedManifest = manifestFile; | ||
| } | ||
| ManifestFile appendedManifest; | ||
| if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { |
Looks like snapshots that were written with a version that used -1 will still rewrite those snapshots so we preserve the existing behavior. +1.
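The write-side condition in the hunk above can be isolated as a small planning function. This is a sketch only: `Action` and `plan` are hypothetical names, and the `-1` case models the sentinel mentioned in the comment above, which is non-null and therefore still takes the rewrite path.

```java
final class AppendPlanSketch {
  enum Action { APPEND_AS_IS, REWRITE_WITH_SNAPSHOT_ID }

  // Mirrors the condition `snapshotIdInheritanceEnabled && manifest.snapshotId() == null`:
  // only a manifest with no snapshot id can be appended directly, and only when the
  // feature flag allows it. Everything else is rewritten as before.
  static Action plan(boolean snapshotIdInheritanceEnabled, Long manifestSnapshotId) {
    if (snapshotIdInheritanceEnabled && manifestSnapshotId == null) {
      return Action.APPEND_AS_IS;
    }
    return Action.REWRITE_WITH_SNAPSHOT_ID;
  }

  public static void main(String[] args) {
    System.out.println(plan(true, null));   // flag on, id unassigned
    System.out.println(plan(true, -1L));    // flag on, older sentinel value
    System.out.println(plan(false, null));  // flag off
  }
}
```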
|
+1 I had a few minor comments and I'd like to change the property name, but I think that this is ready to go overall. @aokolnychyi, can you make the property name change and merge? |
|
I resolved the remaining comments. I'll do some additional testing. If everything is fine, I'll merge it. Let me know if there are any other comments in the meantime. |
fd4373e to
9953b58
|
I am going to merge this one. |
|
Thanks for the review, @rdblue and @chenjunjiedada! |
|
Thank you for building the framework for inheritance! This is really helpful. |
This PR addresses #504.