[WIP] Mapred input format #933
Conversation
# Conflicts:
#   mr/src/main/java/org/apache/iceberg/mr/IcebergInputFormat.java
#   mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormat.java
Prepare for upstream WIP PR
Looks like the current problem is that missing
OK, I don't have that issue (probably because I have the jar cached locally) but I know what you're talking about as I've seen it in many other projects. I'll try removing it locally and see if I can reproduce it, and then add an exclusion like you suggest.
Hi @massdosage, following @rdblue's suggestion, I'd like to open a PR against this one to bring the improvements I've made on #1104, but before I do that, do you think you could rebase this branch onto master and clean up / squash some commits to make things easier for me? This PR currently has a lot of commits and is hard to navigate.
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

public class IcebergRecordReader<T> {
Is it possible to reuse the IcebergRecordReader already implemented in
iceberg/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Line 280 in cad1249
private static final class IcebergRecordReader<T> extends RecordReader<Void, T> {
The intention here is for common record reader code across both the mapred and mapreduce sub-packages to live here, and then the specific implementations just deal with any particulars related to the different APIs. Right now the InputFormat in the mapred package is only used by Hive. I'd be happy to move more common code out as and when we find it; this is just a start at how it could be done. I'd prefer to do that in later PRs and get a working, albeit basic, end-to-end read path for Hive merged first.
}
switch (leaf.getOperator()) {
  case EQUALS:
    return equal(column, leaf.getLiteral());
We will need to convert literal values from the Hive data types to Iceberg data types.
I'm not entirely sure what this means - is this for things like Dates and Timestamps to be of the right type?
Yup! Here's what Hive will give you: https://github.com/apache/hive/blob/cb213d88304034393d68cc31a95be24f5aac62b6/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L49-L56 but Iceberg expects https://github.com/linkedin/iceberg/blob/6e5601bbea2c032f11d90114e2c3e64dc6c0e5c3/api/src/main/java/org/apache/iceberg/types/Type.java#L29-L45. So you will need to add conversions for parity. I would also suggest adding one more test case which covers all the data types that Hive can give us.
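To make that concrete, here is a minimal sketch of what such a mapping could look like; the class and method names and the exact case handling are illustrative assumptions, not this PR's implementation:

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

public class HiveLiteralConversions {
  // Hypothetical helper: convert a Hive PredicateLeaf literal into a value
  // Iceberg expressions can accept. Only a few representative cases are shown.
  static Object leafToIcebergLiteral(PredicateLeaf leaf) {
    Object literal = leaf.getLiteral();
    switch (leaf.getType()) {
      case LONG:
      case FLOAT:
      case STRING:
      case BOOLEAN:
        return literal; // already usable as-is
      case DECIMAL:
        // HiveDecimalWritable -> BigDecimal without going through double
        return ((HiveDecimalWritable) literal).getHiveDecimal().bigDecimalValue();
      case DATE:
        // Hive hands back a Timestamp for DATE literals (see the discussion below);
        // Iceberg expects days since epoch for dates
        return (int) TimeUnit.MILLISECONDS.toDays(((Timestamp) literal).getTime());
      case TIMESTAMP: {
        // Iceberg expects microseconds since epoch for timestamps
        Timestamp ts = (Timestamp) literal;
        return TimeUnit.SECONDS.toMicros(Math.floorDiv(ts.getTime(), 1000L)) + ts.getNanos() / 1000L;
      }
      default:
        throw new UnsupportedOperationException("Unsupported literal type: " + leaf.getType());
    }
  }
}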
You can also take some insights from
iceberg/orc/src/main/java/org/apache/iceberg/orc/ExpressionToSearchArgument.java
Line 241 in cad1249
private <T> Object literal(Type icebergType, T icebergLiteral) {
case AND:
  ExpressionTree andLeft = childNodes.get(0);
  ExpressionTree andRight = childNodes.get(1);
  if (childNodes.size() > 2) {
Can this also be true for OR?
Also, maybe simplify this to:
Expression result = Expressions.alwaysTrue();
for (ExpressionTree child : childNodes) {
  result = and(result, translate(child, leaves));
}
* @param leaves All instances of the leaf nodes.
* @return Array of leftover evaluated nodes.
*/
private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) {
I guess this should be unnecessary if the logic for OR/AND operators is simplified to use a for loop as in my suggestion above.
Yeah a great suggestion, I'll make those changes :D
// We are unsure of how the CONSTANT case works, so using the approach of:
// https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
// ParquetFilterPredicateConverter.java#L116
return null;
We return null here, but Expressions.and(), Expressions.not() and Expressions.or() have null checks in them, so it seems like this will fail regardless. Would it be better to throw a more readable exception here instead?
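For example, a minimal sketch of the alternative being suggested (the exception type and message are just illustrative):

// fail fast with a clear message instead of returning null,
// which Expressions.and()/or()/not() would reject anyway
throw new UnsupportedOperationException("CONSTANT operator is not supported in Hive filter pushdown");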
*/
private static Expression translateLeaf(PredicateLeaf leaf) {
  String column = leaf.getColumnName();
  if (column.equals("snapshot__id")) {
Reference the snapshot__id constant defined in SystemTableUtil
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);

long snapshotIdToScan = extractSnapshotID(conf, exprNodeDesc);
I guess this may cause issues if the table itself contains a column called snapshot__id. @rdblue Do you think we should reserve a few column names (or a prefix) in the spec for these virtual columns? I guess such virtual columns are generally useful for supporting time travel/versioning/incremental scans in purely SQL engines.
Yeah, this was an edge case that cropped up when we were testing. We got around it by making it configurable but with a default of snapshot__id. So by default we're adding this extra column to a table schema, but if a user knows they already have a column with this same name they can set the virtual column to a different name:
TBLPROPERTIES ('iceberg.hive.snapshot.virtual.column.name' = 'new_column_name')
But reserving column names would be a nice addition so this check doesn't need to happen.
/**
 * Creates an Iterable of Records with all snapshot metadata that can be used with the RecordReader.
 */
public class SnapshotIterable implements CloseableIterable {
Is this really needed? Can we reuse
public TableScan newScan() {
The reason we ended up adding the SnapshotIterable is so we could wrap a row from the SnapshotsTable in a Record which could be passed back to the RecordReader without needing to change any code for the RR. It would be super helpful if there were a method in SnapshotsTable that returned Records for the whole table, as there isn't a Reader, so to speak, for the METADATA file format (https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/FileFormat.java#L31) that we could add to the ReaderFactory to let us 'read' from the SnapshotsTable.
My concern is that we will need N such classes for the N metadata tables we offer. I would suggest looking at how this is handled for Spark.
if (task.isDataTask()) {
Also we can probably consider adding support for metadata tables in a separate PR? This PR has a lot of new functionality to review.
A great call out, that looks like exactly the sort of thing we should be doing here! I'll remove most of the system tables stuff from our PRs and we'll do a separate PR that should be easier to review :))
I was assuming it will get squashed when merged into master, so the (too many) commits would get removed at that stage. Is there a specific problem on your side? If we rebase it, that will break everyone's existing checkouts.
@shardulm94 thank you for the review! I've made some optimisations to the
private static Expression translateLeaf(PredicateLeaf leaf) {
  String column = leaf.getColumnName();
  if (column.equals("snapshot__id")) {
  if (column.equals(SystemTableUtil.DEFAULT_SNAPSHOT_ID_COLUMN_NAME)) {
Actually, this may not be enough since the column name is configurable?
A good point, I'll address this in the follow-up PR.
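Something along these lines could work as a starting point; this is a rough sketch assuming the Hadoop Configuration (and the table property shown earlier in this thread) is reachable from where the filter is built:

// read the configured virtual column name, falling back to the default constant
String snapshotIdColumn = conf.get(
    "iceberg.hive.snapshot.virtual.column.name",
    SystemTableUtil.DEFAULT_SNAPSHOT_ID_COLUMN_NAME);

if (column.equals(snapshotIdColumn)) {
  // handle the virtual snapshot id column as before
}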
* @return Expression that is translated from the Hive SearchArgument.
*/
private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) {
private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves,
Nit: childNodes doesn't need to be a parameter, it can be a local variable instead.
translate(tree.getChildren().get(1), leaves));
Expression orResult = Expressions.alwaysFalse();
for (ExpressionTree child : childNodes) {
  orResult = or(orResult, translate(child, leaves, childNodes));
I think passing childNodes here is incorrect. It should be child.getChildren() else we just keep on passing the root child nodes over and over. However, I think we should just make childNodes a local variable instead so that we don't make this mistake. It would also be good to add some tests for nested expressions.
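A rough sketch of how that case could look with childNodes kept local and the recursion going through each child's own subtree (assuming the two-argument translate(tree, leaves) overload):

case OR: {
  // childNodes as a local variable, scoped to this case only
  List<ExpressionTree> childNodes = tree.getChildren();
  Expression orResult = Expressions.alwaysFalse();
  for (ExpressionTree child : childNodes) {
    // recurse into each child subtree rather than re-passing the root's children
    orResult = or(orResult, translate(child, leaves));
  }
  return orResult;
}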
case STRING:
  return leaf.getLiteral();
case DATE:
  return ((Timestamp) leaf.getLiteral()).getTime();
IIRC Hive's Date type predicate literal is of type java.sql.Date. So this looks incorrect? (Unsure)
Yeah, you're correct about the Date type, and I was using that initially, but then when running the tests I would get a ClassCastException ("Can't cast Timestamp as Date"): it looks like Hive does some internal conversion into a Timestamp when calling leaf.getLiteral() for a Date type:
https://github.com/apache/hive/blob/branch-2.3/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L108
Ugh! Nice find. Timestamp.getTime() is probably still incorrect though, as Iceberg expects the number of days since epoch as a literal for the Date type. Timestamp.getTime() will give you the number of millis since epoch.
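For instance, something like this (a sketch assuming the Timestamp literal that Hive returns for DATE, per the link above):

// Iceberg expects days since epoch for DATE literals, not milliseconds
Timestamp dateLiteral = (Timestamp) leaf.getLiteral();
return (int) TimeUnit.MILLISECONDS.toDays(dateLiteral.getTime());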
Also, might be good to mention this Hive behaviour as a comment as it looks very non-intuitive.
Ah yeah, good shout on the comment and using the correct granularity, I'll make those changes!
case DECIMAL:
  return BigDecimal.valueOf(((HiveDecimalWritable) leaf.getLiteral()).doubleValue());
case TIMESTAMP:
  return ((Timestamp) leaf.getLiteral()).getTime();
Timestamp.getTime() gives back milliseconds since epoch, but Iceberg expects microsecond granularity. You might want to factor in Timestamp.getNanos() to get microsecond granularity.
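A possible conversion, sketched under the assumption that the literal is a java.sql.Timestamp as above:

// combine the whole seconds from getTime() with the sub-second nanos
// to get microseconds since epoch, which is what Iceberg expects
Timestamp ts = (Timestamp) leaf.getLiteral();
long epochSeconds = Math.floorDiv(ts.getTime(), 1000L);
return TimeUnit.SECONDS.toMicros(epochSeconds) + ts.getNanos() / 1000L;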
An integration test using HiveRunner will probably be very helpful in finding these conversion issues. I would suggest looking at
public class TestLocalScan {
public void testFilterWithDateAndTimestamp() throws IOException {
ah nice, thank you for the pointer!
It is not released yet, but later we will have to consider time zones as well. See: https://issues.apache.org/jira/browse/HIVE-20007
Iceberg leaves time zone handling to the engine. Engines should pass concrete values to Iceberg in expressions.
For example, a query might have a clause WHERE ts > TIMESTAMP '...'. It is the query engine's responsibility to determine what concrete value that timestamp actually represents. If ts is a timestamp without time zone, for example, the engine is responsible for converting the timestamp with time zone for comparison.
Iceberg should receive an unambiguous value to use in the comparison. Preferably, the value is in microseconds from epoch, but we can convert from alternative representations like millis + nanos.
}
}

private static List<Object> hiveLiteralListToIcebergType(List<Object> hiveLiteralTypes) {
We should just reuse leafToIcebergType here; it seems like there's equivalent logic in two places.
I would agree about trying to reuse leafToIcebergType. Some refactoring needs to happen for the IN operator case where we need to convert a literal List instead of just a single literal and my current logic doesn't support that very well - but I am on it!
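One possible shape for that refactor, sketched with a hypothetical single-literal helper signature (the real leafToIcebergType may look different):

// convert every literal in an IN list with the same single-literal conversion,
// assuming a helper of the form leafToIcebergType(PredicateLeaf.Type, Object)
private static List<Object> hiveLiteralListToIcebergType(PredicateLeaf leaf) {
  List<Object> converted = new ArrayList<>();
  for (Object hiveLiteral : leaf.getLiteralList()) {
    converted.add(leafToIcebergType(leaf.getType(), hiveLiteral));
  }
  return converted;
}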
This draft PR shows the current status of an InputFormat for Hive. We've developed this mostly independently but took some inspiration from the mapreduce InputFormat that was recently merged in. We have a separate branch where we're working on Hive read support via a SerDe, StorageHandler etc. which is still in its early stages (see https://github.com/ExpediaGroup/incubator-iceberg/tree/add-hive-read-support/hive/src/). The most interesting thing there currently is a HiveRunner test which is pretty much an integration test for the mapred InputFormat that runs it against an in-memory Hive metastore and MR cluster, see https://github.com/ExpediaGroup/incubator-iceberg/blob/add-hive-read-support/hive/src/test/java/org/apache/iceberg/hive/serde/TestIcebergInputFormat.java#L87

For this PR we'd appreciate feedback on:
the mapreduce InputFormat (do we want to tackle it in this PR or do that in a subsequent follow-on PR?).