24 changes: 24 additions & 0 deletions build.gradle
@@ -79,6 +79,7 @@ subprojects {
all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.mortbay.jetty'
exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'

resolutionStrategy {
force 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.2'
@@ -332,11 +333,34 @@ project(':iceberg-mr') {
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
}

compileOnly("org.apache.hive:hive-exec::core") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
exclude group: 'com.google.guava'
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.calcite.avatica'
exclude group: 'org.apache.hive', module: 'hive-llap-tez'
exclude group: 'org.apache.logging.log4j'
exclude group: 'org.pentaho' // missing dependency
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compileOnly("org.apache.hive:hive-serde")

testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')

testCompile("org.apache.avro:avro:1.9.2")
testCompile("org.apache.calcite:calcite-core")
testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
testCompile("com.klarna:hiverunner:5.2.1") {
exclude group: 'javax.jms', module: 'jms'
exclude group: 'org.apache.hive', module: 'hive-exec'
exclude group: 'org.codehaus.jettison', module: 'jettison'
exclude group: 'org.apache.calcite.avatica'
}
}
}

14 changes: 14 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -19,11 +19,13 @@

package org.apache.iceberg.mr;

import java.io.File;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;

public class InputFormatConfig {
@@ -73,6 +75,10 @@ public ConfigBuilder(Configuration conf) {
conf.setBoolean(LOCALITY, false);
}

public Configuration conf() {
return conf;
}

public ConfigBuilder filter(Expression expression) {
conf.set(FILTER_EXPRESSION, SerializationUtil.serializeToBase64(expression));
return this;
@@ -88,6 +94,14 @@ public ConfigBuilder schema(Schema schema) {
return this;
}

public ConfigBuilder readFrom(TableIdentifier identifier) {
return readFrom(identifier.toString());
}

public ConfigBuilder readFrom(File path) {
return readFrom(path.toString());
}

public ConfigBuilder readFrom(String path) {
conf.set(TABLE_PATH, path);
return this;
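
For illustration only (not part of this diff): a minimal sketch of how the new readFrom overloads and the conf() accessor might be wired up when configuring a job. The table and column names are made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.InputFormatConfig;

public class ConfigBuilderExample {
  public static Configuration configure() {
    JobConf job = new JobConf();
    // readFrom(TableIdentifier) delegates to readFrom(identifier.toString())
    InputFormatConfig.ConfigBuilder builder = new InputFormatConfig.ConfigBuilder(job)
        .readFrom(TableIdentifier.of("default", "events"))      // hypothetical table
        .filter(Expressions.equal("event_type", "click"));      // hypothetical filter column
    // conf() exposes the underlying Configuration that now carries the table path and filter
    return builder.conf();
  }
}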
85 changes: 85 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapred.TableResolver;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
implements CombineHiveInputFormat.AvoidSplitCombination {

private transient Table table;
private transient Schema schema;

@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
table = TableResolver.resolveTableFromConfiguration(job);
schema = table.schema();

forwardConfigSettings(job);

return Arrays.stream(super.getSplits(job, numSplits))
.map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
.toArray(InputSplit[]::new);
}

@Override
public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
// Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
forwardConfigSettings(job);
return super.getRecordReader(split, job, reporter);
}

@Override
public boolean shouldSkipCombine(Path path, Configuration conf) {
return true;
}

/**
* Forward configuration settings to the underlying MR input format.
*/
private void forwardConfigSettings(JobConf job) {
Preconditions.checkNotNull(table, "Table cannot be null");
Preconditions.checkNotNull(schema, "Schema cannot be null");

// Once mapred.TableResolver and mapreduce.TableResolver use the same property for the location of the table
// (TABLE_LOCATION vs. TABLE_PATH), this line can be removed: see https://github.com/apache/iceberg/issues/1155.
job.set(InputFormatConfig.TABLE_PATH, table.location());
job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
}
}
93 changes: 93 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iceberg.mr.SerializationUtil;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;

// Hive requires file formats to return splits that are instances of `FileSplit`.
public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

private IcebergSplit innerSplit;

// Hive uses the path name of a split to map it back to a partition (`PartitionDesc`) or table description object
// (`TableDesc`) which specifies the relevant input format for reading the files belonging to that partition or table.
// That way, `HiveInputFormat` and `CombineHiveInputFormat` can read files with different file formats in the same
// MapReduce job and merge compatible splits together.
private String tableLocation;
Review comment (Contributor):

Looks like what's happening is the table location is used as the split's path so that Hive associates all splits with the same PartitionDesc that contains a TableDesc. Is that correct? If so, I think it would be better to add that as the comment. It's difficult to read the Hive code and figure out what's going on using just the pointers here.

Reply (Contributor Author):

Hive uses the path name of the split to map it back to a PartitionDesc or TableDesc, which specify the relevant input format for reading the files belonging to that partition or table. That way, HiveInputFormat and CombineHiveInputFormat can read files with different input formats in the same MR job and combine compatible splits together.

I'll update the comment.

// public no-argument constructor for deserialization
public HiveIcebergSplit() {}

HiveIcebergSplit(IcebergSplit split, String tableLocation) {
this.innerSplit = split;
this.tableLocation = tableLocation;
}

@Override
public IcebergSplit icebergSplit() {
return innerSplit;
}

@Override
public long getLength() {
return innerSplit.getLength();
}

@Override
public String[] getLocations() {
return innerSplit.getLocations();
}

@Override
public Path getPath() {
return new Path(tableLocation);
}

@Override
public long getStart() {
return 0;
}

@Override
public void write(DataOutput out) throws IOException {
byte[] bytes = SerializationUtil.serializeToBytes(tableLocation);
out.writeInt(bytes.length);
out.write(bytes);

innerSplit.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
tableLocation = SerializationUtil.deserializeFromBytes(bytes);

innerSplit = new IcebergSplit();
innerSplit.readFields(in);
}
}
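
For illustration only (not part of this diff): a sketch of the Writable round trip that HiveIcebergSplit supports, using Hadoop's in-memory buffers. The helper class and method names are assumptions made up for the example.

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.iceberg.mr.hive.HiveIcebergSplit;

public class HiveIcebergSplitRoundTrip {
  // Serializes a split and reads it back, as the MR framework does between job planning and task execution.
  static HiveIcebergSplit roundTrip(HiveIcebergSplit original) throws IOException {
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);                             // writes the table location, then the inner IcebergSplit

    HiveIcebergSplit copy = new HiveIcebergSplit();  // public no-arg constructor used for deserialization
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);
    return copy;                                     // copy.getPath() points at the same table location
  }
}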
mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
@@ -22,38 +22,22 @@
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
- import org.apache.iceberg.Schema;
- import org.apache.iceberg.data.Record;

/**
- * Wraps an Iceberg Record in a Writable which Hive can use in the SerDe.
+ * A simple container of objects that you can get and set.
+ *
+ * @param <T> the Java type of the object held by this container
*/
- public class IcebergWritable implements Writable {
-
- private Record record;
- private Schema schema;
-
- public IcebergWritable(Record record, Schema schema) {
- this.record = record;
- this.schema = schema;
- }
-
- @SuppressWarnings("checkstyle:HiddenField")
- public void wrapRecord(Record record) {
- this.record = record;
- }
-
- public Record record() {
- return record;
- }
-
- public Schema schema() {
- return schema;
- }
-
- @SuppressWarnings("checkstyle:HiddenField")
- public void wrapSchema(Schema schema) {
- this.schema = schema;
- }
+ public class Container<T> implements Writable {
+
+ private T value;
+
+ public T get() {
+ return value;
+ }
+
+ public void set(T newValue) {
+ this.value = newValue;
+ }

@Override
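
For illustration only (not part of this diff): a minimal sketch of the new generic container wrapping an Iceberg Record, which is what the SerDe now hands to Hive instead of the old IcebergWritable. The schema and field values are made up for the example.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.types.Types;

public class ContainerExample {
  public static void main(String[] args) {
    // A single-column schema, made up for illustration
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    Record record = GenericRecord.create(schema);
    record.setField("id", 42L);

    // Wrap and unwrap the record through the new container
    Container<Record> container = new Container<>();
    container.set(record);
    Record unwrapped = container.get();
    System.out.println(unwrapped.getField("id"));  // 42
  }
}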
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -55,7 +55,7 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr

@Override
public Class<? extends Writable> getSerializedClass() {
- return IcebergWritable.class;
+ return Container.class;
}

@Override
@@ -70,7 +70,7 @@ public SerDeStats getSerDeStats() {

@Override
public Object deserialize(Writable writable) {
- return ((IcebergWritable) writable).record();
+ return ((Container<?>) writable).get();
}

@Override