[WIP] StorageHandler and PredicatePushdown #11
Conversation
shell.execute("CREATE DATABASE source_db");
shell.execute(new StringBuilder()
    .append("CREATE TABLE source_db.table_a ")
    .append("ROW FORMAT SERDE 'com.expediagroup.hiveberg.IcebergSerDe' ")
Would it be worth having a test which uses the INPUTFORMAT directly (i.e. the current test) and then another one which tests the storage handler? So we make sure that both use cases work?
Yeah, good shout
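For illustration, a minimal sketch of what that second, storage-handler-based test might look like. This is not code from the PR; the fully-qualified `IcebergStorageHandler` class name, the `tableLocation` fixture, and the expected row count are assumptions, and `shell` is the HiveRunner `HiveShell` already used by the existing test:

```java
// Hypothetical companion test: same data as the InputFormat-based test above,
// but the table is created through the storage handler instead of the raw
// SerDe/InputFormat DDL.
@Test
public void readTableCreatedWithStorageHandler() {
  shell.execute("CREATE DATABASE IF NOT EXISTS source_db");
  shell.execute(new StringBuilder()
      .append("CREATE TABLE source_db.table_b ")
      .append("STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler' ")
      .append("LOCATION '" + tableLocation + "'") // tableLocation: assumed fixture from the existing test setup
      .toString());

  List<Object[]> result = shell.executeStatement("SELECT * FROM source_db.table_b");
  assertEquals(3, result.size()); // expected row count is illustrative only
}
```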
src/main/java/com/expediagroup/hiveberg/IcebergFilterFactory.java: three resolved review threads (outdated)
massdosage left a comment:
I think I'm going to need you to walk me through this. Will see when I can put in some time for that.
I don't know whether this will be helpful at all, but a long time ago we did something along these lines for a custom InputFormat that we wrote for dealing with ORC files in Cascading, where we also wanted to push predicates down to the underlying files. The main code for that is at https://github.com/HotelsDotCom/corc/blob/eb4fc4f462ae9243daeebaa0640d65085e3c8d98/corc-core/src/main/java/com/hotels/corc/mapred/CorcInputFormat.java and you can also look in the "sarg" package for some related classes. It might also be a good idea to ask Dave M to review this as he may remember some useful things (although it was a long time ago).
pom.xml (outdated)
<version>${iceberg.version}</version>
<classifier>all</classifier>
</dependency>
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
You don't need to put these links in; it should be self-evident from the Maven co-ordinates.
pom.xml (outdated)
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
Ditto.
IcebergFilterFactory () {}

public static Expression getFilterExpression(SearchArgument sarg) {
Suggested change:
- public static Expression getFilterExpression(SearchArgument sarg) {
+ public static Expression generateFilterExpression(SearchArgument sarg) {
case OR:
  return or(recurseExpressionTree(tree.getChildren().get(0), leaves),
      recurseExpressionTree(tree.getChildren().get(1), leaves));
case AND:
Handle the case where the AND operator has more than two children.
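One possible shape for that, sketched as a hypothetical helper against the `recurseExpressionTree` method and the statically imported Iceberg `and(...)` visible in this diff (signatures assumed from the surrounding code):

```java
// Hypothetical helper: fold an AND node with two or more children into a single
// Iceberg Expression instead of assuming exactly two children.
private static Expression translateAnd(ExpressionTree tree, List<PredicateLeaf> leaves) {
  // Combine the first two children, then AND in any remaining children.
  Expression result = and(
      recurseExpressionTree(tree.getChildren().get(0), leaves),
      recurseExpressionTree(tree.getChildren().get(1), leaves));
  for (int i = 2; i < tree.getChildren().size(); i++) {
    result = and(result, recurseExpressionTree(tree.getChildren().get(i), leaves));
  }
  return result;
}
```

The `case AND:` branch would then just `return translateAnd(tree, leaves);`.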
teabot left a comment:
Would be great to also update the README with a feature list and limitations. Example for this PR:
Features
- Storage handler implementation for simplified creation of Iceberg tables
- Predicate pushdown of Hive SQL WHERE clause to file reader (Parquet only)
@Override
public void configureInputJobProperties(TableDesc tableDesc,
    Map<String, String> jobProperties) {
  // do nothing by default
Call super?
Suggested change:
- // do nothing by default
+ super.configureInputJobProperties(tableDesc, jobProperties);
@Override
public void configureOutputJobProperties(TableDesc tableDesc,
    Map<String, String> jobProperties) {
  // do nothing by default
Call super?
Suggested change:
- // do nothing by default
+ super.configureOutputJobProperties(tableDesc, jobProperties);
@Override
public void configureTableJobProperties(TableDesc tableDesc,
    Map<String, String> jobProperties) {
  //do nothing by default
Call super?
Suggested change:
- //do nothing by default
+ super.configureTableJobProperties(tableDesc, jobProperties);
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
  //do nothing by default
Call super?
Suggested change:
- //do nothing by default
+ super.configureJobConf(tableDesc, jobConf);
 * @param jobConf - job configuration for InputFormat to access
 * @param deserializer - deserializer
 * @param exprNodeDesc - filter expression extracted by Hive
 * @return - decomposed predicate that tells Hive what parts of the predicate are handled by the StorageHandler
Suggested change:
- * @return - decomposed predicate that tells Hive what parts of the predicate are handled by the StorageHandler
+ * @return Decomposed predicate that tells Hive what parts of the predicate are handled by the StorageHandler
/**
 * Extract and serialize the filter expression and add it to the Configuration for the InputFormat to access.
 * @param jobConf - job configuration for InputFormat to access
Suggested change:
- * @param jobConf - job configuration for InputFormat to access
+ * @param jobConf Job configuration for InputFormat to access
/**
 * Extract and serialize the filter expression and add it to the Configuration for the InputFormat to access.
 * @param jobConf - job configuration for InputFormat to access
 * @param deserializer - deserializer
Suggested change:
- * @param deserializer - deserializer
+ * @param deserializer Deserializer
 * Extract and serialize the filter expression and add it to the Configuration for the InputFormat to access.
 * @param jobConf - job configuration for InputFormat to access
 * @param deserializer - deserializer
 * @param exprNodeDesc - filter expression extracted by Hive
Suggested change:
- * @param exprNodeDesc - filter expression extracted by Hive
+ * @param exprNodeDesc Filter expression extracted by Hive
  assertEquals(actual.left().op(), expected.left().op());
}

/*@Test
?
I was testing out some more complex cases but haven't finished the more comprehensive tests yet. I'll take this out for now and probably do another PR with more tests (when I look at bigger tables, join statements, etc.).
- public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile, Schema tableSchema, boolean reuseContainers) {
+ public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
+     Schema tableSchema, boolean reuseContainers) {
For the ORC reader, perhaps add a comment advising the reader to look here to determine when this can be wired up for ORC: apache/iceberg#787
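For example, a comment along these lines (wording and placement are only a suggestion, not code from the PR):

```java
// TODO: predicate pushdown is not wired up for the ORC reader yet; see
// apache/iceberg#787 to determine when this can be enabled for ORC.
```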
Addressed your other review comments. Going to work on an update to the README, then will ping again for a final check.
README.md (outdated)
## Features
### IcebergInputFormat

To use the `IcebergInputFormat` you need to write a Hive query similar to:
Suggested change:
- To use the `IcebergInputFormat` you need to write a Hive query similar to:
+ To use the `IcebergInputFormat` you need to create a Hive table using DDL similar to:
README.md (outdated)
```

### IcebergStorageHandler
This is implemented as a simplified option for creating Hiveberg tables. The Hive query should instead look like:
Suggested change:
- This is implemented as a simplified option for creating Hiveberg tables. The Hive query should instead look like:
+ This is implemented as a simplified option for creating Hiveberg tables. The Hive DDL should instead look like:
README.md (outdated)
```

### Predicate Pushdown
Pushdown of the HiveSQL `WHERE` clause has been implemented so that filter's are pushed to the Iceberg `TableScan` level as well as the Parquet `Reader`. ORC implementations are still in the works.
Suggested change:
- Pushdown of the HiveSQL `WHERE` clause has been implemented so that filter's are pushed to the Iceberg `TableScan` level as well as the Parquet `Reader`. ORC implementations are still in the works.
+ Pushdown of the HiveSQL `WHERE` clause has been implemented so that filters are pushed to the Iceberg `TableScan` level as well as the Parquet `Reader`. ORC implementations are still in the works.
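To make the described flow concrete, here is a rough sketch (not code from this PR) of how the deserialized filter reaches the scan, assuming the factory method name suggested earlier in this review and the standard Iceberg scan API; for Parquet, the same `Expression` can also be handed to the file reader builder:

```java
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;

class PushdownFlowSketch {
  // `sarg` is the SearchArgument the storage handler serialized into the job conf
  // via decomposePredicate; `table` is the already-loaded Iceberg table.
  static TableScan planFilteredScan(Table table, SearchArgument sarg) {
    // Convert Hive's SearchArgument into an Iceberg Expression using the factory
    // introduced in this PR (method name follows the rename suggested above).
    Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);
    // Push the filter into the table scan; for Parquet, the same Expression can
    // also be passed to Iceberg's read builder, e.g. Parquet.read(file).filter(filter).
    return table.newScan().filter(filter);
  }
}
```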
pom.xml (outdated)
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-serde</artifactId>
- <version>2.3.4</version>
+ <version>2.3.6</version>
Let's create a hive.version property up above and reference it here and on line 78 so we only have one place to update it in future.
public class IcebergFilterFactory {

  IcebergFilterFactory () {}
Can this be made private?
/**
 * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes.
 * @param allChildNodes - all child nodes to be evaluated for the AND expression.
Suggested change:
- * @param allChildNodes - all child nodes to be evaluated for the AND expression.
+ * @param allChildNodes All child nodes to be evaluated for the AND expression.
(This applies to all other Javadoc in this PR: remove the "-" and capitalise the first letter.)
@@ -0,0 +1,153 @@
/**
 * Copyright (C) 2020 Expedia, Inc and the Apache Software Foundation.
Suggested change:
- * Copyright (C) 2020 Expedia, Inc and the Apache Software Foundation.
+ * Copyright (C) 2020 Expedia, Inc. and the Apache Software Foundation.
 */
package com.expediagroup.hiveberg;

import static org.junit.Assert.assertEquals;
Static imports need to go after non-static imports.
massdosage left a comment:
Two very small "nits", rest looks ready to go.
An initial PR to show the progress I've made on implementing PredicatePushdown for issue #6