diff --git a/README.md b/README.md
index fbd7697..987aad1 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,40 @@ https://github.com/ExpediaGroup/incubator-iceberg/tree/build-hiveberg-modules an
```
Ultimately we would like to contribute this Hive Input Format to Iceberg so this would no longer be required.
+## Features
+### IcebergInputFormat
+
+To use the `IcebergInputFormat`, create a Hive table with the following DDL:
+```sql
+CREATE TABLE source_db.table_a
+ ROW FORMAT SERDE 'com.expediagroup.hiveberg.IcebergSerDe'
+ STORED AS
+ INPUTFORMAT 'com.expediagroup.hiveberg.IcebergInputFormat'
+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ LOCATION 'path_to_iceberg_data_warehouse';
+```
+
+You must ensure that:
+- Your Iceberg tables are created using the Iceberg `HadoopCatalog` API.
+ - With the `HadoopCatalog`, all your Iceberg tables are created under one common warehouse location, which is where the Hive table `LOCATION` must point.
+- Your Hive table has the **same** name as your Iceberg table.
+
+For example, if you created the Hive table as shown above, your Iceberg table must be created with a matching `TableIdentifier`:
+```java
+TableIdentifier id = TableIdentifier.parse("source_db.table_a");
+```
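+
+For instance, here is a minimal sketch of creating the matching table through the `HadoopCatalog` (the `first_name`/`salary` schema and the warehouse path are illustrative only; `Schema`, `Types`, `PartitionSpec`, `TableIdentifier`, and `HadoopCatalog` come from the Iceberg API):
+```java
+Configuration conf = new Configuration();
+// The warehouse location must match the LOCATION of the Hive table.
+HadoopCatalog catalog = new HadoopCatalog(conf, "path_to_iceberg_data_warehouse");
+
+Schema schema = new Schema(
+    Types.NestedField.optional(1, "first_name", Types.StringType.get()),
+    Types.NestedField.optional(2, "salary", Types.LongType.get()));
+
+// The table name must match the Hive table name.
+TableIdentifier id = TableIdentifier.parse("source_db.table_a");
+catalog.createTable(id, schema, PartitionSpec.unpartitioned());
+```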
+
+### IcebergStorageHandler
+The `IcebergStorageHandler` offers a simplified way to create Hiveberg tables. The Hive DDL instead looks like:
+```sql
+CREATE TABLE source_db.table_a
+ STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler'
+ LOCATION 'path_to_iceberg_data_warehouse';
+```
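+
+Because `IcebergStorageHandler` extends Hive's `DefaultStorageHandler`, it supplies the `IcebergSerDe` and `IcebergInputFormat` for you; and because it implements `HiveStoragePredicateHandler`, it also enables the predicate pushdown described below.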
+
+### Predicate Pushdown
+Pushdown of the Hive SQL `WHERE` clause has been implemented so that filters are pushed both to the Iceberg `TableScan` level and to the Parquet `Reader`. ORC support is still in progress.
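+
+For example, with the table above (`salary` and `first_name` are the illustrative columns used earlier), the `WHERE` clause of the query below is converted into an Iceberg `Expression` and applied when planning the scan:
+```sql
+SELECT * FROM source_db.table_a
+WHERE salary <= 3000 AND first_name IN ('Alice', 'Bob');
+```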
+
# Legal
This project is available under the [Apache 2.0 License](http://www.apache.org/licenses/LICENSE-2.0.html).
diff --git a/pom.xml b/pom.xml
index 538b1d9..0958677 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,13 +17,14 @@
    <iceberg.version>0.7.1-incubating</iceberg.version>
+    <hive.version>2.3.6</hive.version>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-serde</artifactId>
-      <version>2.3.4</version>
+      <version>${hive.version}</version>
      <scope>compile</scope>
@@ -61,11 +62,23 @@
      <version>${iceberg.version}</version>
      <classifier>all</classifier>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>4.0.2</version>
+    </dependency>
+
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.9.2</version>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <classifier>core</classifier>
+    </dependency>
      <groupId>com.klarna</groupId>
      <artifactId>hiverunner</artifactId>
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergFilterFactory.java b/src/main/java/com/expediagroup/hiveberg/IcebergFilterFactory.java
new file mode 100644
index 0000000..1f7080c
--- /dev/null
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergFilterFactory.java
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2020 Expedia, Inc. and the Apache Software Foundation.
+ *
+ * This class was inspired by code written for converting to Parquet filters:
+ *
+ * https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/
+ * hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java#L46
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.hiveberg;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+ private IcebergFilterFactory() {}
+
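+ /**
+ * Convert a Hive SearchArgument, built from the query WHERE clause, into an
+ * equivalent Iceberg Expression.
+ * @param sarg SearchArgument describing the pushed-down predicate.
+ * @return translated Iceberg Expression, or null for a CONSTANT expression.
+ */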
+ public static Expression generateFilterExpression(SearchArgument sarg) {
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ List<ExpressionTree> childNodes = sarg.getExpression().getChildren();
+
+ switch (sarg.getExpression().getOperator()) {
+ case OR:
+ ExpressionTree orLeft = childNodes.get(0);
+ ExpressionTree orRight = childNodes.get(1);
+ return or(translate(orLeft, leaves), translate(orRight, leaves));
+ case AND:
+ ExpressionTree andLeft = childNodes.get(0);
+ ExpressionTree andRight = childNodes.get(1);
+if (childNodes.size() > 2) {
+ Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves);
+ return and(
+ translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren);
+ } else {
+ return and(translate(andLeft, leaves), translate(andRight, leaves));
+ }
+ case NOT:
+ return not(translateLeaf(sarg.getLeaves().get(0)));
+ case LEAF:
+ return translateLeaf(sarg.getLeaves().get(0));
+ case CONSTANT:
+ return null;
+ default:
+ throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator());
+ }
+ }
+
+ /**
+ * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes.
+ * @param allChildNodes All child nodes to be evaluated for the AND expression.
+ * @param leaves All instances of the leaf nodes.
+ * @return Array of leftover evaluated nodes.
+ */
+ private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) {
+ allChildNodes.remove(0);
+ allChildNodes.remove(0);
+
+ Expression[] evaluatedLeaves = new Expression[allChildNodes.size()];
+ for (int i = 0; i < allChildNodes.size(); i++) {
+ Expression filter = translate(allChildNodes.get(i), leaves);
+ evaluatedLeaves[i] = filter;
+ }
+ return evaluatedLeaves;
+ }
+
+ /**
+ * Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes.
+ * @param tree Current ExpressionTree where the 'top' node is being evaluated.
+ * @param leaves List of all leaf nodes within the tree.
+ * @return Expression that is translated from the Hive SearchArgument.
+ */
+ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) {
+ switch (tree.getOperator()) {
+ case OR:
+ return or(translate(tree.getChildren().get(0), leaves),
+ translate(tree.getChildren().get(1), leaves));
+ case AND:
+ if (tree.getChildren().size() > 2) {
+ Expression[] evaluatedChildren = getLeftoverLeaves(tree.getChildren(), leaves);
+ return and(translate(tree.getChildren().get(0), leaves),
+ translate(tree.getChildren().get(1), leaves), evaluatedChildren);
+ } else {
+ return and(translate(tree.getChildren().get(0), leaves),
+ translate(tree.getChildren().get(1), leaves));
+ }
+ case NOT:
+ return not(translate(tree.getChildren().get(0), leaves));
+ case LEAF:
+ return translateLeaf(leaves.get(tree.getLeaf()));
+ case CONSTANT:
+ // We are unsure of how the CONSTANT case works, so we follow the approach of:
+ // https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
+ // ParquetFilterPredicateConverter.java#L116
+ return null;
+ default:
+ throw new IllegalStateException("Unknown operator: " + tree.getOperator());
+ }
+ }
+
+ /**
+ * Translate leaf nodes from Hive operator to Iceberg operator.
+ * @param leaf Leaf node
+ * @return Expression fully translated from Hive PredicateLeaf
+ */
+ private static Expression translateLeaf(PredicateLeaf leaf) {
+ String column = leaf.getColumnName();
+ switch (leaf.getOperator()) {
+ case EQUALS:
+ return equal(column, leaf.getLiteral());
+ case NULL_SAFE_EQUALS:
+ return equal(notNull(column).ref().name(), leaf.getLiteral()); //TODO: Unsure..
+ case LESS_THAN:
+ return lessThan(column, leaf.getLiteral());
+ case LESS_THAN_EQUALS:
+ return lessThanOrEqual(column, leaf.getLiteral());
+ case IN:
+ return in(column, leaf.getLiteralList());
+ case BETWEEN:
+ return and(greaterThanOrEqual(column, leaf.getLiteralList().get(0)),
+ lessThanOrEqual(column, leaf.getLiteralList().get(1)));
+ case IS_NULL:
+ return isNull(column);
+ default:
+ throw new IllegalStateException("Unknown operator: " + leaf.getOperator());
+ }
+ }
+}
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java b/src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java
index e58dfa9..67c7fe5 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergInputFormat.java
@@ -15,6 +15,7 @@
*/
package com.expediagroup.hiveberg;
+import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -22,7 +23,10 @@
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
-
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -35,6 +39,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.CloseableIterable;
@@ -42,28 +47,43 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
public class IcebergInputFormat implements InputFormat {
private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class);
+ static final String TABLE_LOCATION = "location";
+ static final String TABLE_NAME = "name";
+ static final String TABLE_FILTER_SERIALIZED = "iceberg.filter.serialized";
+
private Table table;
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- String tableDir = job.get("location");
- URI location = null;
+ String tableDir = job.get(TABLE_LOCATION);
+ URI location;
try {
location = new URI(tableDir);
} catch (URISyntaxException e) {
throw new IOException("Unable to create URI for table location: '" + tableDir + "'");
}
HadoopCatalog catalog = new HadoopCatalog(job,location.getPath());
- TableIdentifier id = TableIdentifier.parse(job.get("name"));
+ TableIdentifier id = TableIdentifier.parse(job.get(TABLE_NAME));
catalog.loadTable(id);
table = catalog.loadTable(id);
- List<CombinedScanTask> tasks = Lists.newArrayList(table.newScan().planTasks());
+ List<CombinedScanTask> tasks;
+ if (job.get(TABLE_FILTER_SERIALIZED) == null) {
+ tasks = Lists.newArrayList(table.newScan().planTasks());
+ } else {
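+ // The storage handler serialized the pushed-down predicate into the job conf
+ // (see IcebergStorageHandler.decomposePredicate). Deserialize it back into an
+ // ExprNodeGenericFuncDesc and convert it to an Iceberg filter Expression.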
+ ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
+ .deserializeObject(job.get(TABLE_FILTER_SERIALIZED), ExprNodeGenericFuncDesc.class);
+ SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
+ Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);
+
+ tasks = Lists.newArrayList(table.newScan()
+ .filter(filter)
+ .planTasks());
+ }
+
return createSplits(tasks);
}
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergObjectInspectorGenerator.java b/src/main/java/com/expediagroup/hiveberg/IcebergObjectInspectorGenerator.java
index f9d6c0a..6a3f6ea 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergObjectInspectorGenerator.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergObjectInspectorGenerator.java
@@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergReaderFactory.java b/src/main/java/com/expediagroup/hiveberg/IcebergReaderFactory.java
index 38dd4d5..8542c5c 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergReaderFactory.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergReaderFactory.java
@@ -32,7 +32,8 @@ class IcebergReaderFactory {
IcebergReaderFactory() {
}
- public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile, Schema tableSchema, boolean reuseContainers) {
+ public CloseableIterable<Record> createReader(DataFile file, FileScanTask currentTask, InputFile inputFile,
+ Schema tableSchema, boolean reuseContainers) {
switch (file.format()) {
case AVRO:
return buildAvroReader(currentTask, inputFile, tableSchema, reuseContainers);
@@ -42,11 +43,11 @@ public CloseableIterable createReader(DataFile file, FileScanTask curren
return buildParquetReader(currentTask, inputFile, tableSchema, reuseContainers);
default:
- throw new UnsupportedOperationException(String.format("Cannot read %s file: %s", file.format().name(), file.path()));
+ throw new UnsupportedOperationException(
+ String.format("Cannot read %s file: %s", file.format().name(), file.path()));
}
}
- // FIXME: use generic reader function
private CloseableIterable<Record> buildAvroReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
Avro.ReadBuilder builder = Avro.read(file)
.createReaderFunc(DataReader::create)
@@ -60,7 +61,7 @@ private CloseableIterable buildAvroReader(FileScanTask task, InputFile file, Sch
return builder.build();
}
- // FIXME: use generic reader function
+ // Predicate pushdown support for ORC can be tracked here: https://github.com/apache/incubator-iceberg/issues/787
private CloseableIterable<Record> buildOrcReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
ORC.ReadBuilder builder = ORC.read(file)
// .createReaderFunc() // FIXME: implement
@@ -70,11 +71,12 @@ private CloseableIterable buildOrcReader(FileScanTask task, InputFile file, Sche
return builder.build();
}
- // FIXME: use generic reader function
- private CloseableIterable<Record> buildParquetReader(FileScanTask task, InputFile file, Schema schema, boolean reuseContainers) {
+ private CloseableIterable<Record> buildParquetReader(FileScanTask task, InputFile file, Schema schema,
+ boolean reuseContainers) {
Parquet.ReadBuilder builder = Parquet.read(file)
.createReaderFunc(messageType -> GenericParquetReaders.buildReader(schema, messageType))
.project(schema)
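+ // Apply the residual of the scan filter inside the Parquet reader as well,
+ // so row groups that cannot match are skipped.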
+ .filter(task.residual())
.split(task.start(), task.length());
if (reuseContainers) {
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergSchemaToTypeInfo.java b/src/main/java/com/expediagroup/hiveberg/IcebergSchemaToTypeInfo.java
index 904ef1f..7c616a6 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergSchemaToTypeInfo.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergSchemaToTypeInfo.java
@@ -15,9 +15,9 @@
*/
package com.expediagroup.hiveberg;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -27,8 +27,6 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import com.google.common.collect.ImmutableMap;
-
/**
* Class to convert Iceberg types to Hive TypeInfo
*/
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java b/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java
index c028861..fe58f01 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergSerDe.java
@@ -19,19 +19,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-
import javax.annotation.Nullable;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
@@ -39,14 +35,10 @@
public class IcebergSerDe extends AbstractSerDe {
private Schema schema;
- private TableMetadata metadata;
private ObjectInspector inspector;
- private List<String> columnNames;
- private List<TypeInfo> columnTypes;
@Override
public void initialize(@Nullable Configuration configuration, Properties properties) throws SerDeException {
- //TODO Add methods to dynamically find most recent metadata
HadoopCatalog catalog = new HadoopCatalog(configuration, properties.getProperty("location"));
TableIdentifier id = TableIdentifier.parse(properties.getProperty("name"));
Table table = catalog.loadTable(id);
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergStorageHandler.java b/src/main/java/com/expediagroup/hiveberg/IcebergStorageHandler.java
new file mode 100644
index 0000000..1e2f4e3
--- /dev/null
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergStorageHandler.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2020 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.hiveberg;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+public class IcebergStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {
+
+ private Configuration conf;
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return IcebergInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return HiveIgnoreKeyTextOutputFormat.class;
+ }
+
+ @Override
+ public Class<? extends AbstractSerDe> getSerDeClass() {
+ return IcebergSerDe.class;
+ }
+
+ @Override
+ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ super.configureInputJobProperties(tableDesc, jobProperties);
+ }
+
+ @Override
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ super.configureOutputJobProperties(tableDesc, jobProperties);
+ }
+
+ @Override
+ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ super.configureTableJobProperties(tableDesc, jobProperties);
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+ super.configureJobConf(tableDesc, jobConf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName();
+ }
+
+ /**
+ * Extract and serialize the filter expression and add it to the Configuration for the InputFormat to access.
+ * @param jobConf Job configuration for InputFormat to access
+ * @param deserializer Deserializer
+ * @param exprNodeDesc Filter expression extracted by Hive
+ * @return DecomposedPredicate that tells Hive what parts of the predicate are handled by the StorageHandler
+ * and what parts Hive needs to handle.
+ */
+ @Override
+ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
+ getConf().set("iceberg.filter.serialized", SerializationUtilities.serializeObject(exprNodeDesc));
+
+ //TODO: Decide what Iceberg can handle and what to return to Hive
+ DecomposedPredicate predicate = new DecomposedPredicate();
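+ // Returning the whole expression as the residual means Hive re-applies the
+ // full filter after the scan, so the Iceberg-side pushdown only prunes data
+ // early and cannot change query results.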
+ predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
+ return predicate;
+ }
+}
diff --git a/src/main/java/com/expediagroup/hiveberg/IcebergWritable.java b/src/main/java/com/expediagroup/hiveberg/IcebergWritable.java
index 8a4007c..24c9c6e 100644
--- a/src/main/java/com/expediagroup/hiveberg/IcebergWritable.java
+++ b/src/main/java/com/expediagroup/hiveberg/IcebergWritable.java
@@ -18,7 +18,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
diff --git a/src/test/java/com/expediagroup/hiveberg/TestIcebergFilterFactory.java b/src/test/java/com/expediagroup/hiveberg/TestIcebergFilterFactory.java
new file mode 100644
index 0000000..1316772
--- /dev/null
+++ b/src/test/java/com/expediagroup/hiveberg/TestIcebergFilterFactory.java
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2020 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.hiveberg;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.iceberg.expressions.And;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Not;
+import org.apache.iceberg.expressions.Or;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestIcebergFilterFactory {
+
+ @Test
+ public void testEqualsOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startAnd().equals("salary", PredicateLeaf.Type.LONG, 3000L).end().build();
+
+ UnboundPredicate expected = Expressions.equal("salary", 3000L);
+ UnboundPredicate actual = (UnboundPredicate) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.literal(), expected.literal());
+ assertEquals(actual.ref().name(), expected.ref().name());
+ }
+
+ @Test
+ public void testNotEqualsOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startNot().equals("salary", PredicateLeaf.Type.LONG, 3000L).end().build();
+
+ Not expected = (Not) Expressions.not(Expressions.equal("salary", 3000L));
+ Not actual = (Not) IcebergFilterFactory.generateFilterExpression(arg);
+
+ UnboundPredicate childExpressionActual = (UnboundPredicate) actual.child();
+ UnboundPredicate childExpressionExpected = Expressions.equal("salary", 3000L);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.child().op(), expected.child().op());
+ assertEquals(childExpressionActual.ref().name(), childExpressionExpected.ref().name());
+ assertEquals(childExpressionActual.literal(), childExpressionExpected.literal());
+ }
+
+ @Test
+ public void testLessThanOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startAnd().lessThan("salary", PredicateLeaf.Type.LONG, 3000L).end().build();
+
+ UnboundPredicate expected = Expressions.lessThan("salary", 3000L);
+ UnboundPredicate actual = (UnboundPredicate) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.literal(), expected.literal());
+ assertEquals(actual.ref().name(), expected.ref().name());
+ }
+
+ @Test
+ public void testLessThanEqualsOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startAnd().lessThanEquals("salary", PredicateLeaf.Type.LONG, 3000L).end().build();
+
+ UnboundPredicate expected = Expressions.lessThanOrEqual("salary", 3000L);
+ UnboundPredicate actual = (UnboundPredicate) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.literal(), expected.literal());
+ assertEquals(actual.ref().name(), expected.ref().name());
+ }
+
+ @Test
+ public void testInOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startAnd().in("salary", PredicateLeaf.Type.LONG, 3000L, 4000L).end().build();
+
+ UnboundPredicate expected = Expressions.in("salary", 3000L, 4000L);
+ UnboundPredicate actual = (UnboundPredicate) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.literals(), expected.literals());
+ assertEquals(actual.ref().name(), expected.ref().name());
+ }
+
+ @Test
+ public void testBetweenOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder
+ .startAnd()
+ .between("salary", PredicateLeaf.Type.LONG, 3000L, 4000L).end().build();
+
+ And expected = (And) Expressions.and(Expressions.greaterThanOrEqual("salary", 3000L),
+ Expressions.lessThanOrEqual("salary", 4000L));
+ And actual = (And) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.left().op(), expected.left().op());
+ assertEquals(actual.right().op(), expected.right().op());
+ }
+
+ @Test
+ public void testIsNullOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder.startAnd().isNull("salary", PredicateLeaf.Type.LONG).end().build();
+
+ UnboundPredicate expected = Expressions.isNull("salary");
+ UnboundPredicate actual = (UnboundPredicate) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.ref().name(), expected.ref().name());
+ }
+
+ @Test
+ public void testAndOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder
+ .startAnd()
+ .equals("salary", PredicateLeaf.Type.LONG, 3000L)
+ .equals("salary", PredicateLeaf.Type.LONG, 4000L)
+ .end().build();
+
+ And expected = (And) Expressions
+ .and(Expressions.equal("salary", 3000L), Expressions.equal("salary", 4000L));
+ And actual = (And) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.left().op(), expected.left().op());
+ assertEquals(actual.right().op(), expected.right().op());
+ }
+
+ @Test
+ public void testOrOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder
+ .startOr()
+ .equals("salary", PredicateLeaf.Type.LONG, 3000L)
+ .equals("salary", PredicateLeaf.Type.LONG, 4000L)
+ .end().build();
+
+ Or expected = (Or) Expressions
+ .or(Expressions.equal("salary", 3000L), Expressions.equal("salary", 4000L));
+ Or actual = (Or) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.left().op(), expected.left().op());
+ assertEquals(actual.right().op(), expected.right().op());
+ }
+
+ @Test
+ public void testManyAndOperand() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument arg = builder
+ .startAnd()
+ .equals("salary", PredicateLeaf.Type.LONG, 3000L)
+ .equals("job", PredicateLeaf.Type.LONG, 4000L)
+ .equals("name", PredicateLeaf.Type.LONG, 9000L)
+ .end()
+ .build();
+
+ And expected = (And) Expressions.and(
+ Expressions.equal("salary", 3000L),
+ Expressions.equal("job", 4000L),
+ Expressions.equal("name", 9000L));
+
+ And actual = (And) IcebergFilterFactory.generateFilterExpression(arg);
+
+ assertEquals(actual.op(), expected.op());
+ assertEquals(actual.right().op(), expected.right().op());
+ assertEquals(actual.left().op(), expected.left().op());
+ }
+
+ /*@Test
+ public void complexArgument() {
+ SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+ SearchArgument searchArgument = builder
+ .startAnd()
+ .startNot()
+ .isNull("name", PredicateLeaf.Type.STRING)
+ .end()
+ .startNot()
+ .lessThanEquals("salary", PredicateLeaf.Type.LONG, 3000L)
+ .end()
+ .startNot()
+ .between("salary", PredicateLeaf.Type.LONG, 2000L, 3000L)
+ .end()
+ .end()
+ .build();
+
+ Expression filter = IcebergFilterFactory.generateFilterExpression(searchArgument);
+ }*/
+}
diff --git a/src/test/java/com/expediagroup/hiveberg/TestIcebergInputFormat.java b/src/test/java/com/expediagroup/hiveberg/TestIcebergInputFormat.java
index 3f21b55..f975e87 100644
--- a/src/test/java/com/expediagroup/hiveberg/TestIcebergInputFormat.java
+++ b/src/test/java/com/expediagroup/hiveberg/TestIcebergInputFormat.java
@@ -15,14 +15,13 @@
*/
package com.expediagroup.hiveberg;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
+import com.google.common.collect.Lists;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.StandaloneHiveRunner;
+import com.klarna.hiverunner.annotations.HiveSQL;
import java.io.File;
import java.io.IOException;
import java.util.List;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
@@ -42,10 +41,9 @@
import org.junit.Test;
import org.junit.runner.RunWith;
-import com.google.common.collect.Lists;
-import com.klarna.hiverunner.HiveShell;
-import com.klarna.hiverunner.StandaloneHiveRunner;
-import com.klarna.hiverunner.annotations.HiveSQL;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
@RunWith(StandaloneHiveRunner.class)
public class TestIcebergInputFormat {
@@ -78,7 +76,26 @@ public void before() throws IOException {
}
@Test
- public void testInputFormat() {
+ public void testStorageHandler() {
+ shell.execute("CREATE DATABASE source_db");
+ shell.execute(new StringBuilder()
+ .append("CREATE TABLE source_db.table_a ")
+ .append("STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler' ")
+ .append("LOCATION '")
+ .append(tableLocation.getAbsolutePath())
+ .append("'")
+ .toString());
+
+ List