34 changes: 34 additions & 0 deletions README.md
@@ -12,6 +12,40 @@ https://github.com/ExpediaGroup/incubator-iceberg/tree/build-hiveberg-modules an
```
Ultimately we would like to contribute this Hive Input Format to Iceberg so this would no longer be required.

## Features
### IcebergInputFormat

To use the `IcebergInputFormat`, create a Hive table with the following DDL:
```sql
CREATE TABLE source_db.table_a
ROW FORMAT SERDE 'com.expediagroup.hiveberg.IcebergSerDe'
STORED AS
INPUTFORMAT 'com.expediagroup.hiveberg.IcebergInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'path_to_iceberg_data_warehouse';
```

You must ensure that:
- Your Iceberg tables are created with the Iceberg `HadoopCatalog` API.
- Because a `HadoopCatalog` keeps all of its tables under one common warehouse location, the Hive table `LOCATION` must point at that location.
- Your Hive table has the **same** name as your Iceberg table.

For example, if you created the Hive table as shown above, your Iceberg table must be created with a `TableIdentifier` whose name matches the Hive table name:
```java
TableIdentifier id = TableIdentifier.parse("source_db.table_a");
```
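
For reference, here is a minimal sketch (not part of this repository) of creating such a matching Iceberg table through the `HadoopCatalog` API; the schema, column names, and warehouse path below are placeholders:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

Configuration conf = new Configuration();
// The warehouse path must match the Hive table LOCATION above.
HadoopCatalog catalog = new HadoopCatalog(conf, "path_to_iceberg_data_warehouse");

// Placeholder schema; use the columns of your actual table.
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));

// The Iceberg table name must match the Hive table name (source_db.table_a).
TableIdentifier id = TableIdentifier.parse("source_db.table_a");
Table table = catalog.createTable(id, schema);
```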

### IcebergStorageHandler
This is a simplified option for creating Hiveberg tables. The Hive DDL instead looks like:
```sql
CREATE TABLE source_db.table_a
STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler'
LOCATION 'path_to_iceberg_data_warehouse';
```

### Predicate Pushdown
Pushdown of the Hive SQL `WHERE` clause has been implemented: filters are pushed down to the Iceberg `TableScan` level as well as to the Parquet `Reader`. An ORC implementation is still in the works.
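
As an illustration (a sketch, not code from this PR), a Hive filter such as `WHERE id < 100 AND data = 'a'` reaches the input format as a `SearchArgument`, which `IcebergFilterFactory` translates into an Iceberg `Expression`. The builder calls below assume Hive's `SearchArgumentFactory` API:
```java
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.iceberg.expressions.Expression;

// Roughly what Hive produces for: WHERE id < 100 AND data = 'a'
SearchArgument sarg = SearchArgumentFactory.newBuilder()
    .startAnd()
    .lessThan("id", PredicateLeaf.Type.LONG, 100L)
    .equals("data", PredicateLeaf.Type.STRING, "a")
    .end()
    .build();

// Translated to an Iceberg expression, equivalent to:
// Expressions.and(Expressions.lessThan("id", 100L), Expressions.equal("data", "a"))
Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);
```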

# Legal
This project is available under the [Apache 2.0 License](http://www.apache.org/licenses/LICENSE-2.0.html).

15 changes: 14 additions & 1 deletion pom.xml
@@ -17,13 +17,14 @@
<!-- this requires https://github.com/ExpediaGroup/incubator-iceberg/tree/build-hiveberg-modules to be built and installed
locally -->
<iceberg.version>0.7.1-incubating</iceberg.version>
<hive.version>2.3.6</hive.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>2.3.4</version>
<version>${hive.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -61,11 +62,23 @@
<version>${iceberg.version}</version>
<classifier>all</classifier>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
</dependency>
<dependency>
<groupId>com.klarna</groupId>
<artifactId>hiverunner</artifactId>
153 changes: 153 additions & 0 deletions src/main/java/com/expediagroup/hiveberg/IcebergFilterFactory.java
@@ -0,0 +1,153 @@
/**
* Copyright (C) 2020 Expedia, Inc. and the Apache Software Foundation.
*
* This class was inspired by code written for converting to Parquet filters:
*
* https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/
* hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java#L46
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.hiveberg;

import java.util.List;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.iceberg.expressions.Expression;

import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.in;
import static org.apache.iceberg.expressions.Expressions.isNull;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.not;
import static org.apache.iceberg.expressions.Expressions.notNull;
import static org.apache.iceberg.expressions.Expressions.or;

public class IcebergFilterFactory {

private IcebergFilterFactory() {}

public static Expression generateFilterExpression(SearchArgument sarg) {
List<PredicateLeaf> leaves = sarg.getLeaves();
List<ExpressionTree> childNodes = sarg.getExpression().getChildren();

switch (sarg.getExpression().getOperator()) {
case OR:
ExpressionTree orLeft = childNodes.get(0);
ExpressionTree orRight = childNodes.get(1);
return or(translate(orLeft, leaves), translate(orRight, leaves));
case AND:
ExpressionTree andLeft = childNodes.get(0);
ExpressionTree andRight = childNodes.get(1);
if (childNodes.size() > 2) {
Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves);
return and(
translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren);
} else {
return and(translate(andLeft, leaves), translate(andRight, leaves));
}
case NOT:
return not(translateLeaf(sarg.getLeaves().get(0)));
case LEAF:
return translateLeaf(sarg.getLeaves().get(0));
case CONSTANT:
return null;
default:
throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator());
}
}

/**
* Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes.
* @param allChildNodes All child nodes to be evaluated for the AND expression.
* @param leaves All instances of the leaf nodes.
* @return Array of leftover evaluated nodes.
*/
private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) {
allChildNodes.remove(0);
allChildNodes.remove(0);

Expression[] evaluatedLeaves = new Expression[allChildNodes.size()];
for (int i = 0; i < allChildNodes.size(); i++) {
Expression filter = translate(allChildNodes.get(i), leaves);
evaluatedLeaves[i] = filter;
}
return evaluatedLeaves;
}

/**
* Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes.
* @param tree Current ExpressionTree where the 'top' node is being evaluated.
* @param leaves List of all leaf nodes within the tree.
* @return Expression that is translated from the Hive SearchArgument.
*/
private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) {
switch (tree.getOperator()) {
case OR:
return or(translate(tree.getChildren().get(0), leaves),
translate(tree.getChildren().get(1), leaves));
case AND:
// Review comment (PR author): handle the case where the AND operator has more than 2 children.

if (tree.getChildren().size() > 2) {
Expression[] evaluatedChildren = getLeftoverLeaves(tree.getChildren(), leaves);
return and(translate(tree.getChildren().get(0), leaves),
translate(tree.getChildren().get(1), leaves), evaluatedChildren);
} else {
return and(translate(tree.getChildren().get(0), leaves),
translate(tree.getChildren().get(1), leaves));
}
case NOT:
return not(translate(tree.getChildren().get(0), leaves));
case LEAF:
return translateLeaf(leaves.get(tree.getLeaf()));
case CONSTANT:
//We are unsure of how the CONSTANT case works, so using the approach of:
//https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
// ParquetFilterPredicateConverter.java#L116
return null;
default:
throw new IllegalStateException("Unknown operator: " + tree.getOperator());
}
}

/**
* Translate leaf nodes from Hive operator to Iceberg operator.
* @param leaf Leaf node
* @return Expression fully translated from Hive PredicateLeaf
*/
private static Expression translateLeaf(PredicateLeaf leaf) {
String column = leaf.getColumnName();
switch (leaf.getOperator()) {
case EQUALS:
return equal(column, leaf.getLiteral());
case NULL_SAFE_EQUALS:
return equal(notNull(column).ref().name(), leaf.getLiteral()); //TODO: Unsure..
case LESS_THAN:
return lessThan(column, leaf.getLiteral());
case LESS_THAN_EQUALS:
return lessThanOrEqual(column, leaf.getLiteral());
case IN:
return in(column, leaf.getLiteralList());
case BETWEEN:
return and((greaterThanOrEqual(column, leaf.getLiteralList().get(0))),
lessThanOrEqual(column, leaf.getLiteralList().get(1)));
case IS_NULL:
return isNull(column);
default:
throw new IllegalStateException("Unknown operator: " + leaf.getOperator());
}
}
}
34 changes: 27 additions & 7 deletions src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java
@@ -15,14 +15,18 @@
*/
package com.expediagroup.hiveberg;

import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -35,35 +39,51 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class IcebergInputFormat implements InputFormat {
private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class);

static final String TABLE_LOCATION = "location";
static final String TABLE_NAME = "name";
static final String TABLE_FILTER_SERIALIZED = "iceberg.filter.serialized";

private Table table;

@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
String tableDir = job.get("location");
URI location = null;
String tableDir = job.get(TABLE_LOCATION);
URI location;
try {
location = new URI(tableDir);
} catch (URISyntaxException e) {
throw new IOException("Unable to create URI for table location: '" + tableDir + "'");
}
HadoopCatalog catalog = new HadoopCatalog(job, location.getPath());
TableIdentifier id = TableIdentifier.parse(job.get("name"));
TableIdentifier id = TableIdentifier.parse(job.get(TABLE_NAME));
catalog.loadTable(id);
table = catalog.loadTable(id);

List<CombinedScanTask> tasks = Lists.newArrayList(table.newScan().planTasks());
List<CombinedScanTask> tasks;
if (job.get(TABLE_FILTER_SERIALIZED) == null) {
tasks = Lists.newArrayList(table.newScan().planTasks());
} else {
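// A pushed-down Hive filter is present: deserialize the serialized ExprNodeGenericFuncDesc,
// convert it to a SearchArgument, and translate that into an Iceberg filter expression.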
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities.
deserializeObject(job.get(TABLE_FILTER_SERIALIZED), ExprNodeGenericFuncDesc.class);
SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);

tasks = Lists.newArrayList(table.newScan()
.filter(filter)
.planTasks());
}

return createSplits(tasks);
}

@@ -17,7 +17,6 @@

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -32,7 +32,8 @@ class IcebergReaderFactory {
IcebergReaderFactory() {
}

public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile, Schema tableSchema, boolean reuseContainers) {
public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
Schema tableSchema, boolean reuseContainers) {
// Reviewer comment: For ORC reader, perhaps add a comment advising the reader to look here to determine when this can be wired up for ORC: apache/iceberg#787

switch (file.format()) {
case AVRO:
return buildAvroReader(currentTask, inputFile, tableSchema, reuseContainers);
@@ -42,11 +43,11 @@ public CloseableIterable<Record> createReader(DataFile file, FileScanTask curren
return buildParquetReader(currentTask, inputFile, tableSchema, reuseContainers);

default:
throw new UnsupportedOperationException(String.format("Cannot read %s file: %s", file.format().name(), file.path()));
throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(), file.path()));
}
}

// FIXME: use generic reader function
private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
Avro.ReadBuilder builder = Avro.read(file)
.createReaderFunc(DataReader::create)
@@ -60,7 +61,7 @@ private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Sch
return builder.build();
}

// FIXME: use generic reader function
//Predicate pushdown support for ORC can be tracked here: https://github.com/apache/incubator-iceberg/issues/787
private CloseableIterable buildOrcReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
ORC.ReadBuilder builder = ORC.read(file)
// .createReaderFunc() // FIXME: implement
@@ -70,11 +71,12 @@ private CloseableIterable buildOrcReader(FileScanTask task, InputFile file, Sche
return builder.build();
}

// FIXME: use generic reader function
private CloseableIterable buildParquetReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
private CloseableIterable buildParquetReader(FileScanTask task, InputFile file, Schema schema,
boolean reuseContainers) {
Parquet.ReadBuilder builder = Parquet.read(file)
.createReaderFunc(messageType -> GenericParquetReaders.buildReader(schema, messageType))
.project(schema)
.filter(task.residual())
.split(task.start(), task.length());

if (reuseContainers) {
@@ -15,9 +15,9 @@
*/
package com.expediagroup.hiveberg;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -27,8 +27,6 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import com.google.common.collect.ImmutableMap;

/**
* Class to convert Iceberg types to Hive TypeInfo
*/