Add IcebergStorageHandler #1107
Conversation
@massdosage @cmathiesen. In what order should we review the work here? #933, #1103, and then this PR?
Hello @rdsr! So, we discussed this with @rdblue and he asked us to try to split the code coming out of Hiveberg into separate, discrete PRs where possible, even if some of the PRs didn't add anything that was directly usable. So that's what we've tried to do, but only once all 3 are committed does this become really useful for end users. I would actually suggest doing the reviews in the reverse order and starting with this one, which is the simplest. Once we have this and #1103 merged in, we can add a whole lot more HiveRunner tests to #933 which test the InputFormat, the SerDe, and the StorageHandler all working together, which would be really nice.
Ah, I just realised this actually depends on the InputFormat and SerDe classes being present, so we'd actually need #1103 merged first. Then we should either add this code to #933, or only look at this after that has been merged and then add the HiveRunner tests for the StorageHandler to this PR. So I'd say please look at #1103 first.
```java
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
```
I think this is the place where we can pass in many of the options that Iceberg supports, e.g. reading a specific snapshot, case-insensitive matching, etc.
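For example, something along these lines (just a sketch; the `iceberg.read.snapshot-id` and `iceberg.read.case-sensitive` property names are made-up placeholders, not existing configuration keys):

```java
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
  // Sketch only: copy hypothetical read options from the table properties
  // into the JobConf so the InputFormat can pick them up at split planning time.
  Properties props = tableDesc.getProperties();
  String snapshotId = props.getProperty("iceberg.read.snapshot-id");
  if (snapshotId != null) {
    jobConf.set("iceberg.read.snapshot-id", snapshotId);
  }
  String caseSensitive = props.getProperty("iceberg.read.case-sensitive");
  if (caseSensitive != null) {
    jobConf.set("iceberg.read.case-sensitive", caseSensitive);
  }
}
```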
```java
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
  DecomposedPredicate predicate = new DecomposedPredicate();
  predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
```
As a first implementation it is correct.
Maybe I'm stating the obvious, but if we know that the Iceberg predicate will cover the expression fully, then we can return an empty residualPredicate, so we don't add an unnecessary filtering operation in Hive. And if Iceberg covers the expression only partially, then Hive should filter only on the residual part.
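Roughly like this (just a sketch, not code from this PR; `icebergCoversFully` is a hypothetical check for whether the whole expression converts losslessly to an Iceberg expression):

```java
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
  DecomposedPredicate predicate = new DecomposedPredicate();
  ExprNodeGenericFuncDesc expr = (ExprNodeGenericFuncDesc) exprNodeDesc;
  if (icebergCoversFully(expr)) {
    // Iceberg evaluates the whole expression, so there is nothing left for Hive to re-check.
    predicate.pushedPredicate = expr;
    predicate.residualPredicate = null;
  } else {
    // Fall back: Hive re-applies the full expression, which is always safe.
    predicate.residualPredicate = expr;
  }
  return predicate;
}
```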
The challenge here is that Iceberg produces per-split residuals, so we would need to return them for each split, or find the common filter that needs to be applied to all splits.
There's also a question of whether Hive or Iceberg is better at doing the filtering. For vectorized reads, Iceberg may be better. But for row-based reads, engines are typically better. That's why we delegate final filtering to Spark, which will benefit from that engine's codegen.
Does Iceberg push these filters down to the specific file readers (ORC/Parquet)? Or is this predicate pushdown only for filtering out specific files, not the contents of the files?
In Hive we found it very beneficial to push every filter down to the readers. This is especially true for column-based formats like Parquet or ORC, where we do not have to deserialize all of the data if filtering already removes the unnecessary rows.
Thanks,
Peter
PS: Maybe this is not the best place to start this conversation. If you feel so, feel free to redirect it to the correct channel. Just started to familiarize myself with the workings of the Iceberg community.
We have some prior work on how we intend to push filters down over here. The intention was to first get this "first implementation" merged in and then raise a subsequent PR to add improvements. I need to update this PR since the InputFormat was merged in (working on it as we speak). We'd definitely appreciate input on the subsequent PRs if there are better ways of interacting with Hive.
Sure thing! I will follow the stuff around the HiveSerde and friends :)
As a first implementation it is definitely OK to return the whole expression as the residualPredicate. Hive just has to apply it again, which is absolutely fine from a correctness standpoint.
@pvary, thanks for taking a look at these, it's great to have more people with knowledge of Hive participating!
```java
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
  return HiveIgnoreKeyTextOutputFormat.class;
}
```
In table metadata, we use FileOutputFormat because it can't be instantiated, so any write attempted through a Hive table would fail instead of writing data that doesn't appear in the table. Does that need to be done here?
Good question, we haven't tried the write path at all but I agree that it would be better if it failed rather than silently doing nothing.
Another option would be to create a HiveIcebergOutputFormat class that throws UnsupportedOperationException; this is what they do in Delta's Hive connector.
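A minimal sketch of what that could look like (class name and messages are just assumptions):

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;

/** Rejects all writes until a real write path exists, like Delta's Hive connector does. */
public class HiveIcebergOutputFormat<K, V> implements OutputFormat<K, V> {

  @Override
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) {
    throw new UnsupportedOperationException("Writing to an Iceberg table with Hive is not supported");
  }

  @Override
  public void checkOutputSpecs(FileSystem fs, JobConf job) {
    throw new UnsupportedOperationException("Writing to an Iceberg table with Hive is not supported");
  }
}
```

That way a misconfigured INSERT fails loudly instead of producing files that never show up in the table.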
That seems like a good option. If we get to the phase where we have a working writer implementation, and the correct writer is already specified, then we do not have to recreate all of the tables. We just change the jars and everything should work like a charm :)
```java
@Override
public String toString() {
  return this.getClass().getName();
}
```
What do other storage handlers do? Could this be a short name, like "iceberg"?
I had a look at a few and most of them don't override this method. Hive's own JDBC storage handler returns the full class name as a string via a [constant](https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/common/src/java/org/apache/hadoop/hive/conf/Constants.java#L62). So it could be OK like this, or we could just remove the method.
This looks good to me, just a couple of minor questions. I'll merge this to unblock next steps. Where does this fit in the overall plan for Hive support? Are tests going to be added next? What works right now and what doesn't?
Well, this is actually a good milestone: we now have everything needed to read Iceberg tables from Hive merged into master. Up next we were going to add back features like pushdowns, system tables, time travel reads, etc., but those are all improvements; what is in Iceberg now should work end to end for the read path. One thing I'd like to change to make this easier to use is for the module to build an uber jar, so you only have to add one jar to Hive's classpath instead of the 6+ needed at the moment. Once that's done we also need to add documentation describing all of this.
@guilload ended up adding the tests in #1192; the main test class is TestHiveIcebergInputFormat. We were planning to flesh that out with more tests as we add the above features.
Maybe something like "STORED AS ICEBERG" would help save users some time when they are creating Hive tables manually on top of existing Iceberg tables. See the same for JSON: https://issues.apache.org/jira/browse/HIVE-19899 - this would need a Hive change after the Iceberg release, but an IcebergStorageFormatDescriptor.java might be part of the Iceberg project.
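If I read the JSON example right, the descriptor itself could be quite small. A sketch (all class names referenced here are assumptions based on this PR series; AbstractStorageFormatDescriptor is Hive's base class, whose getSerde() returns null by default):

```java
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.ql.io.AbstractStorageFormatDescriptor;

/** Sketch of a descriptor that would let users write CREATE TABLE ... STORED AS ICEBERG. */
public class IcebergStorageFormatDescriptor extends AbstractStorageFormatDescriptor {

  @Override
  public Set<String> getNames() {
    return ImmutableSet.of("ICEBERG");
  }

  @Override
  public String getInputFormat() {
    return IcebergInputFormat.class.getName();
  }

  @Override
  public String getOutputFormat() {
    return HiveIcebergOutputFormat.class.getName();
  }

  @Override
  public String getSerde() {
    return HiveIcebergSerDe.class.getName();
  }
}
```

If I remember correctly, Hive discovers these via ServiceLoader, so a META-INF/services entry for org.apache.hadoop.hive.ql.io.StorageFormatDescriptor would be needed as well.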
That's a great idea; I'll look into it when I get a chance.
Thanks for the update, @massdosage! If we have a working read path, then it would be awesome to start working on some docs commits to the site as well, although some of the configuration may change with updates to table resolution, #1155. |
This PR adds an IcebergStorageHandler that bundles together all the interfaces needed to read Iceberg tables from Hive. This would first require #933 and #1103 to be merged in, as it requires the InputFormat and the SerDe classes. We are planning on adding tests using HiveRunner for this class once everything is merged in :D
@rdblue @rdsr @massdosage @teabot