1 change: 1 addition & 0 deletions build.gradle
@@ -384,6 +384,7 @@ project(':iceberg-mr') {
exclude group: 'org.pentaho' // missing dependency
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compileOnly("org.apache.hive:hive-metastore")
compileOnly("org.apache.hive:hive-serde")

testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
117 changes: 117 additions & 0 deletions HiveIcebergStorageHandler.java (new file)
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.iceberg.mr.mapred.IcebergSerDe;

public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {

private Configuration conf;

@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HiveIcebergInputFormat.class;
}

@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return HiveIgnoreKeyTextOutputFormat.class;
Contributor: In table metadata, we use FileOutputFormat because it can't be instantiated, so any write attempted through a Hive table would fail instead of writing data that doesn't appear in the table. Does that need to be done here?

Contributor: Good question, we haven't tried the write path at all, but I agree that it would be better if it failed rather than silently doing nothing.

Contributor: Another option would be to create a HiveIcebergOutputFormat class that throws UnsupportedOperationException; this is what Delta's Hive connector does.

Contributor: That seems like a good option. If we get to the phase where we have a working writer implementation and the correct writer is already specified, we will not have to recreate all of the tables. We just change the jars and everything should work like a charm :)
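
A minimal sketch of that option (hypothetical class, not part of this PR, assuming the mapred API used elsewhere in the patch):

```java
package org.apache.iceberg.mr.hive;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;

// Hypothetical placeholder: reject writes until a real Iceberg writer exists,
// instead of silently writing data that never appears in the table.
public class HiveIcebergOutputFormat implements OutputFormat<Void, Object> {

  @Override
  public RecordWriter<Void, Object> getRecordWriter(FileSystem ignored, JobConf job,
                                                    String name, Progressable progress) {
    throw new UnsupportedOperationException("Writing to an Iceberg table via Hive is not yet supported");
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job) {
    // fail at job-submission time as well, before any tasks are launched
    throw new UnsupportedOperationException("Writing to an Iceberg table via Hive is not yet supported");
  }
}
```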

}

@Override
public Class<? extends AbstractSerDe> getSerDeClass() {
return IcebergSerDe.class;
}

@Override
public HiveMetaHook getMetaHook() {
return null;
}

@Override
public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
return null;
}

@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {

}

@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {

}

@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {

}

@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {

}

@Override
public Configuration getConf() {
return conf;
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public String toString() {
return this.getClass().getName();
Contributor: What do other storage handlers do? Could this be a short name, like "iceberg"?

Contributor: I had a look at a few and most of them don't override this method. Hive's own JDBC storage handler returns the full class name as a string via a [Constant](https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/common/src/java/org/apache/hadoop/hive/conf/Constants.java#L62). So it could be OK like this, or we could just remove the method.

}

/**
 * Returns the entire filter expression as both the pushed predicate and the residual predicate.
 * Pushing the whole filter lets Iceberg prune with it, while keeping it as the residual means
 * Hive still re-applies it to the rows the reader returns.
 *
 * @param jobConf Job configuration for InputFormat to access
 * @param deserializer Deserializer
 * @param exprNodeDesc Filter expression extracted by Hive
 * @return the decomposed predicate, carrying the entire filter in both parts
 */
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
DecomposedPredicate predicate = new DecomposedPredicate();
// Hive re-evaluates the residual predicate on the returned rows
predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
// the pushed predicate is handed to the input format, where Iceberg can use it for pruning
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
return predicate;
}
}
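
For context, a rough sketch of how the pushed predicate can be recovered on the read side (a hypothetical helper, assuming Hive 2.x, where Hive serializes the pushed expression into the job configuration under TableScanDesc.FILTER_EXPR_CONF_STR):

```java
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical read-side helper: recover the predicate Hive pushed down,
// so an input format could translate it into an Iceberg filter for pruning.
final class PushedPredicateUtil {
  private PushedPredicateUtil() {
  }

  static ExprNodeGenericFuncDesc pushedPredicate(JobConf conf) {
    // TableScanDesc.FILTER_EXPR_CONF_STR is "hive.io.filter.expr.serialized"
    String serialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (serialized == null) {
      return null; // nothing was pushed for this scan
    }
    return SerializationUtilities.deserializeExpression(serialized);
  }
}
```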
8 changes: 2 additions & 6 deletions (Hive integration test)
@@ -34,7 +34,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.TestHelper;
-import org.apache.iceberg.mr.mapred.IcebergSerDe;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Before;
@@ -159,11 +158,8 @@ public void testJoinTables() {
private void createHiveTable(String table, String location) {
shell.execute(String.format(
"CREATE TABLE default.%s " +
"ROW FORMAT SERDE '%s' " +
"STORED AS " +
"INPUTFORMAT '%s' " +
"OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' " +
"STORED BY '%s' " +
"LOCATION '%s'",
-table, IcebergSerDe.class.getName(), HiveIcebergInputFormat.class.getName(), location));
+table, HiveIcebergStorageHandler.class.getName(), location));
}
}