-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Core: Add reference_snapshot_id column and filter to AllManifest table #4847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| if (value == null) { | ||
| return null; | ||
| } | ||
| return Splitter.on(",") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Might be good to declare the splitter one time as a private static field, as opposed to constructing it every time (unless there's some serialization-related reason it has to be reinstantiated).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to Kyle's suggestion
|
Let me take a look. |
|
|
||
| package org.apache.iceberg; | ||
|
|
||
| public class ScanProperties { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking about the best way to support this. I agree about having a Spark read option but I wonder whether we should have scan options or some sort of useSnapshots(ids) method in TableScan. We can only support that method in ALL_* metadata tables and throw an exception otherwise.
Thoughts, @rdblue @szehon-ho @RussellSpitzer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't terrible, but I would much rather expose this in some other way. I don't think adding it to TableScan makes sense because it is very specific to this feature and would cause confusion in other cases. A data scan, for example, can't scan more than one snapshot.
What about exposing a reference_snapshot_id column (or with an _ for a metadata column)? If we did that, we could accept a filter from Spark, like reference_snapshot_id IN (1, 2, 3, 4) and special case the pushdown. We'd need to ensure that we give a consistent value for that column, which could be the latest snapshot that has the manifest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue yea , I considered that. My thought at the time is that same manifest may have reference in many snapshots, thus many references reference_snapshot_id, so it becomes messy. Currently I guess we don't do de-duplication in all-manifest table, but adding this column this would enforce that we return duplicates, one time per snapshot that references it. Also seems like this would require complex code like adding a ProjectionEvaluator for snapshot filtering, just for this one case. Not totally against it, and maybe this is a useful column even for outside users, just seems a bit harder to do in my view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A data scan, for example, can't scan more than one snapshot.
It is true but ALL_* tables, on the other hand, don't support useSnapshot, asOfTime and incremental features.
What about exposing a reference_snapshot_id column
I'd be open for that too. It will be the snapshot ID of the manifest list we read these manifests from, right? Adding a new column is easy. How can we do the filtering? We need to filter Snapshots having a generic filter Expression that can reference any columns. Is there an easy way to perform the filtering?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think to perform the filtering we just need to go through all the snapshots and evaluate the expression on each one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially a custom expression visitor applied on Snapshot object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aokolnychyi, that, or we could just pattern match and reject anything but IN predicates. As long as we're the ones controlling what filters we use, we have some flexibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea this makes sense to me, trying an implementation.
Not necessarily, we could return any one or maybe the latest one. There aren't that many snapshots, so we can figure out an ordering and return the latest. |
69cf23f to
364e5c4
Compare
|
@aokolnychyi change is ready (based on @rdblue suggestion) if you guys have time for another pass, thanks |
|
FYI @rdblue if you have time to take a look, as well, thanks |
|
I'll have time to take a look today. Sorry for the delay! |
aokolnychyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation looks correct to me. I left some minor comments.
Will take a look at the tests tomorrow.
| */ | ||
| private <T> Boolean compare(BoundReference<T> ref, | ||
| Literal<T> lit, | ||
| Function<Integer, Boolean> compareResult) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is there a better name than compareResult for this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed, hope its a little better
|
|
||
| @Override | ||
| public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) { | ||
| Comparator<Object> longComparator = Comparators.forType(Types.LongType.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comparator<Object> looked suspicious at first but I think it should work.
|
Thanks for review, addressed the round of comments |
| * This table may return duplicate rows. | ||
| */ | ||
| public class AllManifestsTable extends BaseMetadataTable { | ||
| private static final int REF_SNAPSHOT_ID = 18; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we keeping this a constant because we refer to it again in the isSnapshotRef function? I just wonder because we don't do this for any of our other fields in the schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea that's it actually, its used in two places. Cleaner just to put 18 in both?
|
|
||
| @Override | ||
| public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) { | ||
| return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We talked about these a bit offline, I think the gt/lt's here probably won't every be useful but I don't see a problem in including them
| public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) { | ||
| if (isSnapshotRef(ref)) { | ||
| Comparator<Object> longComparator = Comparators.forType(Types.LongType.get()); | ||
| boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do this with
literalSet.contains(snapshotId)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I think its the same in this case, but thought its safer to use the Comparator lookup for Long type to get the official Iceberg way to compare (if it ever differs from Java's comparator). I guess this is for all the comparisons in this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I'm late to this review, but the literalSet will use the correct comparator for a given type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK works for me , I will make a follow up pr then!
| public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) { | ||
| if (isSnapshotRef(ref)) { | ||
| Comparator<Object> longComparator = Comparators.forType(Types.LongType.get()); | ||
| boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another call of "contains?"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we should eventually update this. The set's contains method will be correct. It's probably not a big deal right now since there are so few snapshots.
| manifest.length(), | ||
| manifest.partitionSpecId(), | ||
| manifest.snapshotId(), | ||
| manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 0 may be the wrong thing here, shouldn't it be "null" if we don't have the info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the problem is in the original schema in ManifestsTable, where the counts are required. I also did not notice that data file counts are optional in AllManifestsTable when I recently added counts for delete files. I think we should continue to use 0 for compatibility but I wonder why we have such a mismatch in nullability between these tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it is probably because we did not have those counts populated initially so there may be old manifests without that info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That means I have to switch to optional delete counts as well. I'll follow up with a PR for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd still use 0 for data files count when reading a delete manifest as null means unknown but we know there is no data file in a delete manifest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea this is copy + add from ManifestsTable class (should have mentioned), so if we fix it can be in another pr.
RussellSpitzer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a few comments on using "Contains" in the SnapshotEvaluator but this looks close to me.
There are also number of formatting nits as well we should clean up before merging.
|
Let me take another look as well. |
| ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); | ||
|
|
||
| return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> { | ||
| SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if you want to keep it on one line, you could just use MANIFEST_FILE_SCHEMA in the constructor of the evaluator class (you have access to it there)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might keep it as argument, the class is more re-usable if it takes the schema as an argument, was thinking we could add this for other metadata tables at some point.
| * This table may return duplicate rows. | ||
| */ | ||
| public class AllManifestsTable extends BaseMetadataTable { | ||
| private static final int REF_SNAPSHOT_ID = 18; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side comment / non-blocking: I'm a big fan of using named constants for schema element IDs.
Should we update to use named position constants throughout, particularly for schemas like the metadata tables that are evolving (note that content is listed first but has ID 14). But is the cherry-picking that would be required by a large diff like this considered an unnecessary change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I am not sure if there is consensus, Russell made a discussion in #4847 (comment) but didn't finished (and for the other constants, they are never referenced outside). But If so, we should do it in another pr as its unrelated.
| private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit, | ||
| Function<Integer, Boolean> desiredResult) { | ||
| if (isSnapshotRef(ref)) { | ||
| Literal<Long> longLit = lit.to(Types.LongType.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine it’s unlikely to happen as the query should throw before this, but any concern with potential null being passed in forlit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea it seems it's checked when trying to make the predicate:
java.lang.NullPointerException: Cannot create expression literal from null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
at org.apache.iceberg.expressions.Literals.from(Literals.java:62)
at org.apache.iceberg.expressions.UnboundPredicate.<init>(UnboundPredicate.java:40)
at org.apache.iceberg.expressions.Expressions.equal(Expressions.java:175)
So i think it's not necessary, as it's pretty internal. Also, none of the other Evaluators check this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szehon-ho, once the expression is bound, you are guaranteed that the lit corresponding to the ref will be a LongType because binding coerces the literal types to match the corresponding term. This shouldn't be needed, unless you're doing it to get a Long comparator because you have a long snapshotId.
|
|
||
| manifests.addAll(table.currentSnapshot().allManifests()); | ||
| Stream<Pair<Long, ManifestFile>> snapshotIdToManifests = | ||
| StreamSupport.stream(table.snapshots().spliterator(), false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can you add an inline comment for what the false stands for? If its something like parallel or something then don’t worry about it, but usually it helps me reading the code to have the parameter name in-lined like , /* caseSensitive */ false);.
But again, thinking on it (and the usage of spliterator), if it’s parallel or something then feel free to ignore it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I usually do, but this one is just parallel and pretty common, so maybe skip this time? Would be great if StreamSupport has a default value.
|
Restarting as Flink test is somehow cancelled. |
af11c10 to
4786c08
Compare
|
Fixed and added to new Spark 3.3 test module, merged. Thanks all for review @kbendick @aokolnychyi @RussellSpitzer . We can open another pr if any comment is not resolved |
This is a pre-requisite for expire-snapshot performance improvement in : #4736