diff --git a/build.gradle b/build.gradle
index a1f176c2b8fd..34860474ea9e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -135,6 +135,12 @@ project(':iceberg-data') {
     compile project(':iceberg-api')
     compile project(':iceberg-core')
     compileOnly project(':iceberg-parquet')
+    compileOnly project(':iceberg-orc')
+    compileOnly("org.apache.hadoop:hadoop-common") {
+      exclude group: 'commons-beanutils'
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
 
     testCompile("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
@@ -200,10 +206,16 @@ project(':iceberg-orc') {
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
     }
 
+    compileOnly("org.apache.hadoop:hadoop-common") {
+      exclude group: 'commons-beanutils'
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
+
+    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
new file mode 100644
index 000000000000..e5f06191ba8d
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * ORC reader for Generic Record.
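+ * Values are read from an ORC VectorizedRowBatch one row at a time and converted
+ * into Iceberg {@link GenericRecord} instances by per-column converters.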
+ */ +public class GenericOrcReader implements OrcValueReader { + + private final Schema schema; + private final List columns; + private final Converter[] converters; + + private GenericOrcReader(Schema expectedSchema, TypeDescription readSchema) { + this.schema = expectedSchema; + this.columns = readSchema.getChildren(); + this.converters = buildConverters(); + } + + private Converter[] buildConverters() { + Preconditions.checkState(schema.columns().size() == columns.size(), + "Expected schema must have same number of columns as projection."); + Converter[] newConverters = new Converter[columns.size()]; + List icebergCols = schema.columns(); + for (int c = 0; c < newConverters.length; ++c) { + newConverters[c] = buildConverter(icebergCols.get(c), columns.get(c)); + } + return newConverters; + } + + public static OrcValueReader buildReader(Schema expectedSchema, + TypeDescription fileSchema) { + return new GenericOrcReader(expectedSchema, fileSchema); + } + + @Override + public Record read(VectorizedRowBatch batch, int row) { + Record rowRecord = GenericRecord.create(schema); + for (int c = 0; c < batch.cols.length; ++c) { + rowRecord.set(c, converters[c].convert(batch.cols[c], row)); + } + return rowRecord; + } + + interface Converter { + T convert(ColumnVector vector, int row); + } + + private static class BooleanConverter implements Converter { + @Override + public Boolean convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return ((LongColumnVector) vector).vector[rowIndex] != 0; + } + } + } + + private static class ByteConverter implements Converter { + @Override + public Byte convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return (byte) ((LongColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class ShortConverter implements Converter { + @Override + public Short convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return (short) ((LongColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class IntConverter implements Converter { + @Override + public Integer convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return (int) ((LongColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class LongConverter implements Converter { + @Override + public Long convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return ((LongColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class FloatConverter implements Converter { + @Override + public Float convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return (float) ((DoubleColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class DoubleConverter implements Converter { + @Override + public Double convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 
0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return ((DoubleColumnVector) vector).vector[rowIndex]; + } + } + } + + private static class TimestampConverter implements Converter { + private Long convert(TimestampColumnVector vector, int row) { + // compute microseconds past 1970. + return (vector.time[row] / 1000) * 1_000_000 + vector.nanos[row] / 1000; + } + + @Override + public Long convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return convert((TimestampColumnVector) vector, rowIndex); + } + } + } + + private static class BinaryConverter implements Converter { + @Override + public byte[] convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + BytesColumnVector bytesVector = (BytesColumnVector) vector; + return Arrays.copyOfRange(bytesVector.vector[rowIndex], + bytesVector.start[rowIndex], + bytesVector.start[rowIndex] + bytesVector.length[rowIndex]); + } + } + } + + private static class StringConverter implements Converter { + @Override + public String convert(ColumnVector vector, int row) { + BinaryConverter converter = new BinaryConverter(); + byte[] byteData = converter.convert(vector, row); + if (byteData == null) { + return null; + } + return new String(byteData, StandardCharsets.UTF_8); + } + } + + private static class DecimalConverter implements Converter { + @Override + public BigDecimal convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return ((DecimalColumnVector) vector).vector[rowIndex] + .getHiveDecimal().bigDecimalValue(); + } + } + } + + private static class ListConverter implements Converter> { + private final Converter childConverter; + + ListConverter(Types.NestedField icebergField, TypeDescription schema) { + Preconditions.checkArgument(icebergField.type().isListType()); + TypeDescription child = schema.getChildren().get(0); + + childConverter = buildConverter(icebergField + .type() + .asListType() + .fields() + .get(0), child); + } + + List readList(ListColumnVector vector, int row) { + int offset = (int) vector.offsets[row]; + int length = (int) vector.lengths[row]; + + List list = Lists.newArrayListWithExpectedSize(length); + for (int c = 0; c < length; ++c) { + list.add(childConverter.convert(vector.child, offset + c)); + } + return list; + } + + @Override + public List convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 
0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return readList((ListColumnVector) vector, rowIndex); + } + } + } + + private static class MapConverter implements Converter { + private final Converter keyConvert; + private final Converter valueConvert; + + MapConverter(Types.NestedField icebergField, TypeDescription schema) { + Preconditions.checkArgument(icebergField.type().isMapType()); + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + List mapFields = icebergField.type().asMapType().fields(); + + keyConvert = buildConverter(mapFields.get(0), keyType); + valueConvert = buildConverter(mapFields.get(1), valueType); + } + + Map readMap(MapColumnVector vector, int row) { + final int offset = (int) vector.offsets[row]; + final int length = (int) vector.lengths[row]; + + // serialize the keys + Map map = Maps.newHashMapWithExpectedSize(length); + for (int c = 0; c < length; ++c) { + String key = String.valueOf(keyConvert.convert(vector.keys, offset + c)); + Object value = valueConvert.convert(vector.values, offset + c); + map.put(key, value); + } + + return map; + } + + @Override + public Map convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return readMap((MapColumnVector) vector, rowIndex); + } + } + } + + private static class StructConverter implements Converter { + private final Converter[] children; + private final Schema icebergStructSchema; + + StructConverter(final Types.NestedField icebergField, final TypeDescription schema) { + Preconditions.checkArgument(icebergField.type().isStructType()); + icebergStructSchema = new Schema(icebergField.type().asStructType().fields()); + List icebergChildren = icebergField.type().asStructType().fields(); + children = new Converter[schema.getChildren().size()]; + + Preconditions.checkState(icebergChildren.size() == children.length, + "Expected schema must have same number of columns as projection."); + for (int c = 0; c < children.length; ++c) { + children[c] = buildConverter(icebergChildren.get(c), schema.getChildren().get(c)); + } + } + + Record writeStruct(StructColumnVector vector, int row) { + Record data = GenericRecord.create(icebergStructSchema); + for (int c = 0; c < children.length; ++c) { + data.set(c, children[c].convert(vector.fields[c], row)); + } + return data; + } + + @Override + public Record convert(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 
0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return writeStruct((StructColumnVector) vector, rowIndex); + } + } + } + + private static Converter buildConverter(final Types.NestedField icebergField, + final TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + return new ByteConverter(); + case SHORT: + return new ShortConverter(); + case DATE: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(); + case BINARY: + return new BinaryConverter(); + case STRING: + case CHAR: + case VARCHAR: + return new StringConverter(); + case STRUCT: + return new StructConverter(icebergField, schema); + case LIST: + return new ListConverter(icebergField, schema); + case MAP: + return new MapConverter(icebergField, schema); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java new file mode 100644 index 000000000000..a6648a9ee142 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.data.orc; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + */ +public class GenericOrcWriter implements OrcValueWriter { + private final Converter[] converters; + + private GenericOrcWriter(TypeDescription schema) { + this.converters = buildConverters(schema); + } + + public static OrcValueWriter buildWriter(TypeDescription fileSchema) { + return new GenericOrcWriter(fileSchema); + } + + @SuppressWarnings("unchecked") + @Override + public void write(Record value, VectorizedRowBatch output) throws IOException { + int row = output.size++; + for (int c = 0; c < converters.length; ++c) { + converters[c].addValue(row, value.get(c, converters[c].getJavaClass()), output.cols[c]); + } + } + + /** + * The interface for the conversion from Spark's SpecializedGetters to + * ORC's ColumnVectors. + */ + interface Converter { + + Class getJavaClass(); + + /** + * Take a value from the Spark data value and add it to the ORC output. + * @param rowId the row in the ColumnVector + * @param data either an InternalRow or ArrayData + * @param output the ColumnVector to put the value into + */ + void addValue(int rowId, T data, ColumnVector output); + } + + static class BooleanConverter implements Converter { + @Override + public Class getJavaClass() { + return Boolean.class; + } + + @Override + public void addValue(int rowId, Boolean data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data ? 
1 : 0; + } + } + } + + static class ByteConverter implements Converter { + @Override + public Class getJavaClass() { + return Byte.class; + } + + public void addValue(int rowId, Byte data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class ShortConverter implements Converter { + @Override + public Class getJavaClass() { + return Short.class; + } + + public void addValue(int rowId, Short data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class IntConverter implements Converter { + @Override + public Class getJavaClass() { + return Integer.class; + } + + public void addValue(int rowId, Integer data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class LongConverter implements Converter { + @Override + public Class getJavaClass() { + return Long.class; + } + + public void addValue(int rowId, Long data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class FloatConverter implements Converter { + @Override + public Class getJavaClass() { + return Float.class; + } + + public void addValue(int rowId, Float data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + } + + static class DoubleConverter implements Converter { + @Override + public Class getJavaClass() { + return Double.class; + } + + public void addValue(int rowId, Double data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + } + + static class StringConverter implements Converter { + @Override + public Class getJavaClass() { + return String.class; + } + + public void addValue(int rowId, String data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + byte[] value = data.getBytes(StandardCharsets.UTF_8); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + } + + static class BytesConverter implements Converter { + @Override + public Class getJavaClass() { + return byte[].class; + } + + public void addValue(int rowId, byte[] data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + // getBinary always makes a copy, so we don't need to worry about it + // being changed behind our back. 
+ ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); + } + } + } + + static class TimestampConverter implements Converter { + @Override + public Class getJavaClass() { + return Long.class; + } + + public void addValue(int rowId, Long data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + long micros = data; + cv.time[rowId] = micros / 1_000; // millis + cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos + } + } + } + + static class Decimal18Converter implements Converter { + private final int precision; + private final int scale; + + Decimal18Converter(TypeDescription schema) { + this.precision = schema.getPrecision(); + this.scale = schema.getScale(); + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId] + .setFromLongAndScale(data.longValueExact(), scale); + } + } + } + + static class Decimal38Converter implements Converter { + private final int precision; + private final int scale; + + Decimal38Converter(TypeDescription schema) { + this.precision = schema.getPrecision(); + this.scale = schema.getScale(); + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data)); + } + } + } + + static class StructConverter implements Converter { + private final Converter[] children; + + StructConverter(TypeDescription schema) { + this.children = new Converter[schema.getChildren().size()]; + for (int c = 0; c < children.length; ++c) { + children[c] = buildConverter(schema.getChildren().get(c)); + } + } + + @Override + public Class getJavaClass() { + return Record.class; + } + + @SuppressWarnings("unchecked") + public void addValue(int rowId, Record data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < children.length; ++c) { + children[c].addValue(rowId, data.get(c, children[c].getJavaClass()), cv.fields[c]); + } + } + } + } + + static class ListConverter implements Converter { + private final Converter children; + + ListConverter(TypeDescription schema) { + this.children = buildConverter(schema.getChildren().get(0)); + } + + @Override + public Class getJavaClass() { + return List.class; + } + + @SuppressWarnings("unchecked") + public void addValue(int rowId, List data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + List value = (List) data; + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough 
+ cv.child.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); + } + } + } + } + + static class MapConverter implements Converter { + private final Converter keyConverter; + private final Converter valueConverter; + + MapConverter(TypeDescription schema) { + this.keyConverter = buildConverter(schema.getChildren().get(0)); + this.valueConverter = buildConverter(schema.getChildren().get(1)); + } + + @Override + public Class getJavaClass() { + return Map.class; + } + + @SuppressWarnings("unchecked") + public void addValue(int rowId, Map data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + Map map = (Map) data; + List keys = Lists.newArrayListWithExpectedSize(map.size()); + List values = Lists.newArrayListWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + keys.add(entry.getKey()); + values.add(entry.getValue()); + } + MapColumnVector cv = (MapColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = map.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyConverter.addValue(pos, keys.get(e), cv.keys); + valueConverter.addValue(pos, values.get(e), cv.values); + } + } + } + } + + private static Converter buildConverter(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + return new ByteConverter(); + case SHORT: + return new ShortConverter(); + case DATE: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case BINARY: + return new BytesConverter(); + case STRING: + case CHAR: + case VARCHAR: + return new StringConverter(); + case DECIMAL: + return schema.getPrecision() <= 18 ? 
+ new Decimal18Converter(schema) : + new Decimal38Converter(schema); + case TIMESTAMP: + return new TimestampConverter(); + case STRUCT: + return new StructConverter(schema); + case LIST: + return new ListConverter(schema); + case MAP: + return new MapConverter(schema); + } + throw new IllegalArgumentException("Unhandled type " + schema); + } + + private static Converter[] buildConverters(TypeDescription schema) { + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Top level must be a struct " + schema); + } + + List children = schema.getChildren(); + Converter[] result = new Converter[children.size()]; + for (int c = 0; c < children.size(); ++c) { + result[c] = buildConverter(children.get(c)); + } + return result; + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java index 608e8fa22483..ada32703837a 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java +++ b/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java @@ -135,6 +135,37 @@ public void testReorderedProjection() throws Exception { Assert.assertNull("Should contain the correct 2 value", projected.get(2)); } + @Test + public void testRenamedAddedField() throws Exception { + Schema schema = new Schema( + Types.NestedField.required(1, "a", Types.LongType.get()), + Types.NestedField.required(2, "b", Types.LongType.get()), + Types.NestedField.required(3, "d", Types.LongType.get()) + ); + + Record record = GenericRecord.create(schema.asStruct()); + record.setField("a", 100L); + record.setField("b", 200L); + record.setField("d", 300L); + + Schema renamedAdded = new Schema( + Types.NestedField.optional(1, "a", Types.LongType.get()), + Types.NestedField.optional(2, "b", Types.LongType.get()), + Types.NestedField.optional(3, "c", Types.LongType.get()), + Types.NestedField.optional(4, "d", Types.LongType.get()) + ); + + Record projected = writeAndRead("rename_and_add_column_projection", schema, renamedAdded, record); + Assert.assertEquals("Should contain the correct value in column 1", projected.get(0), 100L); + Assert.assertEquals("Should contain the correct value in column a", projected.getField("a"), 100L); + Assert.assertEquals("Should contain the correct value in column 2", projected.get(1), 200L); + Assert.assertEquals("Should contain the correct value in column b", projected.getField("b"), 200L); + Assert.assertEquals("Should contain the correct value in column 3", projected.get(2), 300L); + Assert.assertEquals("Should contain the correct value in column c", projected.getField("c"), 300L); + Assert.assertNull("Should contain empty value on new column 4", projected.get(3)); + Assert.assertNull("Should contain the correct value in column d", projected.getField("d")); + } + @Test public void testEmptyProjection() throws Exception { Schema schema = new Schema( diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java new file mode 100644 index 000000000000..9f92fa4320bb --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data.orc; + +import com.google.common.collect.Iterables; +import java.io.File; +import java.io.IOException; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.TestReadProjection; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; + +public class TestGenericReadProjection extends TestReadProjection { + + @Override + protected Record writeAndRead(String desc, + Schema writeSchema, Schema readSchema, + Record record) throws IOException { + File file = temp.newFile(desc + ".orc"); + file.delete(); + + try (FileAppender appender = ORC.write(Files.localOutput(file)) + .schema(writeSchema) + .createWriterFunc(GenericOrcWriter::buildWriter) + .build()) { + appender.add(record); + } + + Iterable records = ORC.read(Files.localInput(file)) + .schema(readSchema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema)) + .build(); + + return Iterables.getOnlyElement(records); + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java deleted file mode 100644 index 6b83afc096a5..000000000000 --- a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.orc; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.IdentityHashMap; -import java.util.Map; -import java.util.Set; -import org.apache.orc.TypeDescription; - -/** - * The mapping from ORC's TypeDescription to the Iceberg column ids. - *

- * Keep the API limited to Map rather than a concrete type so that we can - * change it later. - */ -public class ColumnIdMap implements Map { - - private final IdentityHashMap idMap = - new IdentityHashMap<>(); - - @Override - public int size() { - return idMap.size(); - } - - @Override - public boolean isEmpty() { - return idMap.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return idMap.containsKey(key); - } - - @Override - public boolean containsValue(Object value) { - return idMap.containsValue(value); - } - - @Override - public Integer get(Object key) { - return idMap.get(key); - } - - @Override - public Integer put(TypeDescription key, Integer value) { - return idMap.put(key, value); - } - - @Override - public Integer remove(Object key) { - return idMap.remove(key); - } - - @Override - public void putAll(Map map) { - idMap.putAll(map); - } - - @Override - public void clear() { - idMap.clear(); - } - - @Override - public Set keySet() { - return idMap.keySet(); - } - - @Override - public Collection values() { - return idMap.values(); - } - - @Override - public Set> entrySet() { - return idMap.entrySet(); - } - - public ByteBuffer serialize() { - StringBuilder buffer = new StringBuilder(); - boolean needComma = false; - for (TypeDescription key : idMap.keySet()) { - if (needComma) { - buffer.append(','); - } else { - needComma = true; - } - buffer.append(key.getId()); - buffer.append(':'); - buffer.append(idMap.get(key).intValue()); - } - return ByteBuffer.wrap(buffer.toString().getBytes(StandardCharsets.UTF_8)); - } - - public static ColumnIdMap deserialize(TypeDescription schema, - ByteBuffer serial) { - ColumnIdMap result = new ColumnIdMap(); - String[] parts = StandardCharsets.UTF_8.decode(serial).toString().split(","); - for (int i = 0; i < parts.length; ++i) { - String[] subparts = parts[i].split(":"); - result.put(schema.findSubtype(Integer.parseInt(subparts[0])), - Integer.parseInt(subparts[1])); - } - return result; - } -} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 0f7ee46cff96..6ffaf05d26a7 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -58,9 +58,9 @@ public static class WriteBuilder { private WriteBuilder(OutputFile file) { this.file = file; if (file instanceof HadoopOutputFile) { - conf = new Configuration(((HadoopOutputFile) file).getConf()); + this.conf = new Configuration(((HadoopOutputFile) file).getConf()); } else { - conf = new Configuration(); + this.conf = new Configuration(); } } @@ -100,7 +100,7 @@ public WriteBuilder overwrite(boolean enabled) { public FileAppender build() { Preconditions.checkNotNull(schema, "Schema is required"); - return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()), + return new OrcFileAppender<>(schema, this.file, createWriterFunc, conf, metadata, conf.getInt(VECTOR_ROW_BATCH_SIZE, VectorizedRowBatch.DEFAULT_SIZE)); } @@ -117,15 +117,15 @@ public static class ReadBuilder { private Long start = null; private Long length = null; - private Function> readerFunc; + private Function> readerFunc; private ReadBuilder(InputFile file) { Preconditions.checkNotNull(file, "Input file cannot be null"); this.file = file; if (file instanceof HadoopInputFile) { - conf = new Configuration(((HadoopInputFile) file).getConf()); + this.conf = new Configuration(((HadoopInputFile) file).getConf()); } else { - conf = new Configuration(); + this.conf = new 
Configuration(); } } @@ -157,7 +157,7 @@ public ReadBuilder config(String property, String value) { return this; } - public ReadBuilder createReaderFunc(Function> readerFunction) { + public ReadBuilder createReaderFunc(Function> readerFunction) { this.readerFunc = readerFunction; return this; } diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java new file mode 100644 index 000000000000..558650aec967 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; + +/** + * Utilities for mapping Iceberg to ORC schemas. 
+ */ +public final class ORCSchemaUtil { + + private enum BinaryType { + UUID, FIXED, BINARY + } + + private enum IntegerType { + TIME, INTEGER + } + + private static class OrcField { + private final String name; + private final TypeDescription type; + + OrcField(String name, TypeDescription type) { + this.name = name; + this.type = type; + } + + public String name() { + return name; + } + + public TypeDescription type() { + return type; + } + } + + private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; + private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required"; + + private static final String ICEBERG_BINARY_TYPE_ATTRIBUTE = "iceberg.binary-type"; + private static final String ICEBERG_INTEGER_TYPE_ATTRIBUTE = "iceberg.integer-type"; + private static final String ICEBERG_FIELD_LENGTH = "iceberg.length"; + + private static final ImmutableMap TYPE_MAPPING = + ImmutableMap.builder() + .put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN) + .put(Type.TypeID.INTEGER, TypeDescription.Category.INT) + .put(Type.TypeID.TIME, TypeDescription.Category.INT) + .put(Type.TypeID.LONG, TypeDescription.Category.LONG) + .put(Type.TypeID.FLOAT, TypeDescription.Category.FLOAT) + .put(Type.TypeID.DOUBLE, TypeDescription.Category.DOUBLE) + .put(Type.TypeID.DATE, TypeDescription.Category.DATE) + .put(Type.TypeID.TIMESTAMP, TypeDescription.Category.TIMESTAMP) + .put(Type.TypeID.STRING, TypeDescription.Category.STRING) + .put(Type.TypeID.UUID, TypeDescription.Category.BINARY) + .put(Type.TypeID.FIXED, TypeDescription.Category.BINARY) + .put(Type.TypeID.BINARY, TypeDescription.Category.BINARY) + .put(Type.TypeID.DECIMAL, TypeDescription.Category.DECIMAL) + .build(); + + private ORCSchemaUtil() {} + + public static TypeDescription convert(Schema schema) { + final TypeDescription root = TypeDescription.createStruct(); + final Types.StructType schemaRoot = schema.asStruct(); + for (Types.NestedField field : schemaRoot.asStructType().fields()) { + TypeDescription orcColumType = convert(field.fieldId(), field.type(), field.isRequired()); + root.addField(field.name(), orcColumType); + } + return root; + } + + private static TypeDescription convert(Integer fieldId, Type type, boolean isRequired) { + final TypeDescription orcType; + + switch (type.typeId()) { + case BOOLEAN: + orcType = TypeDescription.createBoolean(); + break; + case INTEGER: + orcType = TypeDescription.createInt(); + orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE, IntegerType.INTEGER.toString()); + break; + case TIME: + orcType = TypeDescription.createInt(); + orcType.setAttribute(ICEBERG_INTEGER_TYPE_ATTRIBUTE, IntegerType.TIME.toString()); + break; + case LONG: + orcType = TypeDescription.createLong(); + break; + case FLOAT: + orcType = TypeDescription.createFloat(); + break; + case DOUBLE: + orcType = TypeDescription.createDouble(); + break; + case DATE: + orcType = TypeDescription.createDate(); + break; + case TIMESTAMP: + orcType = TypeDescription.createTimestamp(); + break; + case STRING: + orcType = TypeDescription.createString(); + break; + case UUID: + orcType = TypeDescription.createBinary(); + orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.UUID.toString()); + break; + case FIXED: + orcType = TypeDescription.createBinary(); + orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.FIXED.toString()); + orcType.setAttribute(ICEBERG_FIELD_LENGTH, Integer.toString(((Types.FixedType) type).length())); + break; + case BINARY: + orcType = TypeDescription.createBinary(); + 
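+        // Note: UUID, FIXED, and BINARY all map to ORC BINARY; the attribute set below is
+        // what lets convert(TypeDescription) restore the original Iceberg type when reading
+        // the file schema back.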
orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.BINARY.toString()); + break; + case DECIMAL: { + Types.DecimalType decimal = (Types.DecimalType) type; + orcType = TypeDescription.createDecimal() + .withScale(decimal.scale()) + .withPrecision(decimal.precision()); + break; + } + case STRUCT: { + orcType = TypeDescription.createStruct(); + for (Types.NestedField field : type.asStructType().fields()) { + TypeDescription childType = convert(field.fieldId(), field.type(), field.isRequired()); + orcType.addField(field.name(), childType); + } + break; + } + case LIST: { + Types.ListType list = (Types.ListType) type; + TypeDescription elementType = convert(list.elementId(), list.elementType(), + list.isElementRequired()); + orcType = TypeDescription.createList(elementType); + break; + } + case MAP: { + Types.MapType map = (Types.MapType) type; + TypeDescription keyType = convert(map.keyId(), map.keyType(), true); + TypeDescription valueType = convert(map.valueId(), map.valueType(), map.isValueRequired()); + orcType = TypeDescription.createMap(keyType, valueType); + break; + } + default: + throw new IllegalArgumentException("Unhandled type " + type.typeId()); + } + + // Set Iceberg column attributes for mapping + orcType.setAttribute(ICEBERG_ID_ATTRIBUTE, String.valueOf(fieldId)); + orcType.setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, String.valueOf(isRequired)); + return orcType; + } + + /** + * Convert an ORC schema to an Iceberg schema. This method handles the convertion from the original + * Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC column IDs + * will be assigned following ORCs pre-order ID assignment. + * + * @return the Iceberg schema + */ + public static Schema convert(TypeDescription orcSchema) { + List children = orcSchema.getChildren(); + List childrenNames = orcSchema.getFieldNames(); + Preconditions.checkState(children.size() == childrenNames.size(), + "Error in ORC file, children fields and names do not match."); + + List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); + AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema)); + for (int i = 0; i < children.size(); i++) { + icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i), + lastColumnId::incrementAndGet)); + } + + return new Schema(icebergFields); + } + + /** + * Converts an Iceberg schema to a corresponding ORC schema within the context of an existing + * ORC file schema. + * This method also handles schema evolution from the original ORC file schema + * to the given Iceberg schema. It builds the desired reader schema with the schema + * evolution rules and pass that down to the ORC reader, + * which would then use its schema evolution to map that to the writer’s schema. 
+ * + * Example: + * + * Iceberg writer ORC writer + * struct<a (1): int, b (2): string> struct<a: int, b: string> + * struct<a (1): struct<b (2): string, c (3): date>> struct<a: struct<b:string, c:date>> + * + * + * Iceberg reader ORC reader + * + * struct<a (2): string, c (3): date> struct<b: string, c: date> + * struct<aa (1): struct<cc (3): date, bb (2): string>> struct<a: struct<c:date, b:string>> + * + * + * @param schema an Iceberg schema + * @param originalOrcSchema an existing ORC file schema + * @return the resulting ORC schema + */ + public static TypeDescription buildOrcProjection(Schema schema, + TypeDescription originalOrcSchema) { + final Map icebergToOrc = icebergToOrcMapping("root", originalOrcSchema); + return buildOrcProjection(Integer.MIN_VALUE, schema.asStruct(), true, icebergToOrc); + } + + private static TypeDescription buildOrcProjection(Integer fieldId, Type type, boolean isRequired, + Map mapping) { + final TypeDescription orcType; + + switch (type.typeId()) { + case STRUCT: + orcType = TypeDescription.createStruct(); + for (Types.NestedField nestedField : type.asStructType().fields()) { + // Using suffix _r to avoid potential underlying issues in ORC reader + // with reused column names between ORC and Iceberg; + // e.g. renaming column c -> d and adding new column d + String name = Optional.ofNullable(mapping.get(nestedField.fieldId())) + .map(OrcField::name) + .orElse(nestedField.name() + "_r" + nestedField.fieldId()); + TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), + nestedField.isRequired(), mapping); + orcType.addField(name, childType); + } + break; + case LIST: + Types.ListType list = (Types.ListType) type; + TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(), + list.isElementRequired(), mapping); + orcType = TypeDescription.createList(elementType); + break; + case MAP: + Types.MapType map = (Types.MapType) type; + TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), true, mapping); + TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(), map.isValueRequired(), + mapping); + orcType = TypeDescription.createMap(keyType, valueType); + break; + default: + if (mapping.containsKey(fieldId)) { + TypeDescription originalType = mapping.get(fieldId).type(); + Optional promotedType = getPromotedType(type, originalType); + + if (promotedType.isPresent()) { + orcType = promotedType.get(); + } else { + Preconditions.checkArgument(isSameType(originalType, type), + "Can not promote %s type to %s", + originalType.getCategory(), type.typeId().name()); + orcType = originalType.clone(); + } + } else { + if (isRequired) { + throw new IllegalArgumentException( + String.format("Field %d of type %s is required and was not found.", fieldId, type.toString())); + } + + orcType = convert(fieldId, type, false); + } + } + + return orcType; + } + + private static Map icebergToOrcMapping(String name, TypeDescription orcType) { + Map icebergToOrc = Maps.newHashMap(); + switch (orcType.getCategory()) { + case STRUCT: + List childrenNames = orcType.getFieldNames(); + List children = orcType.getChildren(); + for (int i = 0; i < children.size(); i++) { + icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i))); + } + break; + case LIST: + icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0))); + break; + case MAP: + icebergToOrc.putAll(icebergToOrcMapping("key", orcType.getChildren().get(0))); + 
icebergToOrc.putAll(icebergToOrcMapping("value", orcType.getChildren().get(1))); + break; + } + + if (orcType.getId() > 0) { + // Only add to non-root types. + icebergID(orcType) + .ifPresent(integer -> icebergToOrc.put(integer, new OrcField(name, orcType))); + } + + return icebergToOrc; + } + + + private static Optional getPromotedType(Type icebergType, + TypeDescription originalOrcType) { + TypeDescription promotedOrcType = null; + if (Type.TypeID.LONG.equals(icebergType.typeId()) && + TypeDescription.Category.INT.equals(originalOrcType.getCategory())) { + // Promote: int to long + promotedOrcType = TypeDescription.createLong(); + } else if (Type.TypeID.DOUBLE.equals(icebergType.typeId()) && + TypeDescription.Category.FLOAT.equals(originalOrcType.getCategory())) { + // Promote: float to double + promotedOrcType = TypeDescription.createDouble(); + } else if (Type.TypeID.DECIMAL.equals(icebergType.typeId()) && + TypeDescription.Category.DECIMAL.equals(originalOrcType.getCategory())) { + // Promote: decimal(P, S) to decimal(P', S) if P' > P + Types.DecimalType newDecimal = (Types.DecimalType) icebergType; + if (newDecimal.scale() == originalOrcType.getScale() && + newDecimal.precision() > originalOrcType.getPrecision()) { + promotedOrcType = TypeDescription.createDecimal() + .withScale(newDecimal.scale()) + .withPrecision(newDecimal.precision()); + } + } + return Optional.ofNullable(promotedOrcType); + } + + private static boolean isSameType(TypeDescription orcType, Type icebergType) { + return Objects.equals(TYPE_MAPPING.get(icebergType.typeId()), orcType.getCategory()); + } + + private static Optional icebergID(TypeDescription orcType) { + return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE)) + .map(Integer::parseInt); + } + + private static boolean isRequired(TypeDescription orcType) { + String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE); + if (isRequiredStr != null) { + return Boolean.parseBoolean(isRequiredStr); + } + return false; + } + + private static Types.NestedField getIcebergType(int icebergID, String name, Type type, + boolean isRequired) { + return isRequired ? 
+ Types.NestedField.required(icebergID, name, type) : + Types.NestedField.optional(icebergID, name, type); + } + + private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, String name, + TypeUtil.NextID nextID) { + + final int icebergID = icebergID(orcType).orElseGet(nextID::get); + final boolean isRequired = isRequired(orcType); + + switch (orcType.getCategory()) { + case BOOLEAN: + return getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired); + case BYTE: + case SHORT: + case INT: + IntegerType integerType = IntegerType.valueOf( + orcType.getAttributeValue(ICEBERG_INTEGER_TYPE_ATTRIBUTE) + ); + switch (integerType) { + case TIME: + return getIcebergType(icebergID, name, Types.TimeType.get(), isRequired); + case INTEGER: + return getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired); + default: + throw new IllegalStateException("Invalid Integer type found in ORC type attribute"); + } + case LONG: + return getIcebergType(icebergID, name, Types.LongType.get(), isRequired); + case FLOAT: + return getIcebergType(icebergID, name, Types.FloatType.get(), isRequired); + case DOUBLE: + return getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired); + case STRING: + case CHAR: + case VARCHAR: + return getIcebergType(icebergID, name, Types.StringType.get(), isRequired); + case BINARY: + BinaryType binaryType = BinaryType.valueOf( + orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE)); + switch (binaryType) { + case UUID: + return getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired); + case FIXED: + int fixedLength = Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH)); + return getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired); + case BINARY: + return getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired); + default: + throw new IllegalStateException("Invalid Binary type found in ORC type attribute"); + } + case DATE: + return getIcebergType(icebergID, name, Types.DateType.get(), isRequired); + case TIMESTAMP: + return getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired); + case DECIMAL: + return getIcebergType(icebergID, name, + Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()), + isRequired); + case STRUCT: { + List fieldNames = orcType.getFieldNames(); + List fieldTypes = orcType.getChildren(); + List fields = new ArrayList<>(fieldNames.size()); + for (int c = 0; c < fieldNames.size(); ++c) { + String childName = fieldNames.get(c); + TypeDescription type = fieldTypes.get(c); + Types.NestedField field = convertOrcToIceberg(type, childName, nextID); + fields.add(field); + } + + return getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired); + } + case LIST: { + TypeDescription elementType = orcType.getChildren().get(0); + Types.NestedField element = convertOrcToIceberg(elementType, "element", nextID); + + Types.ListType listTypeWithElem = isRequired(elementType) ? + Types.ListType.ofRequired(element.fieldId(), element.type()) : + Types.ListType.ofOptional(element.fieldId(), element.type()); + return isRequired ? 
+ Types.NestedField.required(icebergID, name, listTypeWithElem) : + Types.NestedField.optional(icebergID, name, listTypeWithElem); + } + case MAP: { + TypeDescription keyType = orcType.getChildren().get(0); + Types.NestedField key = convertOrcToIceberg(keyType, "key", nextID); + TypeDescription valueType = orcType.getChildren().get(1); + Types.NestedField value = convertOrcToIceberg(valueType, "value", nextID); + + Types.MapType mapTypeWithKV = isRequired(valueType) ? + Types.MapType.ofRequired(key.fieldId(), value.fieldId(), key.type(), value.type()) : + Types.MapType.ofOptional(key.fieldId(), value.fieldId(), key.type(), value.type()); + + return getIcebergType(icebergID, name, mapTypeWithKV, isRequired); + } + default: + // We don't have an answer for union types. + throw new IllegalArgumentException("Can't handle " + orcType); + } + } + + private static int getMaxIcebergId(TypeDescription originalOrcSchema) { + int maxId = icebergID(originalOrcSchema).orElse(0); + final List children = Optional.ofNullable(originalOrcSchema.getChildren()) + .orElse(Collections.emptyList()); + for (TypeDescription child : children) { + maxId = Math.max(maxId, getMaxIcebergId(child)); + } + + return maxId; + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index 007045a32404..3f67496ab86a 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -24,17 +24,16 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; -import org.apache.orc.ColumnStatistics; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.StripeInformation; @@ -47,31 +46,30 @@ */ class OrcFileAppender implements FileAppender { private final int batchSize; - private final TypeDescription orcSchema; - private final ColumnIdMap columnIds = new ColumnIdMap(); - private final Path path; + private final Schema schema; + private final OutputFile file; private final Writer writer; private final VectorizedRowBatch batch; private final OrcValueWriter valueWriter; private boolean isClosed = false; private final Configuration conf; - private static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids"; - - OrcFileAppender(TypeDescription schema, OutputFile file, + OrcFileAppender(Schema schema, OutputFile file, Function> createWriterFunc, Configuration conf, Map metadata, int batchSize) { this.conf = conf; - orcSchema = schema; - path = new Path(file.location()); + this.file = file; this.batchSize = batchSize; - batch = orcSchema.createRowBatch(this.batchSize); + this.schema = schema; + + TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema); + this.batch = orcSchema.createRowBatch(this.batchSize); OrcFile.WriterOptions options = OrcFile.writerOptions(conf); options.setSchema(orcSchema); - writer = newOrcWriter(file, columnIds, options, metadata); - valueWriter = newOrcValueWriter(orcSchema, createWriterFunc); + this.writer = newOrcWriter(file, options, metadata); + this.valueWriter = 
newOrcValueWriter(orcSchema, createWriterFunc); } @Override @@ -82,38 +80,14 @@ public void add(D datum) { writer.addRowBatch(batch); batch.reset(); } - } catch (IOException e) { - throw new RuntimeException("Problem writing to ORC file " + path, e); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Problem writing to ORC file " + file.location()); } } @Override public Metrics metrics() { - try { - long rows = writer.getNumberOfRows(); - ColumnStatistics[] stats = writer.getStatistics(); - // we don't currently have columnSizes or distinct counts. - Map valueCounts = new HashMap<>(); - Map nullCounts = new HashMap<>(); - Integer[] icebergIds = new Integer[orcSchema.getMaximumId() + 1]; - for (TypeDescription type : columnIds.keySet()) { - icebergIds[type.getId()] = columnIds.get(type); - } - for (int c = 1; c < stats.length; ++c) { - if (icebergIds[c] != null) { - valueCounts.put(icebergIds[c], stats[c].getNumberOfValues()); - } - } - for (TypeDescription child : orcSchema.getChildren()) { - int childId = child.getId(); - if (icebergIds[childId] != null) { - nullCounts.put(icebergIds[childId], rows - stats[childId].getNumberOfValues()); - } - } - return new Metrics(rows, null, valueCounts, nullCounts); - } catch (IOException e) { - throw new RuntimeException("Can't get statistics " + path, e); - } + return OrcMetrics.fromWriter(writer); } @Override @@ -126,12 +100,14 @@ public long length() { @Override public List splitOffsets() { Preconditions.checkState(isClosed, "File is not yet closed"); + String fileLoc = file.location(); Reader reader; try { - reader = OrcFile.createReader(path, new OrcFile.ReaderOptions(conf)); - } catch (IOException e) { - throw new RuntimeIOException("Cannot read file " + path, e); + reader = OrcFile.createReader(new Path(fileLoc), new OrcFile.ReaderOptions(conf)); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Cannot read file " + fileLoc); } + List stripes = reader.getStripes(); return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset)); } @@ -152,18 +128,16 @@ public void close() throws IOException { } private static Writer newOrcWriter(OutputFile file, - ColumnIdMap columnIds, OrcFile.WriterOptions options, Map metadata) { final Path locPath = new Path(file.location()); final Writer writer; try { writer = OrcFile.createWriter(locPath, options); - } catch (IOException e) { - throw new RuntimeException("Can't create file " + locPath, e); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Can't create file " + locPath); } - writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize()); metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value))); return writer; diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java index 60094e72a9f8..1c9d4ca4d037 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java @@ -43,11 +43,11 @@ class OrcIterable extends CloseableGroup implements CloseableIterable { private final InputFile file; private final Long start; private final Long length; - private final Function> readerFunction; + private final Function> readerFunction; OrcIterable(InputFile file, Configuration config, Schema schema, - Long start, Long length, - Function> readerFunction) { + Long start, Long length, + Function> readerFunction) { this.schema = schema; this.readerFunction = readerFunction; 
this.file = file; @@ -59,10 +59,12 @@ class OrcIterable extends CloseableGroup implements CloseableIterable { @SuppressWarnings("unchecked") @Override public Iterator iterator() { + Reader orcFileReader = newFileReader(file, config); + TypeDescription readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, orcFileReader.getSchema()); + return new OrcIterator( - newOrcIterator(file, TypeConversion.toOrc(schema, new ColumnIdMap()), - start, length, newFileReader(file, config)), - readerFunction.apply(schema)); + newOrcIterator(file, readOrcSchema, start, length, orcFileReader), + readerFunction.apply(readOrcSchema)); } private static VectorizedRowBatchIterator newOrcIterator(InputFile file, diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java index 5dc6421b3782..65bc64d0a9d5 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java @@ -29,6 +29,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.orc.OrcFile; import org.apache.orc.Reader; +import org.apache.orc.Writer; public class OrcMetrics { @@ -40,12 +41,13 @@ public static Metrics fromInputFile(InputFile file) { return fromInputFile(file, config); } - public static Metrics fromInputFile(InputFile file, Configuration config) { + static Metrics fromInputFile(InputFile file, Configuration config) { try { final Reader orcReader = OrcFile.createReader(new Path(file.location()), OrcFile.readerOptions(config)); // TODO: implement rest of the methods for ORC metrics + // https://github.com/apache/incubator-iceberg/pull/199 return new Metrics(orcReader.getNumberOfRows(), null, null, @@ -56,4 +58,15 @@ public static Metrics fromInputFile(InputFile file, Configuration config) { throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", file); } } + + static Metrics fromWriter(Writer writer) { + // TODO: implement rest of the methods for ORC metrics in + // https://github.com/apache/incubator-iceberg/pull/199 + return new Metrics(writer.getNumberOfRows(), + null, + null, + Collections.emptyMap(), + null, + null); + } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java index 5f1e167fa4b3..74d2fddddcb0 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -29,6 +29,7 @@ public interface OrcValueWriter { /** * Writes the data. + * * @param value the data value to write. * @param output the VectorizedRowBatch to which the output will be written. * @throws IOException if there's any IO error while writing the data value. diff --git a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java deleted file mode 100644 index 29448f916287..000000000000 --- a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.orc; - -import java.util.ArrayList; -import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.orc.TypeDescription; - -public class TypeConversion { - - /** - * Convert a given Iceberg schema to ORC. - * @param schema the Iceberg schema to convert - * @param columnIds an output with the column ids - * @return the ORC schema - */ - public static TypeDescription toOrc(Schema schema, - ColumnIdMap columnIds) { - return toOrc(null, schema.asStruct(), columnIds); - } - - static TypeDescription toOrc(Integer fieldId, - Type type, - ColumnIdMap columnIds) { - TypeDescription result; - switch (type.typeId()) { - case BOOLEAN: - result = TypeDescription.createBoolean(); - break; - case INTEGER: - result = TypeDescription.createInt(); - break; - case LONG: - result = TypeDescription.createLong(); - break; - case FLOAT: - result = TypeDescription.createFloat(); - break; - case DOUBLE: - result = TypeDescription.createDouble(); - break; - case DATE: - result = TypeDescription.createDate(); - break; - case TIME: - result = TypeDescription.createInt(); - break; - case TIMESTAMP: - result = TypeDescription.createTimestamp(); - break; - case STRING: - result = TypeDescription.createString(); - break; - case UUID: - result = TypeDescription.createBinary(); - break; - case FIXED: - result = TypeDescription.createBinary(); - break; - case BINARY: - result = TypeDescription.createBinary(); - break; - case DECIMAL: { - Types.DecimalType decimal = (Types.DecimalType) type; - result = TypeDescription.createDecimal() - .withScale(decimal.scale()) - .withPrecision(decimal.precision()); - break; - } - case STRUCT: { - result = TypeDescription.createStruct(); - for (Types.NestedField field : type.asStructType().fields()) { - result.addField(field.name(), toOrc(field.fieldId(), field.type(), columnIds)); - } - break; - } - case LIST: { - Types.ListType list = (Types.ListType) type; - result = TypeDescription.createList(toOrc(list.elementId(), list.elementType(), - columnIds)); - break; - } - case MAP: { - Types.MapType map = (Types.MapType) type; - TypeDescription key = toOrc(map.keyId(), map.keyType(), columnIds); - result = TypeDescription.createMap(key, - toOrc(map.valueId(), map.valueType(), columnIds)); - break; - } - default: - throw new IllegalArgumentException("Unhandled type " + type.typeId()); - } - if (fieldId != null) { - columnIds.put(result, fieldId); - } - return result; - } - - /** - * Convert an ORC schema to an Iceberg schema. 
- * @param schema the ORC schema - * @param columnIds the column ids - * @return the Iceberg schema - */ - public Schema fromOrc(TypeDescription schema, ColumnIdMap columnIds) { - return new Schema(convertOrcToType(schema, columnIds).asStructType().fields()); - } - - Type convertOrcToType(TypeDescription schema, ColumnIdMap columnIds) { - switch (schema.getCategory()) { - case BOOLEAN: - return Types.BooleanType.get(); - case BYTE: - case SHORT: - case INT: - return Types.IntegerType.get(); - case LONG: - return Types.LongType.get(); - case FLOAT: - return Types.FloatType.get(); - case DOUBLE: - return Types.DoubleType.get(); - case STRING: - case CHAR: - case VARCHAR: - return Types.StringType.get(); - case BINARY: - return Types.BinaryType.get(); - case DATE: - return Types.DateType.get(); - case TIMESTAMP: - return Types.TimestampType.withoutZone(); - case DECIMAL: - return Types.DecimalType.of(schema.getPrecision(), schema.getScale()); - case STRUCT: { - List fieldNames = schema.getFieldNames(); - List fieldTypes = schema.getChildren(); - List fields = new ArrayList<>(fieldNames.size()); - for (int c = 0; c < fieldNames.size(); ++c) { - String name = fieldNames.get(c); - TypeDescription type = fieldTypes.get(c); - fields.add(Types.NestedField.optional(columnIds.get(type), name, - convertOrcToType(type, columnIds))); - } - return Types.StructType.of(fields); - } - case LIST: { - TypeDescription child = schema.getChildren().get(0); - return Types.ListType.ofOptional(columnIds.get(child), - convertOrcToType(child, columnIds)); - } - case MAP: { - TypeDescription key = schema.getChildren().get(0); - TypeDescription value = schema.getChildren().get(1); - return Types.MapType.ofOptional(columnIds.get(key), columnIds.get(value), - convertOrcToType(key, columnIds), convertOrcToType(value, columnIds)); - } - default: - // We don't have an answer for union types. - throw new IllegalArgumentException("Can't handle " + schema); - } - } -} diff --git a/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java index ddc0bce97ca4..125a37c39498 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java +++ b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Iterator; +import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; @@ -52,8 +53,8 @@ private void advance() { if (!advanced) { try { rows.nextBatch(batch); - } catch (IOException e) { - throw new RuntimeException("Problem reading ORC file " + fileLocation, e); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Problem reading ORC file " + fileLocation); } advanced = true; } diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBuildOrcProjection.java b/orc/src/test/java/org/apache/iceberg/orc/TestBuildOrcProjection.java new file mode 100644 index 000000000000..2a86f235ac88 --- /dev/null +++ b/orc/src/test/java/org/apache/iceberg/orc/TestBuildOrcProjection.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.junit.Assert.assertEquals; + +/** + * Test projections on ORC types. + */ +public class TestBuildOrcProjection { + + @Test + public void testProjectionPrimitiveNoOp() { + Schema originalSchema = new Schema( + optional(1, "a", Types.IntegerType.get()), + optional(2, "b", Types.StringType.get()) + ); + + // Original mapping (stored in ORC) + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + assertEquals(2, orcSchema.getChildren().size()); + assertEquals(1, orcSchema.findSubtype("a").getId()); + assertEquals(TypeDescription.Category.INT, orcSchema.findSubtype("a").getCategory()); + assertEquals(2, orcSchema.findSubtype("b").getId()); + assertEquals(TypeDescription.Category.STRING, orcSchema.findSubtype("b").getCategory()); + } + + @Test + public void testProjectionPrimitive() { + Schema originalSchema = new Schema( + optional(1, "a", Types.IntegerType.get()), + optional(2, "b", Types.StringType.get()) + ); + + // Original mapping (stored in ORC) + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + + // Evolve schema + Schema evolveSchema = new Schema( + optional(2, "a", Types.StringType.get()), + optional(3, "c", Types.DateType.get()) // will produce ORC column c_r3 (new) + ); + + TypeDescription newOrcSchema = ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema); + assertEquals(2, newOrcSchema.getChildren().size()); + assertEquals(1, newOrcSchema.findSubtype("b").getId()); + assertEquals(TypeDescription.Category.STRING, newOrcSchema.findSubtype("b").getCategory()); + assertEquals(2, newOrcSchema.findSubtype("c_r3").getId()); + assertEquals(TypeDescription.Category.DATE, newOrcSchema.findSubtype("c_r3").getCategory()); + } + + @Test + public void testProjectionNestedNoOp() { + Types.StructType nestedStructType = Types.StructType.of( + optional(2, "b", Types.StringType.get()), + optional(3, "c", Types.DateType.get()) + ); + Schema originalSchema = new Schema( + optional(1, "a", nestedStructType) + ); + + // Original mapping (stored in ORC) + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + + TypeDescription newOrcSchema = ORCSchemaUtil.buildOrcProjection(originalSchema, orcSchema); + assertEquals(1, newOrcSchema.getChildren().size()); + assertEquals(TypeDescription.Category.STRUCT, newOrcSchema.findSubtype("a").getCategory()); + TypeDescription nestedCol = newOrcSchema.findSubtype("a"); + assertEquals(2, nestedCol.findSubtype("b").getId()); + assertEquals(TypeDescription.Category.STRING, nestedCol.findSubtype("b").getCategory()); + assertEquals(3, nestedCol.findSubtype("c").getId()); + assertEquals(TypeDescription.Category.DATE, nestedCol.findSubtype("c").getCategory()); + } + + @Test + 
public void testProjectionNested() { + Types.StructType nestedStructType = Types.StructType.of( + optional(2, "b", Types.StringType.get()), + optional(3, "c", Types.DateType.get()) + ); + Schema originalSchema = new Schema( + optional(1, "a", nestedStructType) + ); + + // Original mapping (stored in ORC) + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + + // Evolve schema + Types.StructType newNestedStructType = Types.StructType.of( + optional(3, "cc", Types.DateType.get()), + optional(2, "bb", Types.StringType.get()) + ); + Schema evolveSchema = new Schema( + optional(1, "aa", newNestedStructType) + ); + + TypeDescription newOrcSchema = ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema); + assertEquals(1, newOrcSchema.getChildren().size()); + assertEquals(TypeDescription.Category.STRUCT, newOrcSchema.findSubtype("a").getCategory()); + TypeDescription nestedCol = newOrcSchema.findSubtype("a"); + assertEquals(2, nestedCol.findSubtype("c").getId()); + assertEquals(TypeDescription.Category.DATE, nestedCol.findSubtype("c").getCategory()); + assertEquals(3, nestedCol.findSubtype("b").getId()); + assertEquals(TypeDescription.Category.STRING, nestedCol.findSubtype("b").getCategory()); + } + +} diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java new file mode 100644 index 000000000000..461e1d4bd930 --- /dev/null +++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.orc; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import static org.apache.iceberg.AssertHelpers.assertThrows; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.junit.Assert.assertEquals; + +public class TestORCSchemaUtil { + + @Test + public void testRoundtripConversionPrimitive() { + Schema expectedSchema = new Schema( + optional(1, "intCol", Types.IntegerType.get()), + optional(3, "longCol", Types.LongType.get()), + optional(6, "intCol2", Types.IntegerType.get()), + optional(20, "intCol3", Types.IntegerType.get()), + required(9, "doubleCol", Types.DoubleType.get()), + required(10, "uuidCol", Types.UUIDType.get()), + optional(2, "booleanCol", Types.BooleanType.get()), + optional(21, "fixedCol", Types.FixedType.ofLength(4096)), + required(22, "binaryCol", Types.BinaryType.get()), + required(23, "stringCol", Types.StringType.get()), + required(24, "decimalCol", Types.DecimalType.of(15, 3)), + required(25, "floatCol", Types.FloatType.get()), + optional(30, "dateCol", Types.DateType.get()), + required(32, "timeCol", Types.TimeType.get()), + required(34, "timestampCol", Types.TimestampType.withZone()) + ); + TypeDescription orcSchema = ORCSchemaUtil.convert(expectedSchema); + assertEquals(expectedSchema.asStruct(), ORCSchemaUtil.convert(orcSchema).asStruct()); + } + + @Test + public void testRoundtripConversionNested() { + Types.StructType leafStructType = Types.StructType.of( + optional(6, "leafLongCol", Types.LongType.get()), + optional(7, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType nestedStructType = Types.StructType.of( + optional(4, "longCol", Types.LongType.get()), + optional(5, "leafStructCol", leafStructType) + ); + Types.StructType structPrimTypeForList = Types.StructType.of( + optional(506, "leafLongCol", Types.LongType.get()), + optional(507, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType leafStructTypeForList = Types.StructType.of( + optional(516, "leafLongCol", Types.LongType.get()), + optional(517, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType nestedStructTypeForList = Types.StructType.of( + optional(504, "longCol", Types.LongType.get()), + optional(505, "leafStructCol", leafStructTypeForList) + ); + Types.StructType structPrimTypeForMap = Types.StructType.of( + optional(606, "leafLongCol", Types.LongType.get()), + optional(607, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType leafStructTypeForMap = Types.StructType.of( + optional(616, "leafLongCol", Types.LongType.get()), + optional(617, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType nestedStructTypeForMap = Types.StructType.of( + optional(604, "longCol", Types.LongType.get()), + optional(605, "leafStructCol", leafStructTypeForMap) + ); + Types.StructType leafStructTypeForStruct = Types.StructType.of( + optional(716, "leafLongCol", Types.LongType.get()), + optional(717, "leafBinaryCol", Types.BinaryType.get()) + ); + Types.StructType nestedStructTypeForStruct = Types.StructType.of( + optional(704, "longCol", Types.LongType.get()), + optional(705, "leafStructCol", leafStructTypeForStruct) + ); + // all fields in expected iceberg schema will be optional since we don't have a column mapping + Schema expectedSchema = new Schema( + optional(1, "intCol", Types.IntegerType.get()), + optional(2, "longCol", 
Types.LongType.get()), + optional(3, "nestedStructCol", nestedStructType), + optional(8, "intCol3", Types.IntegerType.get()), + optional(9, "doubleCol", Types.DoubleType.get()), + required(10, "uuidCol", Types.UUIDType.get()), + optional(20, "booleanCol", Types.BooleanType.get()), + optional(21, "fixedCol", Types.FixedType.ofLength(4096)), + required(22, "binaryCol", Types.BinaryType.get()), + required(23, "stringCol", Types.StringType.get()), + required(24, "decimalCol", Types.DecimalType.of(15, 3)), + required(25, "floatCol", Types.FloatType.get()), + optional(30, "dateCol", Types.DateType.get()), + required(32, "timeCol", Types.TimeType.get()), + required(34, "timestampCol", Types.TimestampType.withZone()), + required(35, "listPrimCol", + Types.ListType.ofRequired(135, Types.LongType.get())), + required(36, "listPrimNestCol", + Types.ListType.ofRequired(136, structPrimTypeForList)), + required(37, "listNestedCol", + Types.ListType.ofRequired(137, nestedStructTypeForList)), + optional(38, "mapPrimCol", + Types.MapType.ofRequired(138, 238, Types.StringType.get(), Types.FixedType.ofLength(4096))), + required(39, "mapPrimNestCol", + Types.MapType.ofRequired(139, 239, Types.StringType.get(), structPrimTypeForMap)), + required(40, "mapNestedCol", + Types.MapType.ofRequired(140, 240, Types.StringType.get(), nestedStructTypeForMap)), + required(41, "structListNestCol", + Types.ListType.ofRequired(241, + Types.StructType.of( + optional(816, "leafLongCol", Types.LongType.get()), + optional(817, "leafBinaryCol", Types.BinaryType.get()) + )) + ), + required(42, "structMapNestCol", + Types.MapType.ofRequired(242, 342, Types.StringType.get(), + Types.StructType.of( + optional(916, "leafLongCol", Types.LongType.get()), + optional(917, "leafBinaryCol", Types.BinaryType.get()) + ) + )), + required(43, "structStructNestCol", + Types.StructType.of(required(243, "innerStructNest", + Types.StructType.of( + optional(1016, "leafLongCol", Types.LongType.get()), + optional(1017, "leafBinaryCol", Types.BinaryType.get()) + )) + )), + required(44, "structStructComplexNestCol", + Types.StructType.of(required(244, "innerStructNest", + Types.StructType.of( + optional(1116, "leafLongCol", Types.LongType.get()), + optional(1117, "leftMapOfListStructCol", + Types.MapType.ofRequired(1150, 1151, + Types.StringType.get(), + Types.ListType.ofRequired(1250, nestedStructTypeForStruct)) + ) + )) + )) + ); + TypeDescription orcSchema = ORCSchemaUtil.convert(expectedSchema); + assertEquals(expectedSchema.asStruct(), ORCSchemaUtil.convert(orcSchema).asStruct()); + } + + @Test + public void testTypePromotions() { + Schema originalSchema = new Schema( + optional(1, "a", Types.IntegerType.get()), + optional(2, "b", Types.FloatType.get()), + optional(3, "c", Types.DecimalType.of(10, 2)) + ); + + // Original mapping (stored in ORC) + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + + // Evolve schema + Schema evolveSchema = new Schema( + optional(1, "a", Types.LongType.get()), + optional(2, "b", Types.DoubleType.get()), + optional(3, "c", Types.DecimalType.of(15, 2)) + ); + + TypeDescription newOrcSchema = ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema); + assertEquals(3, newOrcSchema.getChildren().size()); + assertEquals(1, newOrcSchema.findSubtype("a").getId()); + assertEquals(TypeDescription.Category.LONG, newOrcSchema.findSubtype("a").getCategory()); + assertEquals(2, newOrcSchema.findSubtype("b").getId()); + assertEquals(TypeDescription.Category.DOUBLE, newOrcSchema.findSubtype("b").getCategory()); + 
TypeDescription decimalC = newOrcSchema.findSubtype("c"); + assertEquals(3, decimalC.getId()); + assertEquals(TypeDescription.Category.DECIMAL, decimalC.getCategory()); + assertEquals(15, decimalC.getPrecision()); + assertEquals(2, decimalC.getScale()); + } + + @Test + public void testInvalidTypePromotions() { + Schema originalSchema = new Schema( + optional(1, "a", Types.LongType.get()) + ); + + TypeDescription orcSchema = ORCSchemaUtil.convert(originalSchema); + Schema evolveSchema = new Schema( + optional(1, "a", Types.IntegerType.get()) + ); + + assertThrows("Should not allow invalid type promotion", + IllegalArgumentException.class, "Can not promote", () -> { + ORCSchemaUtil.buildOrcProjection(evolveSchema, orcSchema); + }); + } +} diff --git a/site/docs/spec.md b/site/docs/spec.md index 74e64e1ae98c..27455cb81711 100644 --- a/site/docs/spec.md +++ b/site/docs/spec.md @@ -506,7 +506,7 @@ Lists must use the [3-level representation](https://github.com/apache/parquet-fo One of the interesting challenges with this is how to map Iceberg’s schema evolution (id based) on to ORC’s (name based). In theory, we could use Iceberg’s column ids as the column and field names, but that would suck from a user’s point of view. -The column ids would be stored in ORC’s user metadata as `iceberg.column.id` with a comma separated list of the ids. +The column IDs must be stored in ORC type attributes using the key `iceberg.id`, and the key `iceberg.required` must store `"true"` when the Iceberg column is required; otherwise the column is treated as optional. Iceberg would build the desired reader schema with their schema evolution rules and pass that down to the ORC reader, which would then use its schema evolution to map that to the writer’s schema. Basically, Iceberg would need to change the names of columns and fields to get the desired mapping. 
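A minimal sketch (not part of the patch) of how the `iceberg.id` and `iceberg.required` type attributes described above might be written and read with the ORC 1.6 TypeDescription attribute API. The class name `AttributeSketch` and the literal field ids are illustrative only; the real mapping lives in `ORCSchemaUtil.convert`, and `getAttributeValue` is the same call the reader side uses in this patch.

import org.apache.orc.TypeDescription;

public class AttributeSketch {
  public static void main(String[] args) {
    // Build struct<a:int,b:string> and tag each field with its Iceberg field id.
    TypeDescription struct = TypeDescription.createStruct();

    TypeDescription intCol = TypeDescription.createInt();
    intCol.setAttribute("iceberg.id", "1");
    intCol.setAttribute("iceberg.required", "true");   // required Iceberg column
    struct.addField("a", intCol);

    TypeDescription strCol = TypeDescription.createString();
    strCol.setAttribute("iceberg.id", "2");            // no iceberg.required attribute => optional
    struct.addField("b", strCol);

    // A reader can recover the Iceberg field id from the type attribute.
    String id = struct.findSubtype("a").getAttributeValue("iceberg.id");
    System.out.println("field a -> iceberg id " + id);  // prints: field a -> iceberg id 1
  }
}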
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 3d1ac6f16332..3542198b7abc 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -22,10 +22,7 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.orc.ColumnIdMap; import org.apache.iceberg.orc.OrcValueReader; -import org.apache.iceberg.orc.TypeConversion; import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; @@ -57,27 +54,25 @@ */ public class SparkOrcReader implements OrcValueReader { private static final int INITIAL_SIZE = 128 * 1024; - private final int numFields; - private final TypeDescription readSchema; + private final List columns; private final Converter[] converters; - public SparkOrcReader(Schema readSchema) { - this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap()); - numFields = readSchema.columns().size(); + public SparkOrcReader(TypeDescription readOrcSchema) { + columns = readOrcSchema.getChildren(); converters = buildConverters(); } private Converter[] buildConverters() { - final Converter[] newConverters = new Converter[numFields]; - for (int c = 0; c < numFields; ++c) { - newConverters[c] = buildConverter(readSchema.getChildren().get(c)); + final Converter[] newConverters = new Converter[columns.size()]; + for (int c = 0; c < newConverters.length; ++c) { + newConverters[c] = buildConverter(columns.get(c)); } return newConverters; } @Override public InternalRow read(VectorizedRowBatch batch, int row) { - final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields, INITIAL_SIZE); + final UnsafeRowWriter rowWriter = new UnsafeRowWriter(columns.size(), INITIAL_SIZE); rowWriter.reset(); rowWriter.zeroOutNullBytes(); for (int c = 0; c < batch.cols.length; ++c) { @@ -424,7 +419,8 @@ public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int writer.setNullAt(column); } else { BytesColumnVector bytesVector = (BytesColumnVector) vector; - writer.write(column, bytesVector.vector[rowIndex], bytesVector.start[rowIndex], bytesVector.length[rowIndex]); + writer.write(column, bytesVector.vector[rowIndex], + bytesVector.start[rowIndex], bytesVector.length[rowIndex]); } } @@ -526,7 +522,6 @@ private static class StructConverter implements Converter { for (int c = 0; c < children.length; ++c) { children[c] = buildConverter(schema.getChildren().get(c)); } - } int writeStruct(UnsafeWriter parentWriter, StructColumnVector vector, int row) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index daee8270f997..21b896ba8a68 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -215,13 +215,8 @@ public void addValue(int rowId, int column, SpecializedGetters data, output.isNull[rowId] = false; TimestampColumnVector cv = (TimestampColumnVector) output; long micros = data.getLong(column); - cv.time[rowId] = (micros / 1_000_000) * 1000; - int nanos = (int) (micros % 1_000_000) * 1000; - if (nanos < 0) { - nanos += 1_000_000_000; - cv.time[rowId] -= 1000; - } - cv.nanos[rowId] = nanos; + 
cv.time[rowId] = micros / 1_000; // millis + cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos } } } diff --git a/versions.lock b/versions.lock index 917755e44923..eb269c2d0b8a 100644 --- a/versions.lock +++ b/versions.lock @@ -60,7 +60,7 @@ commons-net:commons-net:3.1 (3 constraints: 3d222e61) commons-pool:commons-pool:1.6 (4 constraints: e336ab5e) dk.brics.automaton:automaton:1.11-8 (1 constraints: 92088a8d) hsqldb:hsqldb:1.8.0.10 (1 constraints: f008499f) -io.airlift:aircompressor:0.10 (1 constraints: 090a9fb2) +io.airlift:aircompressor:0.15 (1 constraints: 0e0aa4b2) io.dropwizard.metrics:metrics-core:3.1.5 (6 constraints: 865ea0ba) io.dropwizard.metrics:metrics-graphite:3.1.5 (1 constraints: 1a0dc936) io.dropwizard.metrics:metrics-json:3.1.5 (1 constraints: 1a0dc936) @@ -146,9 +146,9 @@ org.apache.htrace:htrace-core:3.1.0-incubating (2 constraints: cd22cffa) org.apache.httpcomponents:httpclient:4.5.6 (4 constraints: 573134dd) org.apache.httpcomponents:httpcore:4.4.10 (3 constraints: d327f763) org.apache.ivy:ivy:2.4.0 (3 constraints: 0826dbf1) -org.apache.orc:orc-core:1.5.6 (2 constraints: d011de27) +org.apache.orc:orc-core:1.6.2 (2 constraints: cd116527) org.apache.orc:orc-mapreduce:1.5.5 (1 constraints: c30cc227) -org.apache.orc:orc-shims:1.5.6 (1 constraints: 420aebbc) +org.apache.orc:orc-shims:1.6.2 (1 constraints: 3f0aeabc) org.apache.parquet:parquet-avro:1.10.1 (1 constraints: 35052a3b) org.apache.parquet:parquet-column:1.10.1 (3 constraints: 9429eeca) org.apache.parquet:parquet-common:1.10.1 (2 constraints: 4c1e7385) @@ -202,6 +202,7 @@ org.glassfish.jersey.core:jersey-server:2.22.2 (3 constraints: 553f5d56) org.glassfish.jersey.media:jersey-media-jaxb:2.22.2 (1 constraints: 3111f1d4) org.iq80.snappy:snappy:0.2 (1 constraints: 890d5927) org.javassist:javassist:3.18.1-GA (1 constraints: 570d4740) +org.jetbrains:annotations:17.0.0 (1 constraints: 6e0a64c7) org.jodd:jodd-core:3.5.2 (2 constraints: 0c1bda93) org.json:json:20090211 (1 constraints: 890c4218) org.json4s:json4s-ast_2.11:3.5.3 (1 constraints: 0c0b9ae9) diff --git a/versions.props b/versions.props index a83da88e313e..6da57a579d1b 100644 --- a/versions.props +++ b/versions.props @@ -3,7 +3,7 @@ com.google.guava:guava = 28.0-jre org.apache.avro:avro = 1.8.2 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-standalone-metastore = 1.2.1 -org.apache.orc:orc-core = 1.5.6 +org.apache.orc:orc-core = 1.6.2 org.apache.parquet:parquet-avro = 1.10.1 org.apache.spark:spark-hive_2.11 = 2.4.4 org.apache.spark:spark-avro_2.11 = 2.4.4
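For reference, a standalone sketch of the microsecond-to-(millis, nanos) split used in the SparkOrcWriter hunk above. The value below is illustrative only, and, like the simplified lines in the hunk, the arithmetic assumes non-negative (post-epoch) microsecond timestamps; the removed branch also normalized negative sub-second remainders for pre-epoch values.

public class TimestampSplitSketch {
  public static void main(String[] args) {
    // Iceberg timestamps are microseconds since epoch; TimestampColumnVector
    // stores milliseconds in time[] and the fractional second, as nanoseconds,
    // in nanos[], so both must agree on the sub-second portion.
    long micros = 1_566_000_000_123_456L;           // example value only

    long millis = micros / 1_000;                   // whole milliseconds
    int nanos = (int) (micros % 1_000_000) * 1_000; // sub-second part, in nanoseconds

    System.out.println(millis + " ms, " + nanos + " ns"); // 1566000000123 ms, 123456000 ns
  }
}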