From efd91f65f00edff4edb0f8037b8465669f24eb08 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 10 Apr 2020 21:34:58 -0700 Subject: [PATCH 1/5] Orc nested partition support --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 15 +- .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 90 ++++ .../org/apache/iceberg/orc/OrcValReader.java | 36 ++ .../apache/iceberg/orc/OrcValueReaders.java | 231 +++++++++ .../iceberg/spark/data/SparkOrcReader.java | 477 +++--------------- .../spark/data/SparkOrcValueReaders.java | 213 ++++++++ .../iceberg/spark/source/RowDataReader.java | 10 +- .../spark/data/TestSparkOrcReader.java | 2 +- .../spark/source/TestPartitionValues.java | 3 - 9 files changed, 649 insertions(+), 428 deletions(-) create mode 100644 orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java create mode 100644 orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java create mode 100644 orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java create mode 100644 spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 3af6f587783b..19727a1bffdb 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -67,7 +67,7 @@ public TypeDescription type() { } } - private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; + static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required"; /** @@ -214,6 +214,7 @@ public static Schema convert(TypeDescription orcSchema) { "Error in ORC file, children fields and names do not match."); List icebergFields = Lists.newArrayListWithExpectedSize(children.size()); + // TODO how we get field ids from ORC schema AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema)); for (int i = 0; i < children.size(); i++) { icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i), @@ -308,7 +309,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo orcType = convert(fieldId, type, false); } } - + orcType.setAttribute(ICEBERG_ID_ATTRIBUTE, fieldId.toString()); return orcType; } @@ -377,11 +378,17 @@ private static boolean isSameType(TypeDescription orcType, Type icebergType) { } } - private static Optional icebergID(TypeDescription orcType) { + static Optional icebergID(TypeDescription orcType) { return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE)) .map(Integer::parseInt); } + static int fieldId(TypeDescription orcType) { + String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE); + Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE); + return Integer.parseInt(idStr); + } + private static boolean isRequired(TypeDescription orcType) { String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE); if (isRequiredStr != null) { @@ -496,7 +503,7 @@ private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, St } } - private static int getMaxIcebergId(TypeDescription originalOrcSchema) { + static int getMaxIcebergId(TypeDescription originalOrcSchema) { int maxId = icebergID(originalOrcSchema).orElse(0); final List children = Optional.ofNullable(originalOrcSchema.getChildren()) .orElse(Collections.emptyList()); diff --git 
a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java new file mode 100644 index 000000000000..477290974fb5 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; + + +public abstract class OrcSchemaWithTypeVisitor { + public static T visit( + org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { + return visit(iSchema.asStruct(), schema, visitor); + } + + public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { + switch (schema.getCategory()) { + case STRUCT: + return visitRecord(iType.asStructType(), schema, visitor); + + case UNION: + // We don't have an answer for union types. + throw new IllegalArgumentException("Can't handle " + schema); + + case LIST: + Types.ListType list = iType.asListType(); + return visitor.array( + list, schema, + visit(list.elementType(), schema.getChildren().get(0), visitor)); + + case MAP: + Types.MapType map = iType.asMapType(); + return visitor.map( + map, schema, + visit(map.keyType(), schema.getChildren().get(0), visitor), + visit(map.valueType(), schema.getChildren().get(1), visitor)); + + default: + return visitor.primitive(iType.asPrimitiveType(), schema); + } + } + + private static T visitRecord( + Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor visitor) { + List fields = record.getChildren(); + List names = record.getFieldNames(); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (TypeDescription field : fields) { + int fieldId = ORCSchemaUtil.fieldId(field); + Types.NestedField iField = struct != null ? struct.field(fieldId) : null; + results.add(visit(iField != null ? 
iField.type() : null, field, visitor)); + } + return visitor.record(struct, record, names, results); + } + + public T record(Types.StructType iStruct, TypeDescription record, List names, List fields) { + return null; + } + + public T array(Types.ListType iList, TypeDescription array, T element) { + return null; + } + + public T map(Types.MapType iMap, TypeDescription map, T key, T value) { + return null; + } + + public T primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return null; + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java new file mode 100644 index 000000000000..565901852fe9 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import org.apache.orc.storage.ql.exec.vector.ColumnVector; + + +public interface OrcValReader { + default T read(ColumnVector vector, int row) { + int rowIndex = vector.isRepeating ? 0 : row; + if (!vector.noNulls && vector.isNull[rowIndex]) { + return null; + } else { + return nonNullRead(vector, rowIndex); + } + } + + T nonNullRead(ColumnVector vector, int row); +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java new file mode 100644 index 000000000000..90a857073b2f --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.orc; + +import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.types.Types; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + + +public class OrcValueReaders { + private OrcValueReaders() { + } + + public static OrcValReader booleans() { + return BooleanReader.INSTANCE; + } + + public static OrcValReader shorts() { + return ShortReader.INSTANCE; + } + + public static OrcValReader ints() { + return IntegerReader.INSTANCE; + } + + public static OrcValReader longs() { + return LongReader.INSTANCE; + } + + public static OrcValReader floats() { + return FloatReader.INSTANCE; + } + + public static OrcValReader doubles() { + return DoubleReader.INSTANCE; + } + + public static OrcValReader bytes() { + return BytesReader.INSTANCE; + } + + public static OrcValReader byteReader() { + return ByteReader.INSTANCE; + } + + private static class BooleanReader implements OrcValReader { + static final OrcValReader INSTANCE = new BooleanReader(); + + private BooleanReader() { + } + + @Override + public Boolean nonNullRead(ColumnVector vector, int row) { + return ((LongColumnVector) vector).vector[row] != 0; + } + } + + private static class ShortReader implements OrcValReader { + static final OrcValReader INSTANCE = new ShortReader(); + + private ShortReader() { + } + + @Override + public Short nonNullRead(ColumnVector vector, int row) { + return (short) ((LongColumnVector) vector).vector[row]; + } + } + + private static class IntegerReader implements OrcValReader { + static final OrcValReader INSTANCE = new IntegerReader(); + + private IntegerReader() { + } + + @Override + public Integer nonNullRead(ColumnVector vector, int row) { + return (int) ((LongColumnVector) vector).vector[row]; + } + } + + private static class LongReader implements OrcValReader { + static final OrcValReader INSTANCE = new LongReader(); + + private LongReader() { + } + + @Override + public Long nonNullRead(ColumnVector vector, int row) { + return ((LongColumnVector) vector).vector[row]; + } + } + + private static class FloatReader implements OrcValReader { + private static final FloatReader INSTANCE = new FloatReader(); + + private FloatReader() { + } + + @Override + public Float nonNullRead(ColumnVector vector, int row) { + return (float) ((DoubleColumnVector) vector).vector[row]; + } + } + + private static class DoubleReader implements OrcValReader { + private static final DoubleReader INSTANCE = new DoubleReader(); + + private DoubleReader() { + } + + @Override + public Double nonNullRead(ColumnVector vector, int row) { + return ((DoubleColumnVector) vector).vector[row]; + } + } + + private static class ByteReader implements OrcValReader { + private static final ByteReader INSTANCE = new ByteReader(); + + private ByteReader() { + } + + @Override + public Byte nonNullRead(ColumnVector vector, int row) { + return (byte) ((LongColumnVector) vector).vector[row]; + } + } + + private static class BytesReader implements OrcValReader { + private static final BytesReader INSTANCE = new BytesReader(); + + private BytesReader() { + } + + @Override + public byte[] nonNullRead(ColumnVector 
vector, int row) { + BytesColumnVector bytesVector = (BytesColumnVector) vector; + + return Arrays.copyOfRange( + bytesVector.vector[row], bytesVector.start[row], bytesVector.start[row] + bytesVector.length[row]); + } + } + + public abstract static class StructReader implements OrcValReader { + private final OrcValReader[] readers; + private final int[] positions; + private final Object[] constants; + + protected StructReader(List> readers) { + this.readers = readers.toArray(new OrcValReader[0]); + this.positions = new int[0]; + this.constants = new Object[0]; + } + + protected StructReader(List> readers, Types.StructType struct, Map idToConstant) { + this.readers = readers.toArray(new OrcValReader[0]); + List fields = struct.fields(); + List positionList = Lists.newArrayListWithCapacity(fields.size()); + List constantList = Lists.newArrayListWithCapacity(fields.size()); + for (int pos = 0; pos < fields.size(); pos += 1) { + Types.NestedField field = fields.get(pos); + Object constant = idToConstant.get(field.fieldId()); + if (constant != null) { + positionList.add(pos); + constantList.add(idToConstant.get(field.fieldId())); + } + } + + this.positions = positionList.stream().mapToInt(Integer::intValue).toArray(); + this.constants = constantList.toArray(); + } + + protected abstract T create(); + + protected abstract T reuseOrCreate(); + + protected abstract void set(T struct, int pos, Object value); + + public OrcValReader reader(int pos) { + return readers[pos]; + } + + @Override + public T nonNullRead(ColumnVector vector, int row) { + StructColumnVector structVector = (StructColumnVector) vector; + return readInternal(create(), structVector.fields, row); + } + + public T read(VectorizedRowBatch batch, int row) { + return readInternal(reuseOrCreate(), batch.cols, row); + } + + private T readInternal(T struct, ColumnVector[] columnVectors, int row) { + for (int c = 0; c < readers.length; ++c) { + set(struct, c, reader(c).read(columnVectors[c], row)); + } + + for (int i = 0; i < positions.length; i += 1) { + set(struct, positions[i], constants[i]); + } + + return struct; + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 829ca45df245..6241eec49107 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -19,32 +19,19 @@ package org.apache.iceberg.spark.data; -import java.math.BigDecimal; -import java.sql.Timestamp; +import com.google.common.collect.ImmutableMap; import java.util.List; +import java.util.Map; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.orc.OrcValReader; import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; -import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; -import org.apache.orc.storage.ql.exec.vector.ListColumnVector; -import org.apache.orc.storage.ql.exec.vector.LongColumnVector; -import org.apache.orc.storage.ql.exec.vector.MapColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; -import 
org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.storage.serde2.io.DateWritable; -import org.apache.orc.storage.serde2.io.HiveDecimalWritable; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; -import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter; -import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; -import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter; -import org.apache.spark.sql.catalyst.util.ArrayData; -import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.Decimal; -import org.apache.spark.unsafe.Platform; /** * Converts the OrcInterator, which returns ORC's VectorizedRowBatch to a @@ -53,425 +40,83 @@ * It minimizes allocations by reusing most of the objects in the implementation. */ public class SparkOrcReader implements OrcValueReader { - private static final int INITIAL_SIZE = 128 * 1024; - private final List columns; - private final Converter[] converters; - private final UnsafeRowWriter rowWriter; + private final SparkOrcValueReaders.StructReader reader; - public SparkOrcReader(TypeDescription readOrcSchema) { - columns = readOrcSchema.getChildren(); - converters = buildConverters(); - rowWriter = new UnsafeRowWriter(columns.size(), INITIAL_SIZE); + public SparkOrcReader(org.apache.iceberg.Schema expectedSchema, TypeDescription readSchema) { + this(expectedSchema, readSchema, ImmutableMap.of()); } - private Converter[] buildConverters() { - final Converter[] newConverters = new Converter[columns.size()]; - for (int c = 0; c < newConverters.length; ++c) { - newConverters[c] = buildConverter(columns.get(c)); - } - return newConverters; + @SuppressWarnings("unchecked") + public SparkOrcReader( + org.apache.iceberg.Schema expectedSchema, TypeDescription readOrcSchema, Map idToConstant) { + reader = (SparkOrcValueReaders.StructReader) OrcSchemaWithTypeVisitor.visit( + expectedSchema, readOrcSchema, new ReadBuilder(idToConstant)); } @Override public InternalRow read(VectorizedRowBatch batch, int row) { - rowWriter.reset(); - rowWriter.zeroOutNullBytes(); - for (int c = 0; c < batch.cols.length; ++c) { - converters[c].convert(rowWriter, c, batch.cols[c], row); - } - return rowWriter.getRow(); + return reader.read(batch, row); } - private static String rowToString(SpecializedGetters row, TypeDescription schema) { - final List children = schema.getChildren(); - final StringBuilder rowBuilder = new StringBuilder("{"); + private static class ReadBuilder extends OrcSchemaWithTypeVisitor> { + private final Map idToConstant; - for (int c = 0; c < children.size(); ++c) { - rowBuilder.append("\""); - rowBuilder.append(schema.getFieldNames().get(c)); - rowBuilder.append("\": "); - rowBuilder.append(rowEntryToString(row, c, children.get(c))); - if (c != children.size() - 1) { - rowBuilder.append(", "); - } + private ReadBuilder(Map idToConstant) { + this.idToConstant = idToConstant; } - rowBuilder.append("}"); - return rowBuilder.toString(); - } - - private static String rowEntryToString(SpecializedGetters row, int ord, TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return Boolean.toString(row.getBoolean(ord)); - case BYTE: - return Byte.toString(row.getByte(ord)); - case SHORT: - return Short.toString(row.getShort(ord)); - case INT: - return Integer.toString(row.getInt(ord)); - case LONG: - return 
Long.toString(row.getLong(ord)); - case FLOAT: - return Float.toString(row.getFloat(ord)); - case DOUBLE: - return Double.toString(row.getDouble(ord)); - case CHAR: - case VARCHAR: - case STRING: - return "\"" + row.getUTF8String(ord) + "\""; - case BINARY: { - byte[] bin = row.getBinary(ord); - final StringBuilder binStr; - if (bin == null) { - binStr = new StringBuilder("null"); - } else { - binStr = new StringBuilder("["); - for (int i = 0; i < bin.length; ++i) { - if (i != 0) { - binStr.append(", "); - } - int value = bin[i] & 0xff; - if (value < 16) { - binStr.append("0"); - binStr.append(Integer.toHexString(value)); - } else { - binStr.append(Integer.toHexString(value)); - } - } - binStr.append("]"); - } - return binStr.toString(); - } - case DECIMAL: - return row.getDecimal(ord, schema.getPrecision(), schema.getScale()).toString(); - case DATE: - return "\"" + new DateWritable(row.getInt(ord)) + "\""; - case TIMESTAMP_INSTANT: - Timestamp ts = new Timestamp((row.getLong(ord) / 1_000_000) * 1_000); // initialize with seconds - ts.setNanos((int) (row.getLong(ord) % 1_000_000) * 1_000); // add the rest (millis to nanos) - return "\"" + ts + "\""; - case STRUCT: - return rowToString(row.getStruct(ord, schema.getChildren().size()), schema); - case LIST: { - TypeDescription child = schema.getChildren().get(0); - final StringBuilder listStr = new StringBuilder("["); - ArrayData list = row.getArray(ord); - for (int e = 0; e < list.numElements(); ++e) { - if (e != 0) { - listStr.append(", "); - } - listStr.append(rowEntryToString(list, e, child)); - } - listStr.append("]"); - return listStr.toString(); - } - case MAP: { - TypeDescription keyType = schema.getChildren().get(0); - TypeDescription valueType = schema.getChildren().get(1); - MapData map = row.getMap(ord); - ArrayData keys = map.keyArray(); - ArrayData values = map.valueArray(); - StringBuilder mapStr = new StringBuilder("["); - for (int e = 0; e < map.numElements(); ++e) { - if (e != 0) { - mapStr.append(", "); - } - mapStr.append(rowEntryToString(keys, e, keyType)); - mapStr.append(": "); - mapStr.append(rowEntryToString(values, e, valueType)); - } - mapStr.append("]"); - return mapStr.toString(); - } - default: - throw new IllegalArgumentException("Unhandled type " + schema); - } - } - private static int getArrayElementSize(TypeDescription type) { - switch (type.getCategory()) { - case BOOLEAN: - case BYTE: - return 1; - case SHORT: - return 2; - case INT: - case FLOAT: - return 4; - default: - return 8; - } - } - - /** - * The common interface for converting from a ORC ColumnVector to a Spark - * UnsafeRow. UnsafeRows need two different interfaces for writers and thus - * we have two methods the first is for structs (UnsafeRowWriter) and the - * second is for lists and maps (UnsafeArrayWriter). If Spark adds a common - * interface similar to SpecializedGetters we could that and a single set of - * methods. - */ - interface Converter { - default void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) { - int rowIndex = vector.isRepeating ? 0 : row; - if (!vector.noNulls && vector.isNull[rowIndex]) { - writer.setNullAt(column); - } else { - convertNonNullValue(writer, column, vector, rowIndex); - } - } - - default void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, int row) { - int rowIndex = vector.isRepeating ? 
0 : row; - if (!vector.noNulls && vector.isNull[rowIndex]) { - writer.setNull(element); - } else { - convertNonNullValue(writer, element, vector, rowIndex); - } - } - - void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row); - } - - private static class BooleanConverter implements Converter { @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, ((LongColumnVector) vector).vector[row] != 0); + public OrcValReader record( + Types.StructType expected, TypeDescription record, List names, List> fields) { + return SparkOrcValueReaders.struct(fields, expected, idToConstant); } - } - private static class ByteConverter implements Converter { @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, (byte) ((LongColumnVector) vector).vector[row]); + public OrcValReader array(Types.ListType iList, TypeDescription array, OrcValReader elementReader) { + return SparkOrcValueReaders.array(elementReader); } - } - - private static class ShortConverter implements Converter { - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, (short) ((LongColumnVector) vector).vector[row]); - } - } - - private static class IntConverter implements Converter { - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, (int) ((LongColumnVector) vector).vector[row]); - } - } - - private static class LongConverter implements Converter { - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, ((LongColumnVector) vector).vector[row]); - } - } - - private static class FloatConverter implements Converter { - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, (float) ((DoubleColumnVector) vector).vector[row]); - } - } - - private static class DoubleConverter implements Converter { - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - writer.write(ordinal, ((DoubleColumnVector) vector).vector[row]); - } - } - private static class TimestampTzConverter implements Converter { @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - TimestampColumnVector timestampVector = (TimestampColumnVector) vector; - // compute microseconds past 1970. 
- writer.write(ordinal, (timestampVector.time[row] / 1000) * 1_000_000 + timestampVector.nanos[row] / 1000); + public OrcValReader map( + Types.MapType iMap, TypeDescription map, OrcValReader keyReader, OrcValReader valueReader) { + return SparkOrcValueReaders.map(keyReader, valueReader); } - } - private static class BinaryConverter implements Converter { @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - BytesColumnVector bytesVector = (BytesColumnVector) vector; - writer.write(ordinal, bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]); - } - } - - private static class Decimal18Converter implements Converter { - private final int precision; - private final int scale; - - Decimal18Converter(int precision, int scale) { - this.precision = precision; - this.scale = scale; - } - - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row]; - writer.write(ordinal, - new Decimal().set(value.serialize64(value.scale()), value.precision(), value.scale()), - precision, scale); - } - } - - private static class Decimal38Converter implements Converter { - private final int precision; - private final int scale; - - Decimal38Converter(int precision, int scale) { - this.precision = precision; - this.scale = scale; - } - - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - BigDecimal value = ((DecimalColumnVector) vector).vector[row] - .getHiveDecimal().bigDecimalValue(); - writer.write(ordinal, - new Decimal().set(new scala.math.BigDecimal(value), precision, scale), - precision, scale); - } - } - - private static class StructConverter implements Converter { - private final Converter[] children; - - StructConverter(final TypeDescription schema) { - children = new Converter[schema.getChildren().size()]; - for (int c = 0; c < children.length; ++c) { - children[c] = buildConverter(schema.getChildren().get(c)); - } - } - - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - StructColumnVector structVector = (StructColumnVector) vector; - UnsafeRowWriter childWriter = new UnsafeRowWriter(writer, children.length); - int start = childWriter.cursor(); - childWriter.resetRowWriter(); - for (int c = 0; c < children.length; ++c) { - children[c].convert(childWriter, c, structVector.fields[c], row); - } - writer.setOffsetAndSizeFromPreviousCursor(ordinal, start); - } - } - - private static class ListConverter implements Converter { - private final Converter childConverter; - private final TypeDescription child; - - ListConverter(TypeDescription schema) { - child = schema.getChildren().get(0); - childConverter = buildConverter(child); - } - - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - ListColumnVector listVector = (ListColumnVector) vector; - int offset = (int) listVector.offsets[row]; - int length = (int) listVector.lengths[row]; - - UnsafeArrayWriter childWriter = new UnsafeArrayWriter(writer, getArrayElementSize(child)); - int start = childWriter.cursor(); - childWriter.initialize(length); - for (int c = 0; c < length; ++c) { - childConverter.convert(childWriter, c, listVector.child, offset + c); - } - writer.setOffsetAndSizeFromPreviousCursor(ordinal, start); - } - } - - private static class MapConverter implements Converter 
{ - private static final int KEY_SIZE_BYTES = 8; - - private final Converter keyConvert; - private final Converter valueConvert; - - private final int keySize; - private final int valueSize; - - MapConverter(TypeDescription schema) { - final TypeDescription keyType = schema.getChildren().get(0); - final TypeDescription valueType = schema.getChildren().get(1); - keyConvert = buildConverter(keyType); - keySize = getArrayElementSize(keyType); - - valueConvert = buildConverter(valueType); - valueSize = getArrayElementSize(valueType); - } - - @Override - public void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector vector, int row) { - MapColumnVector mapVector = (MapColumnVector) vector; - final int offset = (int) mapVector.offsets[row]; - final int length = (int) mapVector.lengths[row]; - - UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(writer, keySize); - final int start = keyWriter.cursor(); - // save room for the key size - keyWriter.grow(KEY_SIZE_BYTES); - keyWriter.increaseCursor(KEY_SIZE_BYTES); - - // serialize the keys - keyWriter.initialize(length); - for (int c = 0; c < length; ++c) { - keyConvert.convert(keyWriter, c, mapVector.keys, offset + c); - } - // store the serialized size of the keys - Platform.putLong(keyWriter.getBuffer(), start, - keyWriter.cursor() - start - KEY_SIZE_BYTES); - - // serialize the values - UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(writer, valueSize); - valueWriter.initialize(length); - for (int c = 0; c < length; ++c) { - valueConvert.convert(valueWriter, c, mapVector.values, offset + c); + public OrcValReader primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + switch (primitive.getCategory()) { + case BOOLEAN: + return OrcValueReaders.booleans(); + case BYTE: + return OrcValueReaders.byteReader(); + case SHORT: + return OrcValueReaders.shorts(); + case DATE: + case INT: + return OrcValueReaders.ints(); + case LONG: + return OrcValueReaders.longs(); + case FLOAT: + return OrcValueReaders.floats(); + case DOUBLE: + return OrcValueReaders.doubles(); + case TIMESTAMP_INSTANT: + return SparkOrcValueReaders.timestampTzs(); + case DECIMAL: + if (primitive.getPrecision() <= Decimal.MAX_LONG_DIGITS()) { + return new SparkOrcValueReaders.Decimal18Reader(primitive.getPrecision(), primitive.getScale()); + } else { + return new SparkOrcValueReaders.Decimal38Reader(primitive.getPrecision(), primitive.getScale()); + } + case CHAR: + case VARCHAR: + case STRING: + return SparkOrcValueReaders.strings(); + case BINARY: + return OrcValueReaders.bytes(); + default: + throw new IllegalArgumentException("Unhandled type " + primitive); } - writer.setOffsetAndSizeFromPreviousCursor(ordinal, start); - } - } - - static Converter buildConverter(final TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return new BooleanConverter(); - case BYTE: - return new ByteConverter(); - case SHORT: - return new ShortConverter(); - case DATE: - case INT: - return new IntConverter(); - case LONG: - return new LongConverter(); - case FLOAT: - return new FloatConverter(); - case DOUBLE: - return new DoubleConverter(); - case TIMESTAMP_INSTANT: - return new TimestampTzConverter(); - case DECIMAL: - if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) { - return new Decimal18Converter(schema.getPrecision(), schema.getScale()); - } else { - return new Decimal38Converter(schema.getPrecision(), schema.getScale()); - } - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - return new BinaryConverter(); - case 
STRUCT: - return new StructConverter(schema); - case LIST: - return new ListConverter(schema); - case MAP: - return new MapConverter(schema); - default: - throw new IllegalArgumentException("Unhandled type " + schema); } } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java new file mode 100644 index 000000000000..a4795a07fb81 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import com.google.common.collect.Lists; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.orc.OrcValReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.types.Types; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.UTF8String; + + +class SparkOrcValueReaders { + private SparkOrcValueReaders() { + } + + static OrcValReader strings() { + return StringReader.INSTANCE; + } + + static OrcValReader timestampTzs() { + return TimestampTzReader.INSTANCE; + } + + static OrcValReader struct( + List> readers, Types.StructType struct, Map idToConstant) { + return new StructReader(readers, struct, idToConstant); + } + + static OrcValReader array(OrcValReader elementReader) { + return new ArrayReader(elementReader); + } + + static OrcValReader map(OrcValReader keyReader, OrcValReader valueReader) { + return new MapReader(keyReader, valueReader); + } + + private static class ArrayReader implements OrcValReader { + private final OrcValReader elementReader; + private final List reusedList = Lists.newArrayList(); + + private ArrayReader(OrcValReader elementReader) { + this.elementReader = elementReader; + } + + @Override + public ArrayData nonNullRead(ColumnVector vector, int row) { + reusedList.clear(); + 
ListColumnVector listVector = (ListColumnVector) vector; + int offset = (int) listVector.offsets[row]; + int length = (int) listVector.lengths[row]; + for (int c = 0; c < length; ++c) { + reusedList.add(elementReader.read(listVector.child, offset + c)); + } + return new GenericArrayData(reusedList.toArray()); + } + } + + private static class MapReader implements OrcValReader { + private final OrcValReader keyReader; + private final OrcValReader valueReader; + + private final List reusedKeyList = Lists.newArrayList(); + private final List reusedValueList = Lists.newArrayList(); + + private MapReader(OrcValReader keyReader, OrcValReader valueReader) { + this.keyReader = keyReader; + this.valueReader = valueReader; + } + + @Override + public MapData nonNullRead(ColumnVector vector, int row) { + reusedKeyList.clear(); + reusedValueList.clear(); + MapColumnVector mapVector = (MapColumnVector) vector; + int offset = (int) mapVector.offsets[row]; + long length = mapVector.lengths[row]; + for (int c = 0; c < length; c++) { + reusedKeyList.add(keyReader.read(mapVector.keys, offset + c)); + reusedValueList.add(valueReader.read(mapVector.values, offset + c)); + } + + return new ArrayBasedMapData( + new GenericArrayData(reusedKeyList.toArray()), + new GenericArrayData(reusedValueList.toArray())); + } + } + + static class StructReader extends OrcValueReaders.StructReader { + private final int numFields; + private InternalRow internalRow; + + protected StructReader(List> readers, Types.StructType struct, Map idToConstant) { + super(readers, struct, idToConstant); + this.numFields = readers.size(); + } + + @Override + protected InternalRow create() { + return new GenericInternalRow(numFields); + } + + @Override + protected InternalRow reuseOrCreate() { + if (internalRow == null) { + internalRow = new GenericInternalRow(numFields); + } + return internalRow; + } + + @Override + protected void set(InternalRow struct, int pos, Object value) { + if (value != null) { + struct.update(pos, value); + } else { + struct.setNullAt(pos); + } + } + } + + private static class StringReader implements OrcValReader { + private static final StringReader INSTANCE = new StringReader(); + + private StringReader() { + } + + @Override + public UTF8String nonNullRead(ColumnVector vector, int row) { + BytesColumnVector bytesVector = (BytesColumnVector) vector; + return UTF8String.fromBytes(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]); + } + } + + private static class TimestampTzReader implements OrcValReader { + private static final TimestampTzReader INSTANCE = new TimestampTzReader(); + + private TimestampTzReader() { + } + + @Override + public Long nonNullRead(ColumnVector vector, int row) { + TimestampColumnVector timestampVector = (TimestampColumnVector) vector; + return (timestampVector.time[row] / 1000) * 1_000_000 + timestampVector.nanos[row] / 1000; + } + } + + static class Decimal18Reader implements OrcValReader { + // TODO: precision and scale are unused; 
check for bug + private final int precision; + private final int scale; + + Decimal18Reader(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public Decimal nonNullRead(ColumnVector vector, int row) { + HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row]; + return new Decimal().set(value.serialize64(value.scale()), value.precision(), value.scale()); + } + } + + static class Decimal38Reader implements OrcValReader { + private final int precision; + private final int scale; + + Decimal38Reader(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public Decimal nonNullRead(ColumnVector vector, int row) { + BigDecimal value = ((DecimalColumnVector) vector).vector[row] + .getHiveDecimal().bigDecimalValue(); + return new Decimal().set(new scala.math.BigDecimal(value), precision, scale); + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f4ea6c7ee15f..05802ff2cb4d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -67,7 +67,8 @@ import scala.collection.JavaConverters; class RowDataReader extends BaseDataReader { - private static final Set SUPPORTS_CONSTANTS = Sets.newHashSet(FileFormat.AVRO, FileFormat.PARQUET); + private static final Set SUPPORTS_CONSTANTS = Sets.newHashSet( + FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC); // for some reason, the apply method can't be called from Java without reflection private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") .impl(UnsafeProjection.class, InternalRow.class) @@ -143,7 +144,7 @@ private CloseableIterable open(FileScanTask task, Schema readSchema break; case ORC: - iter = newOrcIterable(location, task, readSchema); + iter = newOrcIterable(location, task, readSchema, idToConstant); break; default: @@ -185,11 +186,12 @@ private CloseableIterable newParquetIterable( private CloseableIterable newOrcIterable( InputFile location, FileScanTask task, - Schema readSchema) { + Schema readSchema, + Map idToConstant) { return ORC.read(location) .project(readSchema) .split(task.start(), task.length()) - .createReaderFunc(SparkOrcReader::new) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) .caseSensitive(caseSensitive) .build(); } diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java index 071c903fd06f..6a588504065a 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java @@ -71,7 +71,7 @@ private void writeAndValidateRecords(Schema schema, Iterable expect try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) .project(schema) - .createReaderFunc(SparkOrcReader::new) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(schema, readOrcSchema)) .build()) { final Iterator actualRows = reader.iterator(); final Iterator expectedRows = expected.iterator(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java index e6c7621de2b0..44686d69d789 100644 --- 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java @@ -41,7 +41,6 @@ import org.apache.spark.sql.SparkSession; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -311,8 +310,6 @@ public void testPartitionValueTypes() throws Exception { @Test public void testNestedPartitionValues() throws Exception { - Assume.assumeTrue("ORC can't project nested partition values", !format.equalsIgnoreCase("orc")); - String[] columnNames = new String[] { "b", "i", "l", "f", "d", "date", "ts", "s", "bytes", "dec_9_0", "dec_11_2", "dec_38_10" }; From 66edd0350cf0d6b13cfb92d54cf4de342955aec5 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Fri, 1 May 2020 08:09:15 -0700 Subject: [PATCH 2/5] Removed legacy code from Spark --- .../spark/source/PartitionRowConverter.java | 107 ------------------ 1 file changed, 107 deletions(-) delete mode 100644 spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java deleted file mode 100644 index 13996f959217..000000000000 --- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.spark.source; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.function.Function; -import org.apache.iceberg.PartitionField; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.util.ByteBuffers; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.UTF8String; - -/** - * Objects of this class generate an {@link InternalRow} by utilizing the partition schema passed during construction. 
- */ -class PartitionRowConverter implements Function { - private final DataType[] types; - private final int[] positions; - private final Class[] javaTypes; - private final GenericInternalRow reusedRow; - - PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) { - StructType partitionType = SparkSchemaUtil.convert(partitionSchema); - StructField[] fields = partitionType.fields(); - - this.types = new DataType[fields.length]; - this.positions = new int[types.length]; - this.javaTypes = new Class[types.length]; - this.reusedRow = new GenericInternalRow(types.length); - - List partitionFields = spec.fields(); - for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) { - this.types[rowIndex] = fields[rowIndex].dataType(); - - int sourceId = partitionSchema.columns().get(rowIndex).fieldId(); - for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) { - PartitionField field = spec.fields().get(specIndex); - if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) { - positions[rowIndex] = specIndex; - javaTypes[rowIndex] = spec.javaClasses()[specIndex]; - break; - } - } - } - } - - @Override - public InternalRow apply(StructLike tuple) { - for (int i = 0; i < types.length; i += 1) { - Object value = tuple.get(positions[i], javaTypes[i]); - if (value != null) { - reusedRow.update(i, convert(value, types[i])); - } else { - reusedRow.setNullAt(i); - } - } - - return reusedRow; - } - - /** - * Converts the objects into instances used by Spark's InternalRow. - * - * @param value a data value - * @param type the Spark data type - * @return the value converted to the representation expected by Spark's InternalRow. - */ - private static Object convert(Object value, DataType type) { - if (type instanceof StringType) { - return UTF8String.fromString(value.toString()); - } else if (type instanceof BinaryType) { - return ByteBuffers.toByteArray((ByteBuffer) value); - } else if (type instanceof DecimalType) { - return Decimal.fromDecimal(value); - } - return value; - } -} From 4e7df0f3b9ebb9261bc161095d99ece434064ced Mon Sep 17 00:00:00 2001 From: Ratandeep Ratt Date: Thu, 21 May 2020 22:15:36 -0700 Subject: [PATCH 3/5] address comments --- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 2 +- .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 19 +++---- .../apache/iceberg/orc/OrcValueReaders.java | 57 +++---------------- .../iceberg/spark/data/SparkOrcReader.java | 15 ++--- .../spark/data/SparkOrcValueReaders.java | 33 ++++------- 5 files changed, 37 insertions(+), 89 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 19727a1bffdb..35e82c1b0ff9 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -503,7 +503,7 @@ private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, St } } - static int getMaxIcebergId(TypeDescription originalOrcSchema) { + private static int getMaxIcebergId(TypeDescription originalOrcSchema) { int maxId = icebergID(originalOrcSchema).orElse(0); final List children = Optional.ofNullable(originalOrcSchema.getChildren()) .orElse(Collections.emptyList()); diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 477290974fb5..44ce80346239 100644 --- 
a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -35,27 +35,26 @@ public static T visit( public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor visitor) { switch (schema.getCategory()) { case STRUCT: - return visitRecord(iType.asStructType(), schema, visitor); + return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); case UNION: - // We don't have an answer for union types. - throw new IllegalArgumentException("Can't handle " + schema); + throw new UnsupportedOperationException("Cannot handle " + schema); case LIST: - Types.ListType list = iType.asListType(); - return visitor.array( - list, schema, - visit(list.elementType(), schema.getChildren().get(0), visitor)); + Types.ListType list = iType != null ? iType.asListType() : null; + return visitor.list( + list, schema, + visit(list != null ? list.elementType() : null, schema.getChildren().get(0), visitor)); case MAP: - Types.MapType map = iType.asMapType(); + Types.MapType map = iType != null ? iType.asMapType() : null; return visitor.map( map, schema, - visit(map.keyType(), schema.getChildren().get(0), visitor), - visit(map.valueType(), schema.getChildren().get(1), visitor)); + visit(map != null ? map.keyType() : null, schema.getChildren().get(0), visitor), + visit(map != null ? map.valueType() : null, schema.getChildren().get(1), visitor)); default: - return visitor.primitive(iType.asPrimitiveType(), schema); + return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); } } @@ -76,7 +75,7 @@ public T record(Types.StructType iStruct, TypeDescription record, List n return null; } - public T array(Types.ListType iList, TypeDescription array, T element) { + public T list(Types.ListType iList, TypeDescription array, T element) { return null; } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java index 90a857073b2f..5b9c0c78137c 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java @@ -29,47 +29,38 @@ import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class OrcValueReaders { private OrcValueReaders() { } - public static OrcValReader booleans() { + public static OrcValReader booleans() { return BooleanReader.INSTANCE; } - public static OrcValReader shorts() { - return ShortReader.INSTANCE; - } - - public static OrcValReader ints() { + public static OrcValReader ints() { return IntegerReader.INSTANCE; } - public static OrcValReader longs() { + public static OrcValReader longs() { return LongReader.INSTANCE; } - public static OrcValReader floats() { + public static OrcValReader floats() { return FloatReader.INSTANCE; } - public static OrcValReader doubles() { + public static OrcValReader doubles() { return DoubleReader.INSTANCE; } - public static OrcValReader bytes() { + public static OrcValReader bytes() { return BytesReader.INSTANCE; } - public static OrcValReader byteReader() { - return ByteReader.INSTANCE; - } - private static class BooleanReader implements OrcValReader { - static final OrcValReader INSTANCE = new BooleanReader(); + static final BooleanReader INSTANCE = new BooleanReader(); private BooleanReader() { } @@ -80,20 +71,8 @@ public Boolean nonNullRead(ColumnVector vector, int 
row) { } } - private static class ShortReader implements OrcValReader { - static final OrcValReader INSTANCE = new ShortReader(); - - private ShortReader() { - } - - @Override - public Short nonNullRead(ColumnVector vector, int row) { - return (short) ((LongColumnVector) vector).vector[row]; - } - } - private static class IntegerReader implements OrcValReader { - static final OrcValReader INSTANCE = new IntegerReader(); + static final IntegerReader INSTANCE = new IntegerReader(); private IntegerReader() { } @@ -105,7 +84,7 @@ public Integer nonNullRead(ColumnVector vector, int row) { } private static class LongReader implements OrcValReader { - static final OrcValReader INSTANCE = new LongReader(); + static final LongReader INSTANCE = new LongReader(); private LongReader() { } @@ -140,18 +119,6 @@ public Double nonNullRead(ColumnVector vector, int row) { } } - private static class ByteReader implements OrcValReader { - private static final ByteReader INSTANCE = new ByteReader(); - - private ByteReader() { - } - - @Override - public Byte nonNullRead(ColumnVector vector, int row) { - return (byte) ((LongColumnVector) vector).vector[row]; - } - } - private static class BytesReader implements OrcValReader { private static final BytesReader INSTANCE = new BytesReader(); @@ -198,8 +165,6 @@ protected StructReader(List> readers, Types.StructType struct, M protected abstract T create(); - protected abstract T reuseOrCreate(); - protected abstract void set(T struct, int pos, Object value); public OrcValReader reader(int pos) { @@ -212,10 +177,6 @@ public T nonNullRead(ColumnVector vector, int row) { return readInternal(create(), structVector.fields, row); } - public T read(VectorizedRowBatch batch, int row) { - return readInternal(reuseOrCreate(), batch.cols, row); - } - private T readInternal(T struct, ColumnVector[] columnVectors, int row) { for (int c = 0; c < readers.length; ++c) { set(struct, c, reader(c).read(columnVectors[c], row)); diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 6241eec49107..8b6d1dcb8008 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -29,12 +29,13 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.Decimal; /** - * Converts the OrcInterator, which returns ORC's VectorizedRowBatch to a + * Converts the OrcIterator, which returns ORC's VectorizedRowBatch to a * set of Spark's UnsafeRows. * * It minimizes allocations by reusing most of the objects in the implementation. 
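For reference, a minimal sketch of how a reader built this way is wired up and consumed; this snippet is not part of the patch, and the names location, readSchema, and idToConstant simply mirror the RowDataReader change in patch 1:

    // Build a CloseableIterable of Spark InternalRows over an ORC file. The function
    // passed to createReaderFunc receives the file's TypeDescription; SparkOrcReader
    // then maps it to the expected Iceberg schema and injects constant partition
    // values from idToConstant at the positions recorded by StructReader.
    CloseableIterable<InternalRow> rows = ORC.read(location)
        .project(readSchema)
        .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
        .build();
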
@@ -49,13 +50,13 @@ public SparkOrcReader(org.apache.iceberg.Schema expectedSchema, TypeDescription
   @SuppressWarnings("unchecked")
   public SparkOrcReader(
       org.apache.iceberg.Schema expectedSchema, TypeDescription readOrcSchema, Map<Integer, ?> idToConstant) {
-    reader = (SparkOrcValueReaders.StructReader) OrcSchemaWithTypeVisitor.visit(
+    this.reader = (SparkOrcValueReaders.StructReader) OrcSchemaWithTypeVisitor.visit(
         expectedSchema, readOrcSchema, new ReadBuilder(idToConstant));
   }

   @Override
   public InternalRow read(VectorizedRowBatch batch, int row) {
-    return reader.read(batch, row);
+    return reader.read(new StructColumnVector(batch.size, batch.cols), row);
   }

   private static class ReadBuilder extends OrcSchemaWithTypeVisitor<OrcValReader<?>> {
@@ -72,7 +73,7 @@ public OrcValReader<?> record(
     }

     @Override
-    public OrcValReader<?> array(Types.ListType iList, TypeDescription array, OrcValReader<?> elementReader) {
+    public OrcValReader<?> list(Types.ListType iList, TypeDescription array, OrcValReader<?> elementReader) {
       return SparkOrcValueReaders.array(elementReader);
     }

@@ -88,9 +89,9 @@ public OrcValReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription
         case BOOLEAN:
           return OrcValueReaders.booleans();
         case BYTE:
-          return OrcValueReaders.byteReader();
+          // Iceberg does not have a byte type. Use int
         case SHORT:
-          return OrcValueReaders.shorts();
+          // Iceberg does not have a short type. Use int
         case DATE:
         case INT:
           return OrcValueReaders.ints();
@@ -111,7 +112,7 @@ public OrcValReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription
         case CHAR:
         case VARCHAR:
         case STRING:
-          return SparkOrcValueReaders.strings();
+          return SparkOrcValueReaders.utf8String();
         case BINARY:
           return OrcValueReaders.bytes();
         default:
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index a4795a07fb81..3a25a2fb1af4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -47,7 +47,7 @@ class SparkOrcValueReaders {
   private SparkOrcValueReaders() {
   }

-  static OrcValReader<UTF8String> strings() {
+  static OrcValReader<UTF8String> utf8String() {
     return StringReader.INSTANCE;
   }

@@ -70,7 +70,6 @@ static OrcValReader<MapData> map(OrcValReader<?> keyReader, OrcValReader<?> valueReade
   private static class ArrayReader implements OrcValReader<ArrayData> {
     private final OrcValReader<?> elementReader;
-    private final List<Object> reusedList = Lists.newArrayList();

     private ArrayReader(OrcValReader<?> elementReader) {
       this.elementReader = elementReader;
@@ -78,14 +77,14 @@ private ArrayReader(OrcValReader<?> elementReader) {

     @Override
     public ArrayData nonNullRead(ColumnVector vector, int row) {
-      reusedList.clear();
       ListColumnVector listVector = (ListColumnVector) vector;
       int offset = (int) listVector.offsets[row];
       int length = (int) listVector.lengths[row];
+      List<Object> elements = Lists.newArrayListWithExpectedSize(length);
       for (int c = 0; c < length; ++c) {
-        reusedList.add(elementReader.read(listVector.child, offset + c));
+        elements.add(elementReader.read(listVector.child, offset + c));
       }
-      return new GenericArrayData(reusedList.toArray());
+      return new GenericArrayData(elements.toArray());
     }
   }

@@ -93,9 +92,6 @@ private static class MapReader implements OrcValReader<MapData> {
     private final OrcValReader<?> keyReader;
     private final OrcValReader<?> valueReader;

-    private final List<Object> reusedKeyList = Lists.newArrayList();
-    private final List<Object> reusedValueList = Lists.newArrayList();
-
     private MapReader(OrcValReader<?> keyReader, OrcValReader<?> valueReader) {
       this.keyReader = keyReader;
       this.valueReader = valueReader;
@@ -103,25 +99,24 @@ private MapReader(OrcValReader<?> keyReader, OrcValReader<?> valueReader) {

     @Override
     public MapData nonNullRead(ColumnVector vector, int row) {
-      reusedKeyList.clear();
-      reusedValueList.clear();
       MapColumnVector mapVector = (MapColumnVector) vector;
       int offset = (int) mapVector.offsets[row];
       long length = mapVector.lengths[row];
+      List<Object> keys = Lists.newArrayListWithExpectedSize((int) length);
+      List<Object> values = Lists.newArrayListWithExpectedSize((int) length);
       for (int c = 0; c < length; c++) {
-        reusedKeyList.add(keyReader.read(mapVector.keys, offset + c));
-        reusedValueList.add(valueReader.read(mapVector.values, offset + c));
+        keys.add(keyReader.read(mapVector.keys, offset + c));
+        values.add(valueReader.read(mapVector.values, offset + c));
       }

       return new ArrayBasedMapData(
-          new GenericArrayData(reusedKeyList.toArray()),
-          new GenericArrayData(reusedValueList.toArray()));
+          new GenericArrayData(keys.toArray()),
+          new GenericArrayData(values.toArray()));
     }
   }

   static class StructReader extends OrcValueReaders.StructReader<InternalRow> {
     private final int numFields;
-    private InternalRow internalRow;

     protected StructReader(List<OrcValReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
       super(readers, struct, idToConstant);
@@ -133,14 +128,6 @@ protected InternalRow create() {
       return new GenericInternalRow(numFields);
     }

-    @Override
-    protected InternalRow reuseOrCreate() {
-      if (internalRow == null) {
-        internalRow = new GenericInternalRow(numFields);
-      }
-      return internalRow;
-    }
-
     @Override
     protected void set(InternalRow struct, int pos, Object value) {
       if (value != null) {

From cb684b7468710dbcaf22e6c25986ab0f8e5e2026 Mon Sep 17 00:00:00 2001
From: Ratandeep Ratt
Date: Fri, 22 May 2020 00:36:56 -0700
Subject: [PATCH 4/5] Rename OrcValReader to OrcValueReader

---
 .../iceberg/data/orc/GenericOrcReader.java    |  6 +--
 .../main/java/org/apache/iceberg/orc/ORC.java |  4 +-
 .../org/apache/iceberg/orc/ORCSchemaUtil.java |  5 +--
 .../org/apache/iceberg/orc/OrcIterable.java   |  8 ++--
 .../{OrcValReader.java => OrcRowReader.java}  | 20 +++++-----
 .../apache/iceberg/orc/OrcValueReader.java    | 20 +++++-----
 .../apache/iceberg/orc/OrcValueReaders.java   | 38 +++++++++----------
 .../iceberg/spark/data/SparkOrcReader.java    | 25 ++++++------
 .../spark/data/SparkOrcValueReaders.java      | 38 +++++++++----------
 9 files changed, 81 insertions(+), 83 deletions(-)
 rename orc/src/main/java/org/apache/iceberg/orc/{OrcValReader.java => OrcRowReader.java} (69%)

diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
index 2db582295b8c..a2807c13bb20 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -39,7 +39,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.ORCSchemaUtil;
-import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcRowReader;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -56,7 +56,7 @@
 /**
  * ORC reader for Generic Record.
  */
-public class GenericOrcReader implements OrcValueReader<Record> {
+public class GenericOrcReader implements OrcRowReader<Record> {
   private final Schema schema;
   private final List columns;
@@ -82,7 +82,7 @@ private Converter[] buildConverters() {
     return newConverters;
   }

-  public static OrcValueReader<Record> buildReader(Schema expectedSchema, TypeDescription fileSchema) {
+  public static OrcRowReader<Record> buildReader(Schema expectedSchema, TypeDescription fileSchema) {
     return new GenericOrcReader(expectedSchema, fileSchema);
   }

diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 7236a3c7a9cf..ce7dba6a4b83 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -123,7 +123,7 @@ public static class ReadBuilder {
     private Long start = null;
     private Long length = null;

-    private Function<TypeDescription, OrcValueReader<?>> readerFunc;
+    private Function<TypeDescription, OrcRowReader<?>> readerFunc;

     private ReadBuilder(InputFile file) {
       Preconditions.checkNotNull(file, "Input file cannot be null");
@@ -163,7 +163,7 @@ public ReadBuilder config(String property, String value) {
       return this;
     }

-    public ReadBuilder createReaderFunc(Function<TypeDescription, OrcValueReader<?>> readerFunction) {
+    public ReadBuilder createReaderFunc(Function<TypeDescription, OrcRowReader<?>> readerFunction) {
       this.readerFunc = readerFunction;
       return this;
     }

diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 35e82c1b0ff9..5449f0b64d76 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -67,7 +67,7 @@ public TypeDescription type() {
     }
   }

-  static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
+  private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
   private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";

   /**
@@ -214,7 +214,6 @@ public static Schema convert(TypeDescription orcSchema) {
         "Error in ORC file, children fields and names do not match.");

     List<Types.NestedField> icebergFields = Lists.newArrayListWithExpectedSize(children.size());
-    // TODO how we get field ids from ORC schema
     AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema));
     for (int i = 0; i < children.size(); i++) {
       icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i),
@@ -378,7 +377,7 @@ private static boolean isSameType(TypeDescription orcType, Type icebergType) {
     }
   }

-  static Optional<Integer> icebergID(TypeDescription orcType) {
+  private static Optional<Integer> icebergID(TypeDescription orcType) {
     return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE))
         .map(Integer::parseInt);
   }

diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 77eb24c5bead..ee0d79ac8228 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -42,11 +42,11 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final InputFile file;
   private final Long start;
   private final Long length;
-  private final Function<TypeDescription, OrcValueReader<?>> readerFunction;
+  private final Function<TypeDescription, OrcRowReader<?>> readerFunction;

   OrcIterable(InputFile file, Configuration config, Schema schema,
               Long start, Long length,
-              Function<TypeDescription, OrcValueReader<?>> readerFunction) {
+              Function<TypeDescription, OrcRowReader<?>> readerFunction) {
     this.schema = schema;
     this.readerFunction = readerFunction;
     this.file = file;
@@ -91,9 +91,9 @@ private static class OrcIterator implements Iterator<T> {
     private VectorizedRowBatch current;
     private final VectorizedRowBatchIterator batchIter;
-    private final OrcValueReader<T> reader;
+    private final OrcRowReader<T> reader;

-    OrcIterator(VectorizedRowBatchIterator batchIter, OrcValueReader<T> reader) {
+    OrcIterator(VectorizedRowBatchIterator batchIter, OrcRowReader<T> reader) {
       this.batchIter = batchIter;
       this.reader = reader;
       current = null;

diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowReader.java
similarity index 69%
rename from orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java
rename to orc/src/main/java/org/apache/iceberg/orc/OrcRowReader.java
index 565901852fe9..36304acda6b5 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValReader.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowReader.java
@@ -19,18 +19,16 @@
 package org.apache.iceberg.orc;

-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

+/**
+ * Used for implementing ORC row readers.
+ */
+public interface OrcRowReader<T> {
-public interface OrcValReader<T> {
-  default T read(ColumnVector vector, int row) {
-    int rowIndex = vector.isRepeating ? 0 : row;
-    if (!vector.noNulls && vector.isNull[rowIndex]) {
-      return null;
-    } else {
-      return nonNullRead(vector, rowIndex);
-    }
-  }
+  /**
+   * Reads a row.
+   */
+  T read(VectorizedRowBatch batch, int row);

-  T nonNullRead(ColumnVector vector, int row);
 }

diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
index cfc9ebb8afc3..90ec82d5c71d 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
@@ -19,16 +19,18 @@
 package org.apache.iceberg.orc;

-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;

-/**
- * Used for implementing ORC value readers.
- */
-public interface OrcValueReader<T> {
-  /**
-   * Reads a value in row.
-   */
-  T read(VectorizedRowBatch batch, int row);
+public interface OrcValueReader<T> {
+  default T read(ColumnVector vector, int row) {
+    int rowIndex = vector.isRepeating ? 0 : row;
+    if (!vector.noNulls && vector.isNull[rowIndex]) {
+      return null;
+    } else {
+      return nonNullRead(vector, rowIndex);
+    }
+  }
+
+  T nonNullRead(ColumnVector vector, int row);
 }

diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
index 5b9c0c78137c..b84c3e3c3c93 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
@@ -35,31 +35,31 @@ public class OrcValueReaders {
   private OrcValueReaders() {
   }

-  public static OrcValReader<Boolean> booleans() {
+  public static OrcValueReader<Boolean> booleans() {
     return BooleanReader.INSTANCE;
   }

-  public static OrcValReader<Integer> ints() {
+  public static OrcValueReader<Integer> ints() {
     return IntegerReader.INSTANCE;
   }

-  public static OrcValReader<Long> longs() {
+  public static OrcValueReader<Long> longs() {
     return LongReader.INSTANCE;
   }

-  public static OrcValReader<Float> floats() {
+  public static OrcValueReader<Float> floats() {
     return FloatReader.INSTANCE;
   }

-  public static OrcValReader<Double> doubles() {
+  public static OrcValueReader<Double> doubles() {
     return DoubleReader.INSTANCE;
   }

-  public static OrcValReader<byte[]> bytes() {
+  public static OrcValueReader<byte[]> bytes() {
     return BytesReader.INSTANCE;
   }

-  private static class BooleanReader implements OrcValReader<Boolean> {
+  private static class BooleanReader implements OrcValueReader<Boolean> {
     static final BooleanReader INSTANCE = new BooleanReader();

     private BooleanReader() {
@@ -71,7 +71,7 @@ public Boolean nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class IntegerReader implements OrcValReader<Integer> {
+  private static class IntegerReader implements OrcValueReader<Integer> {
     static final IntegerReader INSTANCE = new IntegerReader();

     private IntegerReader() {
@@ -83,7 +83,7 @@ public Integer nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class LongReader implements OrcValReader<Long> {
+  private static class LongReader implements OrcValueReader<Long> {
     static final LongReader INSTANCE = new LongReader();

     private LongReader() {
@@ -95,7 +95,7 @@ public Long nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class FloatReader implements OrcValReader<Float> {
+  private static class FloatReader implements OrcValueReader<Float> {
     private static final FloatReader INSTANCE = new FloatReader();

     private FloatReader() {
@@ -107,7 +107,7 @@ public Float nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class DoubleReader implements OrcValReader<Double> {
+  private static class DoubleReader implements OrcValueReader<Double> {
     private static final DoubleReader INSTANCE = new DoubleReader();

     private DoubleReader() {
@@ -119,7 +119,7 @@ public Double nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class BytesReader implements OrcValReader<byte[]> {
+  private static class BytesReader implements OrcValueReader<byte[]> {
     private static final BytesReader INSTANCE = new BytesReader();

     private BytesReader() {
@@ -134,19 +134,19 @@ public byte[] nonNullRead(ColumnVector vector, int row) {
     }
   }

-  public abstract static class StructReader<T> implements OrcValReader<T> {
-    private final OrcValReader<?>[] readers;
+  public abstract static class StructReader<T> implements OrcValueReader<T> {
+    private final OrcValueReader<?>[] readers;
     private final int[] positions;
     private final Object[] constants;

-    protected StructReader(List<OrcValReader<?>> readers) {
-      this.readers = readers.toArray(new OrcValReader[0]);
+    protected StructReader(List<OrcValueReader<?>> readers) {
+      this.readers = readers.toArray(new OrcValueReader[0]);
       this.positions = new int[0];
       this.constants = new Object[0];
     }

-    protected StructReader(List<OrcValReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
-      this.readers = readers.toArray(new OrcValReader[0]);
+    protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+      this.readers = readers.toArray(new OrcValueReader[0]);
       List<Types.NestedField> fields = struct.fields();
       List<Integer> positionList = Lists.newArrayListWithCapacity(fields.size());
       List<Object> constantList = Lists.newArrayListWithCapacity(fields.size());
@@ -167,7 +167,7 @@ protected StructReader(List<OrcValReader<?>> readers, Types.StructType struct, M

     protected abstract void set(T struct, int pos, Object value);

-    public OrcValReader<?> reader(int pos) {
+    public OrcValueReader<?> reader(int pos) {
       return readers[pos];
     }

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 8b6d1dcb8008..20f41cc26a33 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -22,8 +22,8 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.orc.OrcRowReader;
 import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
-import org.apache.iceberg.orc.OrcValReader;
 import org.apache.iceberg.orc.OrcValueReader;
 import org.apache.iceberg.orc.OrcValueReaders;
 import org.apache.iceberg.types.Type;
@@ -40,8 +40,8 @@
  *
  * It minimizes allocations by reusing most of the objects in the implementation.
  */
-public class SparkOrcReader implements OrcValueReader<InternalRow> {
-  private final SparkOrcValueReaders.StructReader reader;
+public class SparkOrcReader implements OrcRowReader<InternalRow> {
+  private final OrcValueReader<?> reader;

   public SparkOrcReader(org.apache.iceberg.Schema expectedSchema, TypeDescription readSchema) {
     this(expectedSchema, readSchema, ImmutableMap.of());
@@ -50,16 +50,15 @@ public SparkOrcReader(org.apache.iceberg.Schema expectedSchema, TypeDescription
   @SuppressWarnings("unchecked")
   public SparkOrcReader(
       org.apache.iceberg.Schema expectedSchema, TypeDescription readOrcSchema, Map<Integer, ?> idToConstant) {
-    this.reader = (SparkOrcValueReaders.StructReader) OrcSchemaWithTypeVisitor.visit(
-        expectedSchema, readOrcSchema, new ReadBuilder(idToConstant));
+    this.reader = OrcSchemaWithTypeVisitor.visit(expectedSchema, readOrcSchema, new ReadBuilder(idToConstant));
   }

   @Override
   public InternalRow read(VectorizedRowBatch batch, int row) {
-    return reader.read(new StructColumnVector(batch.size, batch.cols), row);
+    return (InternalRow) reader.read(new StructColumnVector(batch.size, batch.cols), row);
   }

-  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<OrcValReader<?>> {
+  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<OrcValueReader<?>> {
     private final Map<Integer, ?> idToConstant;

     private ReadBuilder(Map<Integer, ?> idToConstant) {
@@ -67,24 +66,24 @@ private ReadBuilder(Map<Integer, ?> idToConstant) {
     }

     @Override
-    public OrcValReader<?> record(
-        Types.StructType expected, TypeDescription record, List<String> names, List<OrcValReader<?>> fields) {
+    public OrcValueReader<?> record(
+        Types.StructType expected, TypeDescription record, List<String> names, List<OrcValueReader<?>> fields) {
       return SparkOrcValueReaders.struct(fields, expected, idToConstant);
     }

     @Override
-    public OrcValReader<?> list(Types.ListType iList, TypeDescription array, OrcValReader<?> elementReader) {
+    public OrcValueReader<?> list(Types.ListType iList, TypeDescription array, OrcValueReader<?> elementReader) {
       return SparkOrcValueReaders.array(elementReader);
     }

     @Override
-    public OrcValReader<?> map(
-        Types.MapType iMap, TypeDescription map, OrcValReader<?> keyReader, OrcValReader<?> valueReader) {
+    public OrcValueReader<?> map(
+        Types.MapType iMap, TypeDescription map, OrcValueReader<?> keyReader, OrcValueReader<?> valueReader) {
       return SparkOrcValueReaders.map(keyReader, valueReader);
     }

     @Override
-    public OrcValReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+    public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
       switch (primitive.getCategory()) {
         case BOOLEAN:
           return OrcValueReaders.booleans();

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 3a25a2fb1af4..93c5d643cbc7 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -23,7 +23,7 @@
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.orc.OrcValReader;
+import org.apache.iceberg.orc.OrcValueReader;
 import org.apache.iceberg.orc.OrcValueReaders;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -47,31 +47,31 @@ class SparkOrcValueReaders {
   private SparkOrcValueReaders() {
   }

-  static OrcValReader<UTF8String> utf8String() {
+  static OrcValueReader<UTF8String> utf8String() {
     return StringReader.INSTANCE;
   }

-  static OrcValReader<Long> timestampTzs() {
+  static OrcValueReader<Long> timestampTzs() {
     return TimestampTzReader.INSTANCE;
   }

-  static OrcValReader<?> struct(
-      List<OrcValReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+  static OrcValueReader<?> struct(
+      List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
     return new StructReader(readers, struct, idToConstant);
   }

-  static OrcValReader<ArrayData> array(OrcValReader<?> elementReader) {
+  static OrcValueReader<ArrayData> array(OrcValueReader<?> elementReader) {
     return new ArrayReader(elementReader);
   }

-  static OrcValReader<MapData> map(OrcValReader<?> keyReader, OrcValReader<?> valueReader) {
+  static OrcValueReader<MapData> map(OrcValueReader<?> keyReader, OrcValueReader<?> valueReader) {
    return new MapReader(keyReader, valueReader);
  }

-  private static class ArrayReader implements OrcValReader<ArrayData> {
-    private final OrcValReader<?> elementReader;
+  private static class ArrayReader implements OrcValueReader<ArrayData> {
+    private final OrcValueReader<?> elementReader;

-    private ArrayReader(OrcValReader<?> elementReader) {
+    private ArrayReader(OrcValueReader<?> elementReader) {
       this.elementReader = elementReader;
     }

@@ -88,11 +88,11 @@ public ArrayData nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class MapReader implements OrcValReader<MapData> {
-    private final OrcValReader<?> keyReader;
-    private final OrcValReader<?> valueReader;
+  private static class MapReader implements OrcValueReader<MapData> {
+    private final OrcValueReader<?> keyReader;
+    private final OrcValueReader<?> valueReader;

-    private MapReader(OrcValReader<?> keyReader, OrcValReader<?> valueReader) {
+    private MapReader(OrcValueReader<?> keyReader, OrcValueReader<?> valueReader) {
       this.keyReader = keyReader;
       this.valueReader = valueReader;
     }
@@ -118,7 +118,7 @@ public MapData nonNullRead(ColumnVector vector, int row) {
   static class StructReader extends OrcValueReaders.StructReader<InternalRow> {
     private final int numFields;

-    protected StructReader(List<OrcValReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
+    protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
       super(readers, struct, idToConstant);
       this.numFields = readers.size();
     }
@@ -138,7 +138,7 @@ protected void set(InternalRow struct, int pos, Object value) {
     }
   }

-  private static class StringReader implements OrcValReader<UTF8String> {
+  private static class StringReader implements OrcValueReader<UTF8String> {
     private static final StringReader INSTANCE = new StringReader();

     private StringReader() {
@@ -151,7 +151,7 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
     }
   }

-  private static class TimestampTzReader implements OrcValReader<Long> {
+  private static class TimestampTzReader implements OrcValueReader<Long> {
     private static final TimestampTzReader INSTANCE = new TimestampTzReader();

     private TimestampTzReader() {
@@ -164,7 +164,7 @@ public Long nonNullRead(ColumnVector vector, int row) {
     }
   }

-  static class Decimal18Reader implements OrcValReader<Decimal> {
+  static class Decimal18Reader implements OrcValueReader<Decimal> {
     //TODO: these are being unused. check for bug
     private final int precision;
     private final int scale;
@@ -181,7 +181,7 @@ public Decimal nonNullRead(ColumnVector vector, int row) {
     }
   }

-  static class Decimal38Reader implements OrcValReader<Decimal> {
+  static class Decimal38Reader implements OrcValueReader<Decimal> {
     private final int precision;
     private final int scale;

From b973d4b5e1a08ca6a9feed36cc6402eee71a30f5 Mon Sep 17 00:00:00 2001
From: Ratandeep Ratt
Date: Fri, 22 May 2020 07:52:49 -0700
Subject: [PATCH 5/5] Resolve conflicts

---
 .../iceberg/spark/source/RowDataReader.java   | 27 ++-----------------
 1 file changed, 2 insertions(+), 25 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 05802ff2cb4d..a025e34adf3c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -22,7 +22,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -33,7 +32,6 @@
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -59,7 +57,6 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
@@ -67,8 +64,6 @@
 import scala.collection.JavaConverters;

 class RowDataReader extends BaseDataReader<InternalRow> {
-  private static final Set<FileFormat> SUPPORTS_CONSTANTS = Sets.newHashSet(
-      FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC);
   // for some reason, the apply method can't be called from Java without reflection
   private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
       .impl(UnsafeProjection.class, InternalRow.class)
@@ -101,27 +96,9 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
     boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

     if (projectsIdentityPartitionColumns) {
-      if (SUPPORTS_CONSTANTS.contains(file.format())) {
-        return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
-            .iterator();
-      }
-
-      // schema used to read data files
-      Schema readSchema = TypeUtil.selectNot(expectedSchema, idColumns);
-      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-      JoinedRow joined = new JoinedRow();
-
-      // create joined rows and project from the joined schema to the final schema
-      Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
-      InternalRow partition = convertToRow.apply(file.partition());
-      joined.withRight(partition);
-
-      CloseableIterable<InternalRow> transformedIterable = CloseableIterable.transform(
-          CloseableIterable.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft),
-          APPLY_PROJECTION.bind(projection(expectedSchema, joinedSchema))::invoke);
-      return transformedIterable.iterator();
+      return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
+          .iterator();
     }
-    // return the base iterator
     return open(task, expectedSchema, ImmutableMap.of()).iterator();
   }
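To summarize the new flow: with ORC now supporting constant readers, identity partition values reach every format as an id-to-constant map instead of a joined row. A minimal sketch, not part of the patch, of how the pieces fit together; `task`, `expectedSchema`, and `fileReadSchema` stand in for values available inside RowDataReader.open:

    import java.util.Map;
    import org.apache.iceberg.orc.OrcRowReader;
    import org.apache.iceberg.spark.data.SparkOrcReader;
    import org.apache.iceberg.util.PartitionUtil;
    import org.apache.spark.sql.catalyst.InternalRow;

    // Pair each identity-partitioned field id with its partition value, converted
    // to Spark's internal representation by RowDataReader::convertConstant.
    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);

    // SparkOrcReader passes the map down to StructReader, which records the
    // positions of constant fields and serves them from its constants array;
    // only the remaining fields are read from ORC column vectors, so nested
    // and partitioned reads share one code path.
    OrcRowReader<InternalRow> reader = new SparkOrcReader(expectedSchema, fileReadSchema, idToConstant);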