diff --git a/mr/src/main/java/org/apache/iceberg/mr/IcebergMRConfig.java b/mr/src/main/java/org/apache/iceberg/mr/IcebergMRConfig.java new file mode 100644 index 000000000000..ec81bca904e6 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/IcebergMRConfig.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr; + +import java.io.File; +import java.util.Optional; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.mr.mapreduce.IcebergInputFormat; + +public final class IcebergMRConfig { + + private static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time"; + private static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive"; + private static final String CATALOG = "iceberg.mr.catalog"; + private static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression"; + private static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model"; + private static final String LOCALITY = "iceberg.mr.locality"; + private static final String READ_SCHEMA = "iceberg.mr.read.schema"; + private static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers"; + private static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering"; + private static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id"; + private static final String SPLIT_SIZE = "iceberg.mr.split.size"; + private static final String TABLE_PATH = "iceberg.mr.table.path"; + private static final String TABLE_SCHEMA = "iceberg.mr.table.schema"; + + private IcebergMRConfig() { + } + + public static class Builder { + + private final Configuration conf; + + private Builder() { + this(new Configuration()); + } + + private Builder(Configuration conf) { + this.conf = conf; + } + + public Configuration build() { + Table table = IcebergInputFormat.findTable(conf); + schema(table.schema()); + return conf; + } + + public static Builder newInstance() { + return new Builder(); + } + + public static Builder newInstance(Configuration conf) { + return new Builder(conf); + } + + public Builder copy() { + return newInstance(new Configuration(conf)); + } + + public Builder asOfTime(long asOfTime) { + conf.setLong(AS_OF_TIMESTAMP, asOfTime); + return this; + } + + public Builder caseSensitive(boolean caseSensitive) { + conf.setBoolean(CASE_SENSITIVE, caseSensitive); + return this; + } + + public Builder catalogLoader(Class> loader) { + conf.setClass(CATALOG, loader, 
Function.class);
+      return this;
+    }
+
+    public Builder filter(Expression expression) {
+      conf.set(FILTER_EXPRESSION, SerializationUtil.serializeToBase64(expression));
+      return this;
+    }
+
+    /**
+     * If this method is called, the input splits
+     * constructed will include host location information.
+     */
+    public Builder preferLocality() {
+      conf.setBoolean(LOCALITY, true);
+      return this;
+    }
+
+    public Builder project(Schema schema) {
+      conf.set(READ_SCHEMA, SchemaParser.toJson(schema));
+      return this;
+    }
+
+    public Builder readFrom(File path) {
+      return readFrom(path.toString());
+    }
+
+    public Builder readFrom(TableIdentifier identifier) {
+      return readFrom(identifier.toString());
+    }
+
+    public Builder readFrom(String path) {
+      conf.set(TABLE_PATH, path);
+      return this;
+    }
+
+    public Builder schema(Schema schema) {
+      conf.set(TABLE_SCHEMA, SchemaParser.toJson(schema));
+      return this;
+    }
+
+    /**
+     * Compute platforms pass down filters to data sources. If the data source cannot apply some filters, or can
+     * only partially apply a filter, it returns the residual filter back. If the platform can correctly apply
+     * the residual filters, it should call this API. Otherwise an exception is thrown if the passed-in filter
+     * is not completely satisfied.
+     */
+    public Builder skipResidualFiltering() {
+      conf.setBoolean(SKIP_RESIDUAL_FILTERING, true);
+      return this;
+    }
+
+    public Builder reuseContainers(boolean reuse) {
+      conf.setBoolean(REUSE_CONTAINERS, reuse);
+      return this;
+    }
+
+    public Builder snapshotId(long snapshotId) {
+      conf.setLong(SNAPSHOT_ID, snapshotId);
+      return this;
+    }
+
+    public Builder splitSize(long splitSize) {
+      conf.setLong(SPLIT_SIZE, splitSize);
+      return this;
+    }
+
+    public Builder useHiveRows() {
+      conf.setEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE);
+      return this;
+    }
+
+    public Builder usePigTuples() {
+      conf.setEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.PIG);
+      return this;
+    }
+
+  }
+
+  public static long asOfTime(Configuration conf) {
+    return conf.getLong(AS_OF_TIMESTAMP, -1);
+  }
+
+  public static boolean caseSensitive(Configuration conf) {
+    return conf.getBoolean(CASE_SENSITIVE, true);
+  }
+
+  public static String catalogLoader(Configuration conf) {
+    return conf.get(CATALOG);
+  }
+
+  public static Expression filter(Configuration conf) {
+    // TODO add a filter parser to get rid of Serialization
+    return SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION));
+  }
+
+  public static Schema projection(Configuration conf) {
+    return parseSchemaFromJson(conf.get(READ_SCHEMA));
+  }
+
+  public static String readFrom(Configuration conf) {
+    return conf.get(TABLE_PATH);
+  }
+
+  public static boolean reuseContainers(Configuration conf) {
+    return conf.getBoolean(REUSE_CONTAINERS, false);
+  }
+
+  public static Schema schema(Configuration conf) {
+    return parseSchemaFromJson(conf.get(TABLE_SCHEMA));
+  }
+
+  public static boolean applyResidualFiltering(Configuration conf) {
+    return !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false);
+  }
+
+  public static long snapshotId(Configuration conf) {
+    return conf.getLong(SNAPSHOT_ID, -1);
+  }
+
+  public static InMemoryDataModel inMemoryDataModel(Configuration conf) {
+    return conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.defaultModel());
+  }
+
+  public static long splitSize(Configuration conf) {
+    return conf.getLong(SPLIT_SIZE, 0);
+  }
+
+  public static boolean localityPreferred(Configuration conf) {
+    return conf.getBoolean(LOCALITY, false);
+  }
+
+  private static Schema parseSchemaFromJson(@Nullable String schema) {
+    return Optional.ofNullable(schema)
+        .map(SchemaParser::fromJson)
+        .orElse(null);
+  }
+
+}
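As a point of reference, here is a minimal sketch of how an MRv2 driver might use the new configuration builder. It is not part of the patch; the table location and filter values are made up, and the trailing build() call is an assumption based on Builder#build() resolving the table and storing its schema in the job configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;

public class IcebergMRv2ReadExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // configure(job) sets IcebergInputFormat as the input format and returns the shared builder
    IcebergInputFormat.configure(job)
        .readFrom("hdfs://namenode:8020/warehouse/db/logs")  // hypothetical table location
        .filter(Expressions.equal("date", "2020-03-20"))     // optional filter pushdown
        .preferLocality()                                    // request host locations on the splits
        .reuseContainers(true)
        .build();  // assumed: loads the table and writes its schema into the job configuration
    // submit the job as usual; mappers receive Iceberg generic Records by default
  }
}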
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InMemoryDataModel.java b/mr/src/main/java/org/apache/iceberg/mr/InMemoryDataModel.java
new file mode 100644
index 000000000000..1e148f6c4af3
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/InMemoryDataModel.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+public enum InMemoryDataModel {
+  GENERIC,
+  HIVE,
+  PIG;
+
+  public static InMemoryDataModel defaultModel() {
+    return GENERIC;
+  }
+
+  public boolean isHiveOrPig() {
+    return this == HIVE || this == PIG;
+  }
+
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
new file mode 100644
index 000000000000..15c842cd2d23
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A simple container of objects that you can get and set.
+ *
+ * @param <T> the Java type of the object held by this container
+ */
+public class Container<T> implements Writable {
+
+  private T value;
+
+  public T get() {
+    return value;
+  }
+
+  public void set(T newValue) {
+    this.value = newValue;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("readFields is not supported");
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("write is not supported");
+  }
+
+}
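For the mapred path, values arrive wrapped in the Container above. A hypothetical MRv1 mapper that unwraps them might look like the following sketch; it is not part of the patch, and the "message" column is an arbitrary example field name.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.mapred.Container;

public class IcebergLogMapper extends MapReduceBase
    implements Mapper<Void, Container<Record>, Text, NullWritable> {

  @Override
  public void map(Void key, Container<Record> value, OutputCollector<Text, NullWritable> out, Reporter reporter)
      throws IOException {
    // unwrap the Iceberg record placed in the Container by the record reader
    Record record = value.get();
    // "message" is an example column; use whatever fields exist in the table schema
    out.collect(new Text(String.valueOf(record.getField("message"))), NullWritable.get());
  }
}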
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
new file mode 100644
index 000000000000..c6bdd0ff2b12
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.IcebergMRConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+
+/**
+ * Generic MRv1 InputFormat API for Iceberg.
+ *
+ * @param <T> the in-memory data model, which can be Pig tuples or Hive rows; the default is Iceberg records
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static IcebergMRConfig.Builder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return IcebergMRConfig.Builder.newInstance(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(getTaskAttemptContext(job))
+        .stream()
+        .map(InputSplit.class::cast)
+        .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new MapredIcebergRecordReader<>(innerInputFormat, (IcebergSplit) split, job, reporter);
+  }
+
+  static TaskAttemptContext getTaskAttemptContext(JobConf job) {
+    TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get("mapred.task.id")))
+        .orElse(new TaskAttemptID());
+
+    return new TaskAttemptContextImpl(job, taskAttemptID);
+  }
+
+}
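The corresponding MRv1 job wiring might look like the sketch below; again this is not part of the patch, the table location is made up, and the build() call is the same assumption as in the earlier MRv2 example.

import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.mr.IcebergMRConfig;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;

public class IcebergMRv1ReadExample {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // configure(job) registers MapredIcebergInputFormat on the JobConf and returns the shared builder
    IcebergMRConfig.Builder builder = MapredIcebergInputFormat.configure(job);
    builder
        .readFrom("hdfs://namenode:8020/warehouse/db/logs")  // hypothetical table location
        .reuseContainers(true)
        .build();  // assumed: resolves the table and stores its schema in the configuration
    // map tasks then see values of type Container<Record> and unwrap them with Container#get()
  }
}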
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergRecordReader.java b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergRecordReader.java
new file mode 100644
index 000000000000..72c0fbcd684a
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergRecordReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+
+ */ + +package org.apache.iceberg.mr.mapred; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.mr.mapreduce.IcebergSplit; + +class MapredIcebergRecordReader implements RecordReader> { + + private final org.apache.hadoop.mapreduce.RecordReader innerReader; + private final long splitLength; // for getPos() + + MapredIcebergRecordReader( + org.apache.iceberg.mr.mapreduce.IcebergInputFormat mapreduceInputFormat, + IcebergSplit split, + JobConf job, + Reporter reporter + ) throws IOException { + this.splitLength = split.getLength(); + + TaskAttemptContext context = MapredIcebergInputFormat.getTaskAttemptContext(job); + + try { + innerReader = mapreduceInputFormat.createRecordReader(split, context); + innerReader.initialize(split, context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public boolean next(Void key, Container value) throws IOException { + try { + if (innerReader.nextKeyValue()) { + value.set(innerReader.getCurrentValue()); + return true; + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + + return false; + } + + @Override + public Void createKey() { + return null; + } + + @Override + public Container createValue() { + return new Container<>(); + } + + @Override + public long getPos() throws IOException { + return (long) (splitLength * getProgress()); + } + + @Override + public float getProgress() throws IOException { + if (innerReader == null) { + return 0; + } + + try { + return innerReader.getProgress(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + if (innerReader != null) { + innerReader.close(); + } + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index 9db226e76980..7e8ac3fa1136 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -19,16 +19,10 @@ package org.apache.iceberg.mr.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -36,44 +30,22 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.Catalog; import 
org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.mr.SerializationUtil; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.mr.IcebergMRConfig; +import org.apache.iceberg.mr.InMemoryDataModel; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,128 +56,17 @@ public class IcebergInputFormat extends InputFormat { private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class); - static final String AS_OF_TIMESTAMP = "iceberg.mr.as.of.time"; - static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive"; - static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression"; - static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model"; - static final String READ_SCHEMA = "iceberg.mr.read.schema"; - static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers"; - static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id"; - static final String SPLIT_SIZE = "iceberg.mr.split.size"; - static final String TABLE_PATH = "iceberg.mr.table.path"; - static final String TABLE_SCHEMA = "iceberg.mr.table.schema"; - static final String LOCALITY = "iceberg.mr.locality"; - static final String CATALOG = "iceberg.mr.catalog"; - static final String SKIP_RESIDUAL_FILTERING = "skip.residual.filtering"; - private transient List splits; - private enum InMemoryDataModel { - PIG, - HIVE, - GENERIC // Default data model is of Iceberg Generics - } - /** * Configures the {@code Job} to use the {@code IcebergInputFormat} and * returns a helper to add further configuration. 
* * @param job the {@code Job} to configure */ - public static ConfigBuilder configure(Job job) { + public static IcebergMRConfig.Builder configure(Job job) { job.setInputFormatClass(IcebergInputFormat.class); - return new ConfigBuilder(job.getConfiguration()); - } - - public static class ConfigBuilder { - private final Configuration conf; - - public ConfigBuilder(Configuration conf) { - this.conf = conf; - // defaults - conf.setEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC); - conf.setBoolean(SKIP_RESIDUAL_FILTERING, false); - conf.setBoolean(CASE_SENSITIVE, true); - conf.setBoolean(REUSE_CONTAINERS, false); - conf.setBoolean(LOCALITY, false); - } - - public ConfigBuilder readFrom(String path) { - conf.set(TABLE_PATH, path); - Table table = findTable(conf); - conf.set(TABLE_SCHEMA, SchemaParser.toJson(table.schema())); - return this; - } - - public ConfigBuilder filter(Expression expression) { - conf.set(FILTER_EXPRESSION, SerializationUtil.serializeToBase64(expression)); - return this; - } - - public ConfigBuilder project(Schema schema) { - conf.set(READ_SCHEMA, SchemaParser.toJson(schema)); - return this; - } - - public ConfigBuilder reuseContainers(boolean reuse) { - conf.setBoolean(REUSE_CONTAINERS, reuse); - return this; - } - - public ConfigBuilder caseSensitive(boolean caseSensitive) { - conf.setBoolean(CASE_SENSITIVE, caseSensitive); - return this; - } - - public ConfigBuilder snapshotId(long snapshotId) { - conf.setLong(SNAPSHOT_ID, snapshotId); - return this; - } - - public ConfigBuilder asOfTime(long asOfTime) { - conf.setLong(AS_OF_TIMESTAMP, asOfTime); - return this; - } - - public ConfigBuilder splitSize(long splitSize) { - conf.setLong(SPLIT_SIZE, splitSize); - return this; - } - - /** - * If this API is called. The input splits - * constructed will have host location information - */ - public ConfigBuilder preferLocality() { - conf.setBoolean(LOCALITY, true); - return this; - } - - public ConfigBuilder catalogFunc(Class> catalogFuncClass) { - conf.setClass(CATALOG, catalogFuncClass, Function.class); - return this; - } - - public ConfigBuilder useHiveRows() { - conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE.name()); - return this; - } - - public ConfigBuilder usePigTuples() { - conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.PIG.name()); - return this; - } - - /** - * Compute platforms pass down filters to data sources. If the data source cannot apply some filters, or only - * partially applies the filter, it will return the residual filter back. If the platform can correctly apply - * the residual filters, then it should call this api. Otherwise the current api will throw an exception if the - * passed in filter is not completely satisfied. 
- */ - public ConfigBuilder skipResidualFiltering() { - conf.setBoolean(SKIP_RESIDUAL_FILTERING, true); - return this; - } + return IcebergMRConfig.Builder.newInstance(job.getConfiguration()); } @Override @@ -218,36 +79,39 @@ public List getSplits(JobContext context) { Configuration conf = context.getConfiguration(); Table table = findTable(conf); TableScan scan = table.newScan() - .caseSensitive(conf.getBoolean(CASE_SENSITIVE, true)); - long snapshotId = conf.getLong(SNAPSHOT_ID, -1); + .caseSensitive(IcebergMRConfig.caseSensitive(conf)); + + long snapshotId = IcebergMRConfig.snapshotId(conf); if (snapshotId != -1) { scan = scan.useSnapshot(snapshotId); } - long asOfTime = conf.getLong(AS_OF_TIMESTAMP, -1); + + long asOfTime = IcebergMRConfig.asOfTime(conf); if (asOfTime != -1) { scan = scan.asOfTime(asOfTime); } - long splitSize = conf.getLong(SPLIT_SIZE, 0); + + long splitSize = IcebergMRConfig.splitSize(conf); if (splitSize > 0) { scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize)); } - String schemaStr = conf.get(READ_SCHEMA); - if (schemaStr != null) { - scan.project(SchemaParser.fromJson(schemaStr)); + + Schema projection = IcebergMRConfig.projection(conf); + if (projection != null) { + scan.project(projection); } - // TODO add a filter parser to get rid of Serialization - Expression filter = SerializationUtil.deserializeFromBase64(conf.get(FILTER_EXPRESSION)); + Expression filter = IcebergMRConfig.filter(conf); if (filter != null) { scan = scan.filter(filter); } splits = Lists.newArrayList(); - boolean applyResidual = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false); - InMemoryDataModel model = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC); + boolean applyResidual = IcebergMRConfig.applyResidualFiltering(conf); + InMemoryDataModel model = IcebergMRConfig.inMemoryDataModel(conf); try (CloseableIterable tasksIterable = scan.planTasks()) { tasksIterable.forEach(task -> { - if (applyResidual && (model == InMemoryDataModel.HIVE || model == InMemoryDataModel.PIG)) { + if (applyResidual && model.isHiveOrPig()) { //TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet checkResiduals(task); } @@ -277,309 +141,26 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemptCon return new IcebergRecordReader<>(); } - private static final class IcebergRecordReader extends RecordReader { - private TaskAttemptContext context; - private Schema tableSchema; - private Schema expectedSchema; - private boolean reuseContainers; - private boolean caseSensitive; - private InMemoryDataModel inMemoryDataModel; - private Map namesToPos; - private Iterator tasks; - private T currentRow; - private CloseableIterator currentIterator; - - @Override - public void initialize(InputSplit split, TaskAttemptContext newContext) { - Configuration conf = newContext.getConfiguration(); - // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances - CombinedScanTask task = ((IcebergSplit) split).task; - this.context = newContext; - this.tasks = task.files().iterator(); - this.tableSchema = SchemaParser.fromJson(conf.get(TABLE_SCHEMA)); - String readSchemaStr = conf.get(READ_SCHEMA); - this.expectedSchema = readSchemaStr != null ? 
SchemaParser.fromJson(readSchemaStr) : tableSchema; - this.namesToPos = buildNameToPos(expectedSchema); - this.reuseContainers = conf.getBoolean(REUSE_CONTAINERS, false); - this.caseSensitive = conf.getBoolean(CASE_SENSITIVE, true); - this.inMemoryDataModel = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC); - this.currentIterator = open(tasks.next()); - } - - @Override - public boolean nextKeyValue() throws IOException { - while (true) { - if (currentIterator.hasNext()) { - currentRow = currentIterator.next(); - return true; - } else if (tasks.hasNext()) { - currentIterator.close(); - currentIterator = open(tasks.next()); - } else { - currentIterator.close(); - return false; - } - } - } - - @Override - public Void getCurrentKey() { - return null; - } - - @Override - public T getCurrentValue() { - return currentRow; - } - - @Override - public float getProgress() { - // TODO: We could give a more accurate progress based on records read from the file. Context.getProgress does not - // have enough information to give an accurate progress value. This isn't that easy, since we don't know how much - // of the input split has been processed and we are pushing filters into Parquet and ORC. But we do know when a - // file is opened and could count the number of rows returned, so we can estimate. And we could also add a row - // count to the readers so that we can get an accurate count of rows that have been either returned or filtered - // out. - return context.getProgress(); - } - - @Override - public void close() throws IOException { - currentIterator.close(); - } - - private static Map buildNameToPos(Schema expectedSchema) { - Map nameToPos = Maps.newHashMap(); - for (int pos = 0; pos < expectedSchema.asStruct().fields().size(); pos++) { - Types.NestedField field = expectedSchema.asStruct().fields().get(pos); - nameToPos.put(field.name(), pos); - } - return nameToPos; - } - - private CloseableIterator open(FileScanTask currentTask) { - DataFile file = currentTask.file(); - // schema of rows returned by readers - PartitionSpec spec = currentTask.spec(); - Set idColumns = Sets.intersection(spec.identitySourceIds(), TypeUtil.getProjectedIds(expectedSchema)); - boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); - - CloseableIterable iterable; - if (hasJoinedPartitionColumns) { - Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns); - Schema identityPartitionSchema = TypeUtil.select(expectedSchema, idColumns); - iterable = CloseableIterable.transform(open(currentTask, readDataSchema), - row -> withIdentityPartitionColumns(row, identityPartitionSchema, spec, file.partition())); - } else { - iterable = open(currentTask, expectedSchema); - } - - return iterable.iterator(); - } - - private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { - DataFile file = currentTask.file(); - // TODO we should make use of FileIO to create inputFile - InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context.getConfiguration()); - CloseableIterable iterable; - switch (file.format()) { - case AVRO: - iterable = newAvroIterable(inputFile, currentTask, readSchema); - break; - case ORC: - iterable = newOrcIterable(inputFile, currentTask, readSchema); - break; - case PARQUET: - iterable = newParquetIterable(inputFile, currentTask, readSchema); - break; - default: - throw new UnsupportedOperationException( - String.format("Cannot read %s file: %s", file.format().name(), file.path())); - } - - return iterable; - } - - @SuppressWarnings("unchecked") - 
private T withIdentityPartitionColumns( - T row, Schema identityPartitionSchema, PartitionSpec spec, StructLike partition) { - switch (inMemoryDataModel) { - case PIG: - case HIVE: - throw new UnsupportedOperationException( - "Adding partition columns to Pig and Hive data model are not supported yet"); - case GENERIC: - return (T) withIdentityPartitionColumns((Record) row, identityPartitionSchema, spec, partition); - } - return row; - } - - private Record withIdentityPartitionColumns( - Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partitionTuple) { - List partitionFields = spec.fields(); - List identityColumns = identityPartitionSchema.columns(); - GenericRecord row = GenericRecord.create(expectedSchema.asStruct()); - namesToPos.forEach((name, pos) -> { - Object field = record.getField(name); - if (field != null) { - row.set(pos, field); - } - - // if the current name, pos points to an identity partition column, we set the - // column at pos correctly by reading the corresponding value from partitionTuple` - for (int i = 0; i < identityColumns.size(); i++) { - Types.NestedField identityColumn = identityColumns.get(i); - for (int j = 0; j < partitionFields.size(); j++) { - PartitionField partitionField = partitionFields.get(j); - if (name.equals(identityColumn.name()) && - identityColumn.fieldId() == partitionField.sourceId() && - "identity".equals(partitionField.transform().toString())) { - row.set(pos, partitionTuple.get(j, spec.javaClasses()[j])); - } - } - } - }); - - return row; - } - - private CloseableIterable applyResidualFiltering(CloseableIterable iter, Expression residual, - Schema readSchema) { - boolean applyResidual = !context.getConfiguration().getBoolean(SKIP_RESIDUAL_FILTERING, false); - - if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { - Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive); - return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record)); - } else { - return iter; - } - } - - private CloseableIterable newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { - Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile) - .project(readSchema) - .split(task.start(), task.length()); - if (reuseContainers) { - avroReadBuilder.reuseContainers(); - } - - switch (inMemoryDataModel) { - case PIG: - case HIVE: - //TODO implement value readers for Pig and Hive - throw new UnsupportedOperationException("Avro support not yet supported for Pig and Hive"); - case GENERIC: - avroReadBuilder.createReaderFunc(DataReader::create); - } - return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema); - } - - private CloseableIterable newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { - Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile) - .project(readSchema) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - if (reuseContainers) { - parquetReadBuilder.reuseContainers(); - } - - switch (inMemoryDataModel) { - case PIG: - case HIVE: - //TODO implement value readers for Pig and Hive - throw new UnsupportedOperationException("Parquet support not yet supported for Pig and Hive"); - case GENERIC: - parquetReadBuilder.createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema)); - } - return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema); - } - - private CloseableIterable 
newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { - ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) - .project(readSchema) - .filter(task.residual()) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - // ORC does not support reuse containers yet - switch (inMemoryDataModel) { - case PIG: - case HIVE: - //TODO: implement value readers for Pig and Hive - throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive"); - case GENERIC: - orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema)); - } - - return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema); - } - } - - private static Table findTable(Configuration conf) { - String path = conf.get(TABLE_PATH); + public static Table findTable(Configuration conf) { + String path = IcebergMRConfig.readFrom(conf); Preconditions.checkArgument(path != null, "Table path should not be null"); if (path.contains("/")) { HadoopTables tables = new HadoopTables(conf); return tables.load(path); } - String catalogFuncClass = conf.get(CATALOG); - if (catalogFuncClass != null) { - Function catalogFunc = (Function) + String catalogLoaderClass = IcebergMRConfig.catalogLoader(conf); + if (catalogLoaderClass != null) { + Function catalogLoader = (Function) DynConstructors.builder(Function.class) - .impl(catalogFuncClass) + .impl(catalogLoaderClass) .build() .newInstance(); - Catalog catalog = catalogFunc.apply(conf); + Catalog catalog = catalogLoader.apply(conf); TableIdentifier tableIdentifier = TableIdentifier.parse(path); return catalog.loadTable(tableIdentifier); } else { throw new IllegalArgumentException("No custom catalog specified to load table " + path); } } - - static class IcebergSplit extends InputSplit implements Writable { - static final String[] ANYWHERE = new String[]{"*"}; - private CombinedScanTask task; - private transient String[] locations; - private transient Configuration conf; - - IcebergSplit(Configuration conf, CombinedScanTask task) { - this.task = task; - this.conf = conf; - } - - @Override - public long getLength() { - return task.files().stream().mapToLong(FileScanTask::length).sum(); - } - - @Override - public String[] getLocations() { - boolean localityPreferred = conf.getBoolean(LOCALITY, false); - if (!localityPreferred) { - return ANYWHERE; - } - if (locations != null) { - return locations; - } - locations = Util.blockLocations(task, conf); - return locations; - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] data = SerializationUtil.serializeToBytes(this.task); - out.writeInt(data.length); - out.write(data); - } - - @Override - public void readFields(DataInput in) throws IOException { - byte[] data = new byte[in.readInt()]; - in.readFully(data); - this.task = SerializationUtil.deserializeFromBytes(data); - } - } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java new file mode 100644 index 000000000000..9d05052a0d87 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mr.IcebergMRConfig; +import org.apache.iceberg.mr.InMemoryDataModel; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +public class IcebergRecordReader extends RecordReader { + private TaskAttemptContext context; + private Schema tableSchema; + private Schema expectedSchema; + private boolean reuseContainers; + private boolean caseSensitive; + private InMemoryDataModel inMemoryDataModel; + private Map namesToPos; + private Iterator tasks; + private T currentRow; + private CloseableIterator currentIterator; + + @Override + public void initialize(InputSplit split, TaskAttemptContext newContext) { + Configuration conf = newContext.getConfiguration(); + // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances + CombinedScanTask task = ((IcebergSplit) split).getTask(); + this.context = newContext; + this.tasks = task.files().iterator(); + this.tableSchema = IcebergMRConfig.schema(conf); + Schema projection = IcebergMRConfig.projection(conf); + this.expectedSchema = projection != null ? 
projection : tableSchema; + this.namesToPos = buildNameToPos(expectedSchema); + this.reuseContainers = IcebergMRConfig.reuseContainers(conf); + this.caseSensitive = IcebergMRConfig.caseSensitive(conf); + this.inMemoryDataModel = IcebergMRConfig.inMemoryDataModel(conf); + this.currentIterator = open(tasks.next()); + } + + @Override + public boolean nextKeyValue() throws IOException { + while (true) { + if (currentIterator.hasNext()) { + currentRow = currentIterator.next(); + return true; + } else if (tasks.hasNext()) { + currentIterator.close(); + currentIterator = open(tasks.next()); + } else { + currentIterator.close(); + return false; + } + } + } + + @Override + public Void getCurrentKey() { + return null; + } + + @Override + public T getCurrentValue() { + return currentRow; + } + + @Override + public float getProgress() { + // TODO: We could give a more accurate progress based on records read from the file. Context.getProgress does not + // have enough information to give an accurate progress value. This isn't that easy, since we don't know how much + // of the input split has been processed and we are pushing filters into Parquet and ORC. But we do know when a + // file is opened and could count the number of rows returned, so we can estimate. And we could also add a row + // count to the readers so that we can get an accurate count of rows that have been either returned or filtered + // out. + return context.getProgress(); + } + + @Override + public void close() throws IOException { + currentIterator.close(); + } + + private static Map buildNameToPos(Schema expectedSchema) { + Map nameToPos = Maps.newHashMap(); + for (int pos = 0; pos < expectedSchema.asStruct().fields().size(); pos++) { + Types.NestedField field = expectedSchema.asStruct().fields().get(pos); + nameToPos.put(field.name(), pos); + } + return nameToPos; + } + + private CloseableIterator open(FileScanTask currentTask) { + DataFile file = currentTask.file(); + // schema of rows returned by readers + PartitionSpec spec = currentTask.spec(); + Set idColumns = Sets.intersection(spec.identitySourceIds(), TypeUtil.getProjectedIds(expectedSchema)); + boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); + + CloseableIterable iterable; + if (hasJoinedPartitionColumns) { + Schema readDataSchema = TypeUtil.selectNot(expectedSchema, idColumns); + Schema identityPartitionSchema = TypeUtil.select(expectedSchema, idColumns); + iterable = CloseableIterable.transform(open(currentTask, readDataSchema), + row -> withIdentityPartitionColumns(row, identityPartitionSchema, spec, file.partition())); + } else { + iterable = open(currentTask, expectedSchema); + } + + return iterable.iterator(); + } + + private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { + DataFile file = currentTask.file(); + // TODO we should make use of FileIO to create inputFile + InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context.getConfiguration()); + CloseableIterable iterable; + switch (file.format()) { + case AVRO: + iterable = newAvroIterable(inputFile, currentTask, readSchema); + break; + case ORC: + iterable = newOrcIterable(inputFile, currentTask, readSchema); + break; + case PARQUET: + iterable = newParquetIterable(inputFile, currentTask, readSchema); + break; + default: + throw new UnsupportedOperationException( + String.format("Cannot read %s file: %s", file.format().name(), file.path())); + } + + return iterable; + } + + @SuppressWarnings("unchecked") + private T withIdentityPartitionColumns( + T row, Schema 
identityPartitionSchema, PartitionSpec spec, StructLike partition) { + switch (inMemoryDataModel) { + case PIG: + case HIVE: + throw new UnsupportedOperationException( + "Adding partition columns to Pig and Hive data model are not supported yet"); + case GENERIC: + return (T) withIdentityPartitionColumns((Record) row, identityPartitionSchema, spec, partition); + } + return row; + } + + private Record withIdentityPartitionColumns( + Record record, Schema identityPartitionSchema, PartitionSpec spec, StructLike partitionTuple) { + List partitionFields = spec.fields(); + List identityColumns = identityPartitionSchema.columns(); + GenericRecord row = GenericRecord.create(expectedSchema.asStruct()); + namesToPos.forEach((name, pos) -> { + Object field = record.getField(name); + if (field != null) { + row.set(pos, field); + } + + // if the current name, pos points to an identity partition column, we set the + // column at pos correctly by reading the corresponding value from partitionTuple` + for (int i = 0; i < identityColumns.size(); i++) { + Types.NestedField identityColumn = identityColumns.get(i); + for (int j = 0; j < partitionFields.size(); j++) { + PartitionField partitionField = partitionFields.get(j); + if (name.equals(identityColumn.name()) && + identityColumn.fieldId() == partitionField.sourceId() && + "identity".equals(partitionField.transform().toString())) { + row.set(pos, partitionTuple.get(j, spec.javaClasses()[j])); + } + } + } + }); + + return row; + } + + private CloseableIterable applyResidualFiltering(CloseableIterable iter, Expression residual, + Schema readSchema) { + boolean applyResidual = IcebergMRConfig.applyResidualFiltering(context.getConfiguration()); + + if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { + Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive); + return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record)); + } else { + return iter; + } + } + + private CloseableIterable newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { + Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile) + .project(readSchema) + .split(task.start(), task.length()); + if (reuseContainers) { + avroReadBuilder.reuseContainers(); + } + + switch (inMemoryDataModel) { + case PIG: + case HIVE: + //TODO implement value readers for Pig and Hive + throw new UnsupportedOperationException("Avro support not yet supported for Pig and Hive"); + case GENERIC: + avroReadBuilder.createReaderFunc(DataReader::create); + } + return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema); + } + + private CloseableIterable newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { + Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile) + .project(readSchema) + .filter(task.residual()) + .caseSensitive(caseSensitive) + .split(task.start(), task.length()); + if (reuseContainers) { + parquetReadBuilder.reuseContainers(); + } + + switch (inMemoryDataModel) { + case PIG: + case HIVE: + //TODO implement value readers for Pig and Hive + throw new UnsupportedOperationException("Parquet support not yet supported for Pig and Hive"); + case GENERIC: + parquetReadBuilder.createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema)); + } + return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema); + } + + private CloseableIterable newOrcIterable(InputFile inputFile, FileScanTask task, Schema 
readSchema) { + ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) + .project(readSchema) + .filter(task.residual()) + .caseSensitive(caseSensitive) + .split(task.start(), task.length()); + // ORC does not support reuse containers yet + switch (inMemoryDataModel) { + case PIG: + case HIVE: + //TODO: implement value readers for Pig and Hive + throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive"); + case GENERIC: + orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema)); + } + + return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema); + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java new file mode 100644 index 000000000000..2ea0e2c8fea4 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.mr.IcebergMRConfig; +import org.apache.iceberg.mr.SerializationUtil; + +public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, Writable { + + public static final String[] ANYWHERE = new String[]{"*"}; + + private CombinedScanTask task; + + private transient String[] locations; + private final transient Configuration conf; + + public IcebergSplit(Configuration conf, CombinedScanTask task) { + this.conf = conf; + this.task = task; + } + + @Override + public long getLength() { + return task.files().stream().mapToLong(FileScanTask::length).sum(); + } + + @Override + public String[] getLocations() { + if (locations == null) { + locations = IcebergMRConfig.localityPreferred(conf) ? 
Util.blockLocations(task, conf) : ANYWHERE; + } + + return locations; + } + + public CombinedScanTask getTask() { + return task; + } + + @Override + public void readFields(DataInput in) throws IOException { + byte[] data = new byte[in.readInt()]; + in.readFully(data); + this.task = SerializationUtil.deserializeFromBytes(data); + } + + @Override + public void write(DataOutput out) throws IOException { + byte[] data = SerializationUtil.serializeToBytes(this.task); + out.writeInt(data.length); + out.write(data); + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java deleted file mode 100644 index a5dbbef0ac4e..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java +++ /dev/null @@ -1,531 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.TestHelpers.Row; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.RandomGenericData; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.orc.GenericOrcWriter; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; -import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.iceberg.types.Types.NestedField.required; - -@RunWith(Parameterized.class) -public class TestIcebergInputFormat { - static final Schema SCHEMA = new Schema( - required(1, "data", Types.StringType.get()), - required(2, "id", Types.LongType.get()), - required(3, "date", Types.StringType.get())); - - static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) - .identity("date") - .bucket("id", 1) - .build(); - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - private HadoopTables tables; - private Configuration conf; - - @Parameterized.Parameters - public static Object[][] parameters() { - return new Object[][]{ - new Object[]{"parquet"}, - new Object[]{"avro"}, - new Object[]{"orc"} - }; - } - - private final FileFormat format; - - public TestIcebergInputFormat(String format) { - this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); - } - - @Before - public void before() { - conf = new Configuration(); - tables = new HadoopTables(conf); - } - - @Test - public void testUnpartitionedTable() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L); - DataFile dataFile = writeFile(table, null, format, expectedRecords); - table.newAppend() - .appendFile(dataFile) - .commit(); - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder.readFrom(location.toString()); - validate(job, expectedRecords); - } - - @Test - public void testPartitionedTable() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L); - expectedRecords.get(0).set(2, "2020-03-20"); - DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords); - table.newAppend() - .appendFile(dataFile) - .commit(); - - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder.readFrom(location.toString()); - validate(job, expectedRecords); - } - - @Test - public void testFilterExp() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L); - expectedRecords.get(0).set(2, "2020-03-20"); - 
expectedRecords.get(1).set(2, "2020-03-20"); - DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords); - DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format, - RandomGenericData.generate(table.schema(), 2, 0L)); - table.newAppend() - .appendFile(dataFile1) - .appendFile(dataFile2) - .commit(); - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder.readFrom(location.toString()) - .filter(Expressions.equal("date", "2020-03-20")); - validate(job, expectedRecords); - } - - @Test - public void testResiduals() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List writeRecords = RandomGenericData.generate(table.schema(), 2, 0L); - writeRecords.get(0).set(1, 123L); - writeRecords.get(0).set(2, "2020-03-20"); - writeRecords.get(1).set(1, 456L); - writeRecords.get(1).set(2, "2020-03-20"); - - List expectedRecords = new ArrayList<>(); - expectedRecords.add(writeRecords.get(0)); - - DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, writeRecords); - DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format, - RandomGenericData.generate(table.schema(), 2, 0L)); - table.newAppend() - .appendFile(dataFile1) - .appendFile(dataFile2) - .commit(); - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder.readFrom(location.toString()) - .filter(Expressions.and( - Expressions.equal("date", "2020-03-20"), - Expressions.equal("id", 123))); - validate(job, expectedRecords); - - // skip residual filtering - job = Job.getInstance(conf); - configBuilder = IcebergInputFormat.configure(job); - configBuilder.skipResidualFiltering().readFrom(location.toString()) - .filter(Expressions.and( - Expressions.equal("date", "2020-03-20"), - Expressions.equal("id", 123))); - validate(job, writeRecords); - } - - @Test - public void testFailedResidualFiltering() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L); - expectedRecords.get(0).set(2, "2020-03-20"); - expectedRecords.get(1).set(2, "2020-03-20"); - - DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords); - table.newAppend() - .appendFile(dataFile1) - .commit(); - - Job jobShouldFail1 = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(jobShouldFail1); - configBuilder.useHiveRows().readFrom(location.toString()) - .filter(Expressions.and( - Expressions.equal("date", "2020-03-20"), - Expressions.equal("id", 0))); - AssertHelpers.assertThrows( - "Residuals are not evaluated today for Iceberg Generics In memory model of HIVE", - UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.", - () -> validate(jobShouldFail1, expectedRecords)); - - Job jobShouldFail2 = Job.getInstance(conf); - configBuilder = IcebergInputFormat.configure(jobShouldFail2); - configBuilder.usePigTuples().readFrom(location.toString()) - .filter(Expressions.and( - 
Expressions.equal("date", "2020-03-20"), - Expressions.equal("id", 0))); - AssertHelpers.assertThrows( - "Residuals are not evaluated today for Iceberg Generics In memory model of PIG", - UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.", - () -> validate(jobShouldFail2, expectedRecords)); - } - - @Test - public void testProjection() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1)); - Table table = tables.create(SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List inputRecords = RandomGenericData.generate(table.schema(), 1, 0L); - DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, inputRecords); - table.newAppend() - .appendFile(dataFile) - .commit(); - - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder - .readFrom(location.toString()) - .project(projectedSchema); - List outputRecords = readRecords(job.getConfiguration()); - Assert.assertEquals(inputRecords.size(), outputRecords.size()); - Assert.assertEquals(projectedSchema.asStruct(), outputRecords.get(0).struct()); - } - - private static final Schema LOG_SCHEMA = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "date", Types.StringType.get()), - Types.NestedField.optional(3, "level", Types.StringType.get()), - Types.NestedField.optional(4, "message", Types.StringType.get()) - ); - - private static final PartitionSpec IDENTITY_PARTITION_SPEC = - PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build(); - - @Test - public void testIdentityPartitionProjections() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(LOG_SCHEMA, IDENTITY_PARTITION_SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - - List inputRecords = RandomGenericData.generate(LOG_SCHEMA, 10, 0); - Integer idx = 0; - AppendFiles append = table.newAppend(); - for (Record record : inputRecords) { - record.set(1, "2020-03-2" + idx); - record.set(2, idx.toString()); - append.appendFile(writeFile(table, Row.of("2020-03-2" + idx, idx.toString()), format, ImmutableList.of(record))); - idx += 1; - } - append.commit(); - - // individual fields - validateIdentityPartitionProjections(location.toString(), withColumns("date"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("level"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("message"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("id"), inputRecords); - // field pairs - validateIdentityPartitionProjections(location.toString(), withColumns("date", "message"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("level", "message"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("date", "level"), inputRecords); - // out-of-order pairs - validateIdentityPartitionProjections(location.toString(), withColumns("message", "date"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("message", "level"), inputRecords); - 
validateIdentityPartitionProjections(location.toString(), withColumns("level", "date"), inputRecords); - // full projection - validateIdentityPartitionProjections(location.toString(), LOG_SCHEMA, inputRecords); - // out-of-order triplets - validateIdentityPartitionProjections(location.toString(), withColumns("date", "level", "message"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("level", "date", "message"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("date", "message", "level"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("level", "message", "date"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("message", "date", "level"), inputRecords); - validateIdentityPartitionProjections(location.toString(), withColumns("message", "level", "date"), inputRecords); - } - - private static Schema withColumns(String... names) { - Map indexByName = TypeUtil.indexByName(LOG_SCHEMA.asStruct()); - Set projectedIds = Sets.newHashSet(); - for (String name : names) { - projectedIds.add(indexByName.get(name)); - } - return TypeUtil.select(LOG_SCHEMA, projectedIds); - } - - private void validateIdentityPartitionProjections( - String tablePath, Schema projectedSchema, List inputRecords) throws Exception { - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder - .readFrom(tablePath) - .project(projectedSchema); - List actualRecords = readRecords(job.getConfiguration()); - - Set fieldNames = TypeUtil.indexByName(projectedSchema.asStruct()).keySet(); - for (int pos = 0; pos < inputRecords.size(); pos++) { - Record inputRecord = inputRecords.get(pos); - Record actualRecord = actualRecords.get(pos); - Assert.assertEquals("Projected schema should match", projectedSchema.asStruct(), actualRecord.struct()); - for (String name : fieldNames) { - Assert.assertEquals( - "Projected field " + name + " should match", inputRecord.getField(name), actualRecord.getField(name)); - } - } - } - - @Test - public void testSnapshotReads() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L); - table.newAppend() - .appendFile(writeFile(table, null, format, expectedRecords)) - .commit(); - long snapshotId = table.currentSnapshot().snapshotId(); - table.newAppend() - .appendFile(writeFile(table, null, format, RandomGenericData.generate(table.schema(), 1, 0L))) - .commit(); - - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder - .readFrom(location.toString()) - .snapshotId(snapshotId); - - validate(job, expectedRecords); - } - - @Test - public void testLocality() throws Exception { - File location = temp.newFolder(format.name()); - Assert.assertTrue(location.delete()); - Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()), - location.toString()); - List expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L); - table.newAppend() - .appendFile(writeFile(table, null, format, expectedRecords)) - .commit(); - Job job = 
Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder.readFrom(location.toString()); - - for (InputSplit split : splits(job.getConfiguration())) { - Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE, split.getLocations()); - } - - configBuilder.preferLocality(); - for (InputSplit split : splits(job.getConfiguration())) { - Assert.assertArrayEquals(new String[]{"localhost"}, split.getLocations()); - } - } - - public static class HadoopCatalogFunc implements Function { - @Override - public Catalog apply(Configuration conf) { - return new HadoopCatalog(conf, conf.get("warehouse.location")); - } - } - - @Test - public void testCustomCatalog() throws Exception { - conf = new Configuration(); - conf.set("warehouse.location", temp.newFolder("hadoop_catalog").getAbsolutePath()); - - Catalog catalog = new HadoopCatalogFunc().apply(conf); - TableIdentifier tableIdentifier = TableIdentifier.of("db", "t"); - Table table = catalog.createTable(tableIdentifier, SCHEMA, SPEC, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); - List expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L); - expectedRecords.get(0).set(2, "2020-03-20"); - DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords); - table.newAppend() - .appendFile(dataFile) - .commit(); - - Job job = Job.getInstance(conf); - IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job); - configBuilder - .catalogFunc(HadoopCatalogFunc.class) - .readFrom(tableIdentifier.toString()); - validate(job, expectedRecords); - } - - private static void validate(Job job, List expectedRecords) { - List actualRecords = readRecords(job.getConfiguration()); - Assert.assertEquals(expectedRecords, actualRecords); - } - - private static List splits(Configuration conf) { - TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); - IcebergInputFormat icebergInputFormat = new IcebergInputFormat<>(); - return icebergInputFormat.getSplits(context); - } - - private static List readRecords(Configuration conf) { - TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); - IcebergInputFormat icebergInputFormat = new IcebergInputFormat<>(); - List splits = icebergInputFormat.getSplits(context); - return - FluentIterable - .from(splits) - .transformAndConcat(split -> readRecords(icebergInputFormat, split, context)) - .toList(); - } - - private static Iterable readRecords( - IcebergInputFormat inputFormat, InputSplit split, TaskAttemptContext context) { - RecordReader recordReader = inputFormat.createRecordReader(split, context); - List records = new ArrayList<>(); - try { - recordReader.initialize(split, context); - while (recordReader.nextKeyValue()) { - records.add(recordReader.getCurrentValue()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return records; - } - - private DataFile writeFile( - Table table, StructLike partitionData, FileFormat fileFormat, List records) throws IOException { - File file = temp.newFile(); - Assert.assertTrue(file.delete()); - FileAppender appender; - switch (fileFormat) { - case AVRO: - appender = Avro.write(Files.localOutput(file)) - .schema(table.schema()) - .createWriterFunc(DataWriter::create) - .named(fileFormat.name()) - .build(); - break; - case PARQUET: - appender = Parquet.write(Files.localOutput(file)) - .schema(table.schema()) - .createWriterFunc(GenericParquetWriter::buildWriter) - 
.named(fileFormat.name()) - .build(); - break; - case ORC: - appender = ORC.write(Files.localOutput(file)) - .schema(table.schema()) - .createWriterFunc(GenericOrcWriter::buildWriter) - .build(); - break; - default: - throw new UnsupportedOperationException("Cannot write format: " + fileFormat); - } - - try { - appender.addAll(records); - } finally { - appender.close(); - } - - DataFiles.Builder builder = DataFiles.builder(table.spec()) - .withPath(file.toString()) - .withFormat(format) - .withFileSizeInBytes(file.length()) - .withMetrics(appender.metrics()); - if (partitionData != null) { - builder.withPartition(partitionData); - } - return builder.build(); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormatHelper.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormatHelper.java new file mode 100644 index 000000000000..b8e19eac4662 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormatHelper.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.rules.TemporaryFolder; + +public class TestIcebergInputFormatHelper { + + private final HadoopTables tables; + private final Schema schema; + private final PartitionSpec spec; + private final FileFormat fileFormat; + private final TemporaryFolder tmp; + private final File location; + + private Table table; + + public TestIcebergInputFormatHelper(HadoopTables tables, Schema schema, PartitionSpec spec, FileFormat fileFormat, + TemporaryFolder tmp, File location) { + this.tables = tables; + this.schema = schema; + this.spec = spec; + this.fileFormat = fileFormat; + this.tmp = tmp; + this.location = location; + } + + public Table getTable() { + return table; + } + + public Table createTable(Schema theSchema, PartitionSpec theSpec) { + Map properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()); + table = tables.create(theSchema, theSpec, properties, location.toString()); + return table; + } + + public Table createTable(Catalog catalog, TableIdentifier identifier) { + Map properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()); + table = catalog.createTable(identifier, schema, spec, properties); + return table; + } + + public Table createPartitionedTable() { + return createTable(schema, spec); + } + + public Table createUnpartitionedTable() { + return createTable(schema, PartitionSpec.unpartitioned()); + } + + public List generateRandomRecords(int num, long seed) { + Preconditions.checkNotNull(table, "table not set"); + return RandomGenericData.generate(table.schema(), num, seed); + } + + public void appendToTable(DataFile... 
dataFiles) { + Preconditions.checkNotNull(table, "table not set"); + + AppendFiles append = table.newAppend(); + + for (DataFile dataFile : dataFiles) { + append = append.appendFile(dataFile); + } + + append.commit(); + } + + public void appendToTable(StructLike partition, List records) throws IOException { + appendToTable(writeFile(partition, records)); + } + + public DataFile writeFile(StructLike partition, List records) throws IOException { + Preconditions.checkNotNull(table, "table not set"); + + File file = tmp.newFile(); + Assert.assertTrue(file.delete()); + + FileAppender appender; + + switch (fileFormat) { + case AVRO: + appender = Avro.write(Files.localOutput(file)) + .schema(table.schema()) + .createWriterFunc(DataWriter::create) + .named(fileFormat.name()) + .build(); + break; + + case PARQUET: + appender = Parquet.write(Files.localOutput(file)) + .schema(table.schema()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .named(fileFormat.name()) + .build(); + break; + + case ORC: + appender = ORC.write(Files.localOutput(file)) + .schema(table.schema()) + .createWriterFunc(GenericOrcWriter::buildWriter) + .build(); + break; + + default: + throw new UnsupportedOperationException("Cannot write format: " + fileFormat); + } + + try { + appender.addAll(records); + } finally { + appender.close(); + } + + DataFiles.Builder builder = DataFiles.builder(table.spec()) + .withPath(file.toString()) + .withFormat(fileFormat) + .withFileSizeInBytes(file.length()) + .withMetrics(appender.metrics()); + + if (partition != null) { + builder.withPartition(partition); + } + + return builder.build(); + } + +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormats.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormats.java new file mode 100644 index 000000000000..0a31b7e552b2 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormats.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.mr.IcebergMRConfig; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public class TestIcebergInputFormats { + + static final List> TESTED_INPUT_FORMATS = ImmutableList.of( + TestInputFormat.newFactory("MapRedIcebergInputFormat", TestMapRedIcebergInputFormat::create), + TestInputFormat.newFactory("IcebergInputFormat", TestIcebergInputFormat::create)); + + static final List TESTED_FILE_FORMATS = ImmutableList.of("avro", "orc", "parquet"); + + static final Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "date", Types.StringType.get())); + + static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("date") + .bucket("id", 1) + .build(); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + // before variables + private HadoopTables tables; + private Configuration conf; + private File location; + private TestIcebergInputFormatHelper helper; + private IcebergMRConfig.Builder builder; + + // parametrized variables + private final TestInputFormat.Factory testInputFormat; + private final FileFormat fileFormat; + + @Before + public void before() throws IOException { + conf = new Configuration(); + tables = new HadoopTables(conf); + + location = temp.newFolder(testInputFormat.name(), fileFormat.name()); + Assert.assertTrue(location.delete()); + + helper = new TestIcebergInputFormatHelper(tables, SCHEMA, SPEC, fileFormat, temp, location); + builder = IcebergMRConfig.Builder.newInstance(conf).readFrom(location); + } + + 
@Parameterized.Parameters + public static Object[][] parameters() { + Object[][] parameters = new Object[TESTED_INPUT_FORMATS.size() * TESTED_FILE_FORMATS.size()][2]; + + int idx = 0; + + for (TestInputFormat.Factory inputFormat : TESTED_INPUT_FORMATS) { + for (String fileFormat : TESTED_FILE_FORMATS) { + parameters[idx++] = new Object[] {inputFormat, fileFormat}; + } + } + + return parameters; + } + + public TestIcebergInputFormats(TestInputFormat.Factory testInputFormat, String fileFormat) { + this.testInputFormat = testInputFormat; + this.fileFormat = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Test + public void testUnpartitionedTable() throws Exception { + helper.createUnpartitionedTable(); + List expectedRecords = helper.generateRandomRecords(1, 0L); + helper.appendToTable(null, expectedRecords); + + testInputFormat.create(builder.build()).validate(expectedRecords); + } + + @Test + public void testPartitionedTable() throws Exception { + helper.createPartitionedTable(); + List expectedRecords = helper.generateRandomRecords(1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords); + + testInputFormat.create(builder.build()).validate(expectedRecords); + } + + @Test + public void testFilterExp() throws Exception { + helper.createPartitionedTable(); + + List expectedRecords = helper.generateRandomRecords(2, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + expectedRecords.get(1).set(2, "2020-03-20"); + + DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0), expectedRecords); + DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0), helper.generateRandomRecords(2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + builder.filter(Expressions.equal("date", "2020-03-20")); + testInputFormat.create(builder.build()).validate(expectedRecords); + } + + @Test + public void testResiduals() throws Exception { + helper.createPartitionedTable(); + + List writeRecords = helper.generateRandomRecords(2, 0L); + writeRecords.get(0).set(1, 123L); + writeRecords.get(0).set(2, "2020-03-20"); + writeRecords.get(1).set(1, 456L); + writeRecords.get(1).set(2, "2020-03-20"); + + List expectedRecords = new ArrayList<>(); + expectedRecords.add(writeRecords.get(0)); + + DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0), writeRecords); + DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0), helper.generateRandomRecords(2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + builder.filter(Expressions.and( + Expressions.equal("date", "2020-03-20"), + Expressions.equal("id", 123))); + testInputFormat.create(builder.build()).validate(expectedRecords); + + // skip residual filtering + builder.skipResidualFiltering(); + testInputFormat.create(builder.build()).validate(writeRecords); + } + + @Test + public void testFailedResidualFiltering() throws Exception { + helper.createPartitionedTable(); + + List expectedRecords = helper.generateRandomRecords(2, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + expectedRecords.get(1).set(2, "2020-03-20"); + + helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords); + + builder.useHiveRows() + .filter(Expressions.and( + Expressions.equal("date", "2020-03-20"), + Expressions.equal("id", 0))); + + AssertHelpers.assertThrows( + "Residuals are not evaluated today for Iceberg Generics In memory model of HIVE", + UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.", + () -> 
testInputFormat.create(builder.build())); + + builder.usePigTuples(); + + AssertHelpers.assertThrows( + "Residuals are not evaluated today for Iceberg Generics In memory model of PIG", + UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.", + () -> testInputFormat.create(builder.build())); + } + + @Test + public void testProjection() throws Exception { + helper.createPartitionedTable(); + List inputRecords = helper.generateRandomRecords(1, 0L); + helper.appendToTable(Row.of("2020-03-20", 0), inputRecords); + + Schema projection = TypeUtil.select(SCHEMA, ImmutableSet.of(1)); + builder.project(projection); + + List outputRecords = testInputFormat.create(builder.build()).getRecords(); + + Assert.assertEquals(inputRecords.size(), outputRecords.size()); + Assert.assertEquals(projection.asStruct(), outputRecords.get(0).struct()); + } + + private static final Schema LOG_SCHEMA = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "date", Types.StringType.get()), + Types.NestedField.optional(3, "level", Types.StringType.get()), + Types.NestedField.optional(4, "message", Types.StringType.get()) + ); + + private static final PartitionSpec IDENTITY_PARTITION_SPEC = + PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build(); + + @Test + public void testIdentityPartitionProjections() throws Exception { + helper.createTable(LOG_SCHEMA, IDENTITY_PARTITION_SPEC); + List inputRecords = helper.generateRandomRecords(10, 0L); + + Integer idx = 0; + AppendFiles append = helper.getTable().newAppend(); + for (Record record : inputRecords) { + record.set(1, "2020-03-2" + idx); + record.set(2, idx.toString()); + append.appendFile(helper.writeFile(Row.of("2020-03-2" + idx, idx.toString()), ImmutableList.of(record))); + idx += 1; + } + append.commit(); + + // individual fields + validateIdentityPartitionProjections(withColumns("date"), inputRecords); + validateIdentityPartitionProjections(withColumns("level"), inputRecords); + validateIdentityPartitionProjections(withColumns("message"), inputRecords); + validateIdentityPartitionProjections(withColumns("id"), inputRecords); + // field pairs + validateIdentityPartitionProjections(withColumns("date", "message"), inputRecords); + validateIdentityPartitionProjections(withColumns("level", "message"), inputRecords); + validateIdentityPartitionProjections(withColumns("date", "level"), inputRecords); + // out-of-order pairs + validateIdentityPartitionProjections(withColumns("message", "date"), inputRecords); + validateIdentityPartitionProjections(withColumns("message", "level"), inputRecords); + validateIdentityPartitionProjections(withColumns("level", "date"), inputRecords); + // full projection + validateIdentityPartitionProjections(LOG_SCHEMA, inputRecords); + // out-of-order triplets + validateIdentityPartitionProjections(withColumns("date", "level", "message"), inputRecords); + validateIdentityPartitionProjections(withColumns("level", "date", "message"), inputRecords); + validateIdentityPartitionProjections(withColumns("date", "message", "level"), inputRecords); + validateIdentityPartitionProjections(withColumns("level", "message", "date"), inputRecords); + validateIdentityPartitionProjections(withColumns("message", "date", "level"), inputRecords); + validateIdentityPartitionProjections(withColumns("message", "level", "date"), inputRecords); + } + + private static Schema withColumns(String... 
names) { + Map indexByName = TypeUtil.indexByName(LOG_SCHEMA.asStruct()); + Set projectedIds = Sets.newHashSet(); + for (String name : names) { + projectedIds.add(indexByName.get(name)); + } + return TypeUtil.select(LOG_SCHEMA, projectedIds); + } + + private void validateIdentityPartitionProjections( + Schema projectedSchema, List inputRecords) throws Exception { + builder.project(projectedSchema); + List actualRecords = testInputFormat.create(builder.build()).getRecords(); + + Set fieldNames = TypeUtil.indexByName(projectedSchema.asStruct()).keySet(); + for (int pos = 0; pos < inputRecords.size(); pos++) { + Record inputRecord = inputRecords.get(pos); + Record actualRecord = actualRecords.get(pos); + Assert.assertEquals("Projected schema should match", projectedSchema.asStruct(), actualRecord.struct()); + for (String name : fieldNames) { + Assert.assertEquals( + "Projected field " + name + " should match", inputRecord.getField(name), actualRecord.getField(name)); + } + } + } + + @Test + public void testSnapshotReads() throws Exception { + helper.createUnpartitionedTable(); + + List expectedRecords = helper.generateRandomRecords(1, 0L); + helper.appendToTable(null, expectedRecords); + long snapshotId = helper.getTable().currentSnapshot().snapshotId(); + + helper.appendToTable(null, helper.generateRandomRecords(1, 0L)); + + builder.snapshotId(snapshotId); + testInputFormat.create(builder.build()).validate(expectedRecords); + } + + @Test + public void testLocality() throws Exception { + helper.createUnpartitionedTable(); + List expectedRecords = helper.generateRandomRecords(1, 0L); + helper.appendToTable(null, expectedRecords); + + for (InputSplit split : testInputFormat.create(builder.build()).getSplits()) { + Assert.assertArrayEquals(IcebergSplit.ANYWHERE, split.getLocations()); + } + + builder.preferLocality(); + + for (InputSplit split : testInputFormat.create(builder.build()).getSplits()) { + Assert.assertArrayEquals(new String[]{"localhost"}, split.getLocations()); + } + } + + public static class HadoopCatalogLoader implements Function { + @Override + public Catalog apply(Configuration conf) { + return new HadoopCatalog(conf, conf.get("warehouse.location")); + } + } + + @Test + public void testCustomCatalog() throws IOException { + conf.set("warehouse.location", temp.newFolder("hadoop_catalog").getAbsolutePath()); + + Catalog catalog = new HadoopCatalogLoader().apply(conf); + TableIdentifier identifier = TableIdentifier.of("db", "t"); + helper.createTable(catalog, identifier); + + List expectedRecords = helper.generateRandomRecords(1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords); + + builder.catalogLoader(HadoopCatalogLoader.class) + .readFrom(identifier); + + testInputFormat.create(builder.build()).validate(expectedRecords); + } + + private abstract static class TestInputFormat { + + private final List splits; + private final List records; + + private TestInputFormat(List splits, List records) { + this.splits = splits; + this.records = records; + } + + public List getRecords() { + return records; + } + + public List getSplits() { + return splits; + } + + public void validate(List expected) { + Assert.assertEquals(expected, records); + } + + public interface Factory { + String name(); + TestInputFormat create(Configuration conf); + } + + public static Factory newFactory(String name, Function> function) { + return new Factory() { + @Override + public String name() { + return name; + } + + @Override + public 
TestInputFormat create(Configuration conf) { + return function.apply(conf); + } + }; + } + } + + private static class TestMapRedIcebergInputFormat extends TestInputFormat { + + private TestMapRedIcebergInputFormat(List splits, List records) { + super(splits, records); + } + + private static TestMapRedIcebergInputFormat create(Configuration conf) { + JobConf job = new JobConf(conf); + MapredIcebergInputFormat inputFormat = new MapredIcebergInputFormat<>(); + + try { + org.apache.hadoop.mapred.InputSplit[] splits = inputFormat.getSplits(job, 1); + + List iceSplits = new ArrayList<>(splits.length); + List records = new ArrayList<>(); + + for (org.apache.hadoop.mapred.InputSplit split : splits) { + iceSplits.add((IcebergSplit) split); + org.apache.hadoop.mapred.RecordReader> + reader = inputFormat.getRecordReader(split, job, Reporter.NULL); + + try { + Container container = reader.createValue(); + + while (reader.next(null, container)) { + records.add(container.get()); + } + } finally { + reader.close(); + } + } + + return new TestMapRedIcebergInputFormat<>(iceSplits, records); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe); + } + } + } + + private static class TestIcebergInputFormat extends TestInputFormat { + + private TestIcebergInputFormat(List splits, List records) { + super(splits, records); + } + + private static TestIcebergInputFormat create(Configuration conf) { + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + IcebergInputFormat inputFormat = new IcebergInputFormat<>(); + List splits = inputFormat.getSplits(context); + + List iceSplits = new ArrayList<>(splits.size()); + List records = new ArrayList<>(); + + for (InputSplit split : splits) { + iceSplits.add((IcebergSplit) split); + + try (RecordReader reader = inputFormat.createRecordReader(split, context)) { + reader.initialize(split, context); + + while (reader.nextKeyValue()) { + records.add(reader.getCurrentValue()); + } + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe); + } + } + + return new TestIcebergInputFormat<>(iceSplits, records); + } + } + +}
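
A minimal usage sketch (not part of this patch) of how a caller might wire the new IcebergMRConfig.Builder into a read, mirroring the split-and-reader loop used by TestIcebergInputFormats.TestIcebergInputFormat above. The class name ExampleIcebergRead, the table path, the filter column value, and the restored generic type parameters (angle brackets were lost when the diff was flattened) are assumptions for illustration only.

// Sketch: configure a read with IcebergMRConfig.Builder, then drain all splits
// through IcebergInputFormat the same way the test harness in this patch does.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.IcebergMRConfig;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;

public class ExampleIcebergRead {
  public static void main(String[] args) throws Exception {
    // Build the job configuration with the fluent builder introduced by this patch.
    // The table location and the partition filter below are placeholders.
    Configuration conf = IcebergMRConfig.Builder.newInstance(new Configuration())
        .readFrom("/tmp/warehouse/db/table")                 // hypothetical table location
        .filter(Expressions.equal("date", "2020-03-20"))     // hypothetical partition filter
        .build();

    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
    IcebergInputFormat<Record> inputFormat = new IcebergInputFormat<>();

    // Plan splits, then read every record from each split, as the tests do.
    List<InputSplit> splits = inputFormat.getSplits(context);
    for (InputSplit split : splits) {
      try (RecordReader<Void, Record> reader = inputFormat.createRecordReader(split, context)) {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
          System.out.println(reader.getCurrentValue());
        }
      }
    }
  }
}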