diff --git a/build.gradle b/build.gradle
index 6510cc586ac0..dd12383de1b8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -384,6 +384,7 @@ project(':iceberg-mr') {
       exclude group: 'org.pentaho' // missing dependency
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
+    compileOnly("org.apache.hive:hive-metastore")
     compileOnly("org.apache.hive:hive-serde")
 
     testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
new file mode 100644
index 000000000000..06cbe4ac56d8
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.iceberg.mr.mapred.IcebergSerDe;
+
+public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
+
+  private Configuration conf;
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return HiveIcebergInputFormat.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return HiveIgnoreKeyTextOutputFormat.class;
+  }
+
+  @Override
+  public Class<? extends AbstractSerDe> getSerDeClass() {
+    return IcebergSerDe.class;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return null;
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    return null;
+  }
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
+
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
+
+  }
+
+  @Override
+  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
+
+  }
+
+  @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName();
+  }
+
+  /**
+   * @param jobConf Job configuration for InputFormat to access
+   * @param deserializer Deserializer
+   * @param exprNodeDesc Filter expression extracted by Hive
+   * @return Entire filter to take advantage of Hive's pruning as well as Iceberg's pruning.
+   */
+  @Override
+  public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
+    DecomposedPredicate predicate = new DecomposedPredicate();
+    predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
+    predicate.pushedPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
+    return predicate;
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
index 6443d450453c..7b5b14ac3aff 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
@@ -34,7 +34,6 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.mr.TestHelper;
-import org.apache.iceberg.mr.mapred.IcebergSerDe;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Before;
@@ -159,11 +158,8 @@ public void testJoinTables() {
   private void createHiveTable(String table, String location) {
     shell.execute(String.format(
         "CREATE TABLE default.%s " +
-            "ROW FORMAT SERDE '%s' " +
-            "STORED AS " +
-            "INPUTFORMAT '%s' " +
-            "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' " +
+            "STORED BY '%s' " +
             "LOCATION '%s'",
-        table, IcebergSerDe.class.getName(), HiveIcebergInputFormat.class.getName(), location));
+        table, HiveIcebergStorageHandler.class.getName(), location));
   }
 }
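
For context, a minimal sketch (not part of the patch) of how a client could create an Iceberg-backed Hive table through Hive JDBC once this storage handler is on the cluster classpath. The JDBC URL, table name, and data location below are illustrative assumptions; the test above uses its own embedded Hive shell rather than a JDBC connection.

// Sketch only: assumes a local HiveServer2 and the Hive JDBC driver on the classpath.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateIcebergBackedTable {
  public static void main(String[] args) throws Exception {
    // Register the Hive JDBC driver explicitly (older driver versions require this).
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // A single STORED BY clause replaces the previous ROW FORMAT SERDE /
      // INPUTFORMAT / OUTPUTFORMAT triple; the handler wires up
      // HiveIcebergInputFormat and IcebergSerDe (see getInputFormatClass/getSerDeClass).
      stmt.execute(
          "CREATE TABLE default.customers " +
          "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
          "LOCATION 'file:///tmp/iceberg/customers'");
    }
  }
}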