
Conversation


@guilload guilload commented Jul 10, 2020

This PR implements:

  • a MR v1 input format that wraps the MR v2 input format
  • a Hive input format built on top of the MR v1 input format

A few things:

  • The Hive input format successfully passes the Expedia test suite (see this branch). Once this work is merged, @massdosage will open a PR with a test suite based on HiveRunner.
  • I've chosen to separate the MR and Hive implementations so that we can reuse the test suite for the MR input format and keep the Hive-specific code on its own. In the future, I'd also like to experiment with other in-memory representations for Hive, and this separation allows that.
  • I've taken the IcebergSplit class out of IcebergInputFormat; the class now extends mapreduce.InputSplit and implements mapred.InputSplit so it can be returned by both the MR v1 and v2 input formats (a rough sketch of that dual-API shape follows this list).
  • I'm not a fan of the IcebergSplitContainer interface trick that I used to avoid overriding getRecordReader in subclasses of MapredIcebergInputFormat. Recommendations for a more elegant way to do so are welcome.
  • I've refactored the TestIcebergInputFormat class quite a bit so it can be run against both MR input format implementations.
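
A minimal sketch of that dual-API shape, with hypothetical names and fields (not the PR's exact IcebergSplit): one split class can extend the MR v2 abstract class and implement the MR v1 interface at the same time, because the v1 signatures declare the narrower exception lists.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical example class, not the PR's IcebergSplit.
public class DualApiSplit extends org.apache.hadoop.mapreduce.InputSplit
    implements org.apache.hadoop.mapred.InputSplit {

  private long length;
  private String[] locations;

  public DualApiSplit() {
    // no-arg constructor needed for Writable deserialization
  }

  public DualApiSplit(long length, String[] locations) {
    this.length = length;
    this.locations = locations;
  }

  // A single pair of methods satisfies both the mapred and mapreduce contracts.
  @Override
  public long getLength() throws IOException {
    return length;
  }

  @Override
  public String[] getLocations() throws IOException {
    return locations;
  }

  // Writable methods required by mapred.InputSplit.
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(length);
    out.writeInt(locations.length);
    for (String location : locations) {
      out.writeUTF(location);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    length = in.readLong();
    locations = new String[in.readInt()];
    for (int i = 0; i < locations.length; i++) {
      locations[i] = in.readUTF();
    }
  }
}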

@rdblue @rdsr @massdosage @cmathiesen

cc @edgarRd @gustavoatt

@rdblue rdblue requested a review from rdsr July 10, 2020 20:45

// The table location of the split allows Hive to map a split to a table and/or partition.
// See calls to `getPartitionDescFromPathRecursively` in `CombineHiveInputFormat` or `HiveInputFormat`.
private String tableLocation;
Contributor

Looks like what's happening is the table location is used as the split's path so that Hive associates all splits with the same PartitionDesc that contains a TableDesc. Is that correct? If so, I think it would be better to add that as the comment. It's difficult to read the Hive code and figure out what's going on using just the pointers here.

Contributor Author

Hive uses the path name of the split to map it back to a PartitionDesc or TableDesc, which specify the relevant input format for reading the files belonging to that partition or table. That way, HiveInputFormat and CombineHiveInputFormat can read files with different input formats in the same MR job and combine compatible splits together.

I'll update the comment.
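
To illustrate the mechanism with a simplified, hypothetical sketch (not Hive's actual code): Hive walks the split's path upward until it matches a registered table or partition location, which is why the split has to report a path at or under the table location.

import java.util.HashMap;
import java.util.Map;

// Simplified illustration of the path-based lookup Hive performs
// (see getPartitionDescFromPathRecursively); names here are hypothetical.
class PathToTableLookupSketch {
  private final Map<String, String> descByLocation = new HashMap<>();

  void register(String location, String descriptor) {
    descByLocation.put(location, descriptor);
  }

  // Walk the split path upward until it matches a registered table/partition location.
  String resolve(String splitPath) {
    String path = splitPath;
    while (path != null && !path.isEmpty()) {
      String desc = descByLocation.get(path);
      if (desc != null) {
        return desc;
      }
      int slash = path.lastIndexOf('/');
      path = slash > 0 ? path.substring(0, slash) : null;
    }
    return null;
  }

  public static void main(String[] args) {
    PathToTableLookupSketch lookup = new PathToTableLookupSketch();
    lookup.register("hdfs://nn/warehouse/db/table", "TableDesc(db.table)");
    // A split reporting the table location (or any path under it) resolves to that table.
    System.out.println(lookup.resolve("hdfs://nn/warehouse/db/table/data/file.parquet"));
  }
}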

public void write(DataOutput dataOutput) {
  throw new UnsupportedOperationException("write is not supported.");
}

public void readFields(DataInput in) throws IOException {
  throw new UnsupportedOperationException("readFields is not supported");
}
Contributor

I don't think these last two functions need to change?

/**
* Generic Mrv1 InputFormat API for Iceberg.
*
* @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
Contributor

@rdblue rdblue Jul 11, 2020

Could you update this to Java class of records constructed by Iceberg; default is {@link Record}?

It is odd that this currently states that T could be either A or B, but defaults to C.


static TaskAttemptContext getTaskAttemptContext(JobConf job) {
TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get("mapred.task.id")))
.orElse(new TaskAttemptID());
Contributor

When would mapred.task.id be null? Should we throw an exception in that case?

Contributor

I'm guessing that this is because an attempt context is passed as a JobContext. Let's fix that problem and then we won't need to do this. The helpers I mentioned also demonstrate how to create a TaskAttemptContext from specific values.

Contributor Author

I looked into this: Hive (when spawning a local MR job) or YARN populates mapreduce.task.id and mapreduce.task.attempt.id (among many other properties). So I changed this line to TaskAttemptID.forName(job.get("mapreduce.task.attempt.id")). Several input formats in the Hive codebase do the same.

I believe it is not the responsibility of the input format to create a TaskAttemptID from scratch and set the mapred{uce}* properties; the framework using the input format is responsible for that.

During my tests, I've seen that the task attempt ID is not set only when Hive uses the input format outside of an MR job (single fetch task). This is when we need to fall back to the default constructor.

The same logic applies to the JobContext object.
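
A minimal sketch of that fallback (property name taken from the comment above; class and method names are illustrative, not necessarily the final code):

import java.util.Optional;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

class TaskAttemptContextSketch {
  // Use the attempt ID populated by Hive/YARN when present; otherwise (e.g. a
  // single fetch task outside of an MR job) fall back to a default TaskAttemptID.
  static TaskAttemptContext newTaskAttemptContext(JobConf job) {
    TaskAttemptID taskAttemptID = Optional
        .ofNullable(TaskAttemptID.forName(job.get("mapreduce.task.attempt.id")))
        .orElseGet(TaskAttemptID::new);
    return new TaskAttemptContextImpl(job, taskAttemptID);
  }
}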


@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
return innerInputFormat.getSplits(getTaskAttemptContext(job))
Contributor

getSplits accepts a JobContext and I think it makes sense to pass objects that are as close as possible to what the mapreduce framework would use. We have some helper methods in our branch for reading Hive tables from Spark's DSv2 that you might want to check out: https://github.com/Netflix/iceberg/blob/netflix-spark-2.4/metacat/src/main/java/com/netflix/iceberg/batch/MapReduceUtil.java.

Those can help you create mapreduce objects after inspecting the mapred objects.
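
As a rough illustration of that suggestion (a hypothetical helper, not the linked Netflix utilities): a mapreduce JobContext can be built from the mapred JobConf before delegating to the inner input format.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

class JobContextSketch {
  // Reuse the framework-assigned job ID when it is present in the configuration;
  // fall back to a fresh JobID otherwise.
  static JobContext newJobContext(JobConf job) {
    String jobId = job.get("mapreduce.job.id");
    JobID id = jobId != null ? JobID.forName(jobId) : new JobID();
    return new JobContextImpl(job, id);
  }
}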

@rdblue
Contributor

rdblue commented Jul 11, 2020

Looks good to me. I made a few comments, but I don't think there is anything major to fix. It would be nice to fix some of the comments and to instantiate a JobContext for getSplits. Otherwise this looks good and we should be able to commit it soon.

@rdblue
Contributor

rdblue commented Jul 11, 2020

@massdosage & @guilload, is the plan to add the HiveRunner tests in a separate PR once this is merged? If you want me to, I can merge this and we can fix the comments in a follow-up to unblock next steps.

@massdosage
Contributor

> @massdosage & @guilload, is the plan to add the HiveRunner tests in a separate PR once this is merged? If you want me to, I can merge this and we can fix the comments in a follow-up to unblock next steps.

I'm taking a look at this now, but yes, I should be able to add the HiveRunner tests in a fast-follow to this PR. I'd also like @cmathiesen to take a look at this and comment on what impact these changes have on the features we were going to move over next from Hiveberg, like various pushdowns and exposing system tables, which we have ready against the other InputFormat.

@massdosage
Contributor

@guilload Thanks for raising the InputFormat PR. I've been taking a look at it today to try to get some HiveRunner tests working against it. I checked out the branch from the PR and did the minimum I could to get a test running that inserts some data and then reads it back, essentially this:

https://github.com/ExpediaGroup/iceberg/blob/if-all-the-things/mr/src/test/java/org/apache/iceberg/mr/mapred/TestInputFormatWithHadoopTables.java#L86

with some changes to the input format class (and ignoring all the other tests in that class). Unfortunately this is failing for me with exceptions like this:


          Caused by:
                java.lang.NullPointerException
                    at java.util.Objects.requireNonNull(Objects.java:203)
                    at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2296)
                    at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:111)
                    at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
                    at org.apache.iceberg.SchemaParser.fromJson(SchemaParser.java:247)
                    at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.initialize(IcebergInputFormat.java:186)
                    at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat$MapredIcebergRecordReader.<init>(MapredIcebergInputFormat.java:93)
                    at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getRecordReader(MapredIcebergInputFormat.java:72)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:695)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:333)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                    ... 13 more

Line 186 in the mapreduce input format is doing this:

this.tableSchema = SchemaParser.fromJson(conf.get(InputFormatConfig.TABLE_SCHEMA));

How did you manage to get the HiveRunner tests passing? Am I missing something?
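
For context, that fromJson call fails with this NPE when the TABLE_SCHEMA property is missing from the job configuration, since the trace shows the schema cache's computeIfAbsent rejecting a null key. A hedged sketch (hypothetical helper) of a guard that would surface the missing configuration more clearly:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;

class SchemaFromConfSketch {
  // Fail with an explicit message instead of an NPE deep inside the parser cache
  // when the schema was never written into the job configuration.
  static Schema readSchema(Configuration conf, String schemaKey) {
    String schemaJson = conf.get(schemaKey);
    if (schemaJson == null) {
      throw new IllegalArgumentException("Table schema not found in configuration under key: " + schemaKey);
    }
    return SchemaParser.fromJson(schemaJson);
  }
}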

@guilload guilload force-pushed the guilload--hive-input-format branch from e55aa74 to f3928c7 on July 14, 2020 01:57
@guilload
Contributor Author

@rdblue Thanks for the review. I'll make the changes you suggested tomorrow.

@massdosage, please take a look at my latest commit.

@guilload guilload force-pushed the guilload--hive-input-format branch 3 times, most recently from 9351f18 to 23ad535 on July 14, 2020 23:04
schema = table.schema();

List<String> projectedColumns = parseProjectedColumns(job);
projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);
Contributor

@massdosage massdosage Jul 15, 2020

Suggested change
projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);
projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);

Shouldn't the code be as above? If not, I get an NPE like this when running a HiveRunner test:

       Caused by:
                java.lang.NullPointerException
                    at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:58)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                    ... 13 more

@massdosage
Contributor

> @rdblue Thanks for the review. I'll make the changes you suggested tomorrow.
>
> @massdosage, please take a look at my latest commit.

I merged in your changes from yesterday and added a comment above about an NPE I ran into. I then also get the following error when I run the equivalent of this test:

org.apache.iceberg.mr.hive.TestHiveIcebergInputFormat > emptyTable FAILED
    java.lang.IllegalArgumentException: Failed to executeQuery Hive query SELECT id, data FROM source_db.table_a: java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
        at com.klarna.hiverunner.HiveServerContainer.executeStatement(HiveServerContainer.java:147)
        at com.klarna.hiverunner.builder.HiveShellBase.executeStatementsWithCommandShellEmulation(HiveShellBase.java:115)
        at com.klarna.hiverunner.builder.HiveShellBase.executeStatementWithCommandShellEmulation(HiveShellBase.java:109)
        at com.klarna.hiverunner.builder.HiveShellBase.executeStatement(HiveShellBase.java:99)
        at org.apache.iceberg.mr.hive.TestHiveIcebergInputFormat.emptyTable(TestHiveIcebergInputFormat.java:197)

        Caused by:
        org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
            at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:499)
            at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:307)
            at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:878)
            at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:559)
            at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:551)
            at com.klarna.hiverunner.HiveServerContainer.executeStatement(HiveServerContainer.java:129)
            ... 4 more

            Caused by:
            java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
                at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:521)
                at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
                at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
                at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2208)
                at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:494)
                ... 9 more

                Caused by:
                java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
                    at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doDeserialize(LazySimpleSerDe.java:151)
                    at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.deserialize(AbstractEncodingAwareSerDe.java:76)
                    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:502)
                    ... 13 more

@guilload guilload force-pushed the guilload--hive-input-format branch from 23ad535 to b373262 on July 15, 2020 17:28
@guilload guilload force-pushed the guilload--hive-input-format branch 2 times, most recently from 3cdd9f1 to 61859cc on July 16, 2020 02:39

@Override
public List<InputSplit> getSplits(JobContext context) {
if (splits != null) {
Contributor Author

Hive keeps a cache of input format instances around (see HiveInputFormat), which breaks this logic, so I've chosen to remove it for now.

We can re-implement this later, but the logic will have to be a bit more robust.
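
A hedged sketch of what a more robust version could look like (hypothetical names): since Hive may reuse one cached input format instance across scans, a split cache would need to be keyed per table or scan rather than stored in a single instance field.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class SplitCacheSketch<S> {
  private final Map<String, List<S>> splitsByTable = new ConcurrentHashMap<>();

  // Plan splits once per table location and reuse them, instead of caching a
  // single list that a reused InputFormat instance would return for every table.
  List<S> getOrPlan(String tableLocation, Supplier<List<S>> planSplits) {
    return splitsByTable.computeIfAbsent(tableLocation, key -> planSplits.get());
  }
}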

@guilload guilload force-pushed the guilload--hive-input-format branch from 61859cc to f535af0 on July 16, 2020 22:02
@guilload
Contributor Author

@massdosage Sorry, I broke this branch multiple times in the past few days, but this PR is stable and in good shape now. I've added a short HiveRunner test to ensure that this won't happen again. You can extend it or remove it later when you add your own tests.

In addition, you can check out these two commits: 10b62fc and f2232b8. In the former, I branched off of your if-all-the-things branch and cherry-picked this PR's changes. In the latter, I modified your tests to run with the "new" HiveIcebergInputFormat. The test suite passes successfully.

@rdblue @rdsr I've made the changes you suggested and I believe this PR is ready for a second round of review.

@massdosage
Contributor

> @massdosage Sorry, I broke this branch multiple times in the past few days, but this PR is stable and in good shape now. I've added a short HiveRunner test to ensure that this won't happen again. You can extend it or remove it later when you add your own tests.
>
> In addition, you can check out these two commits: 10b62fc and f2232b8. In the former, I branched off of your if-all-the-things branch and cherry-picked this PR's changes. In the latter, I modified your tests to run with the "new" HiveIcebergInputFormat. The test suite passes successfully.
>
> @rdblue @rdsr I've made the changes you suggested and I believe this PR is ready for a second round of review.

No problem. I took a quick look and the tests you've added look good; I think it makes sense to have them directly in this PR to show that it all works. I don't have time today, but I'll try to take a proper look on Monday. I'd also like to try out your branch on a Hive client to check that everything works end to end on a full Hadoop cluster; I'm not sure if you've already done that?

@massdosage
Contributor

The tests look good and work for me. I'm now trying to get them running from a Hive client. I've built all the Iceberg jars and uploaded them to the host with the Hive client. I've then created a table using Iceberg from Spark. I've now opened a Hive session and done the following to add the jars to the classpath:

add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-api.jar;
add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-core.jar;
add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-data.jar;
add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-mr.jar;
add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-parquet.jar;
add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar;

I get what look like successful responses from Hive at this point:

hive> add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar;
Added [/home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar] to class path
Added resources: [/home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar]

I then try to create a table, something like this:

CREATE EXTERNAL TABLE default.hiveberg_table_a_guilload 
ROW FORMAT SERDE 'org.apache.iceberg.mr.mapred.IcebergSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.iceberg.mr.hive.HiveIcebergInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' 
LOCATION 'hdfs://path/to/spark-created/table_a';

However, I then see this error in the Hive user logs:

2020-07-20T16:20:29,298 ERROR [759dc8f9-310b-4889-a9db-a48cf4c1c424 main([])]: exec.DDLTask (DDLTask.java:failed(639)) - java.lang.NoClassDefFoundError: org/apache/iceberg/relocated/com/google/common/base/Preconditions

which is very strange since that class should be there. I've run out of time to look further today; I'll carry on later in the week. @guilload, have you managed to get it running end to end in Hive "for reals"? If so, did you do any steps differently from what I'm doing?

@guilload
Contributor Author

@massdosage, yes, I have managed to use the input format in a "real" Hive client. I'm taking a shortcut, though: I just add iceberg-mr to the iceberg-spark-runtime subproject dependencies and use that jar with add jar .... I'll try your way; I don't see anything wrong with it.

@massdosage
Contributor

@guilload OK, thanks, let me know what you find. This has made me think we should probably have this mr module produce an "uber jar" so that for the Hive use case you only have to add one jar to the classpath, not six.

// Adding the ORDER BY clause will cause Hive to spawn a local MR job this time.
List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");

Assert.assertEquals(3, rows.size());
Contributor

This should be Assert.assertEquals(3, descRows.size());

@rdsr
Contributor

rdsr commented Jul 22, 2020

The changes LGTM!

@rdblue rdblue merged commit 129c969 into apache:master Jul 22, 2020
@rdblue
Contributor

rdblue commented Jul 22, 2020

Thanks for reviewing, @rdsr!

@guilload guilload deleted the guilload--hive-input-format branch July 22, 2020 21:42
cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020