From b4164d9c3c0f765e6beaaf1c4a5f68e0ac81fcd9 Mon Sep 17 00:00:00 2001 From: openinx Date: Mon, 13 Jul 2020 20:14:34 +0800 Subject: [PATCH 01/15] Refactor the GenericOrcWriter by using OrcSchemaWithTypeVisitor#visit --- .../iceberg/data/orc/GenericOrcWriter.java | 671 +++--------------- .../iceberg/data/orc/GenericOrcWriters.java | 612 ++++++++++++++++ .../apache/iceberg/data/TestLocalScan.java | 2 +- .../data/TestMetricsRowGroupFilter.java | 2 +- .../data/TestMetricsRowGroupFilterTypes.java | 2 +- .../iceberg/data/orc/TestGenericData.java | 4 +- .../data/orc/TestGenericReadProjection.java | 2 +- .../apache/iceberg/orc/TestOrcMetrics.java | 2 +- .../apache/iceberg/mr/mapred/TestHelpers.java | 2 +- .../mr/mapreduce/TestIcebergInputFormat.java | 2 +- .../spark/source/TestSparkReadProjection.java | 2 +- .../spark/source/TestFilteredScan.java | 2 +- 12 files changed, 723 insertions(+), 582 deletions(-) create mode 100644 data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 1bf466df5cb3..bc9f0198992e 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -19,590 +19,119 @@ package org.apache.iceberg.data.orc; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.Map; -import java.util.UUID; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; import org.apache.iceberg.orc.ORCSchemaUtil; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; -import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; -import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; -import org.apache.orc.storage.ql.exec.vector.ListColumnVector; -import org.apache.orc.storage.ql.exec.vector.LongColumnVector; -import org.apache.orc.storage.ql.exec.vector.MapColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; -import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class GenericOrcWriter implements OrcValueWriter { - private final Converter[] converters; - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - - private GenericOrcWriter(TypeDescription schema) { - this.converters = buildConverters(schema); - } - - public static OrcValueWriter buildWriter(TypeDescription fileSchema) { - return new GenericOrcWriter(fileSchema); + private final GenericOrcWriters.Converter converter; + 
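+  // The converter is built once by visiting the expected Iceberg schema
+  // together with the ORC file schema (see WriteBuilder below), and is then
+  // reused for every Record passed to write(Record, VectorizedRowBatch).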
+ private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) { + Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, + "Top level must be a struct " + orcSchema); + + converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); + } + + public static OrcValueWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { + return new GenericOrcWriter(expectedSchema, fileSchema); + } + + private static class WriteBuilder extends OrcSchemaWithTypeVisitor { + private WriteBuilder() { + } + + public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record, + List names, List fields) { + return new GenericOrcWriters.RecordConverter(fields); + } + + public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array, + GenericOrcWriters.Converter element) { + return new GenericOrcWriters.ListConverter(element); + } + + public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map, + GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) { + return new GenericOrcWriters.MapConverter(key, value); + } + + public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return GenericOrcWriters.booleans(); + case BYTE: + return GenericOrcWriters.bytes(); + case SHORT: + return GenericOrcWriters.shorts(); + case DATE: + return GenericOrcWriters.dates(); + case INT: + return GenericOrcWriters.ints(); + case LONG: + String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); + ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG : + ORCSchemaUtil.LongType.valueOf(longAttributeValue); + switch (longType) { + case TIME: + return GenericOrcWriters.times(); + case LONG: + return GenericOrcWriters.longs(); + default: + throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); + } + case FLOAT: + return GenericOrcWriters.floats(); + case DOUBLE: + return GenericOrcWriters.doubles(); + case BINARY: + String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); + ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : + ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); + switch (binaryType) { + case UUID: + return GenericOrcWriters.uuids(); + case FIXED: + return GenericOrcWriters.fixed(); + case BINARY: + return GenericOrcWriters.binary(); + default: + throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); + } + case STRING: + case CHAR: + case VARCHAR: + return GenericOrcWriters.strings(); + case DECIMAL: + return schema.getPrecision() <= 18 ? GenericOrcWriters.decimal18(schema) : + GenericOrcWriters.decimal38(schema); + case TIMESTAMP: + return GenericOrcWriters.timestamp(); + case TIMESTAMP_INSTANT: + return GenericOrcWriters.timestampTz(); + } + throw new IllegalArgumentException("Unhandled type " + schema); + } } @SuppressWarnings("unchecked") @Override - public void write(Record value, VectorizedRowBatch output) throws IOException { - int row = output.size++; - for (int c = 0; c < converters.length; ++c) { - converters[c].addValue(row, value.get(c, converters[c].getJavaClass()), output.cols[c]); - } - } - - /** - * The interface for the conversion from Spark's SpecializedGetters to - * ORC's ColumnVectors. 
- */ - interface Converter { - - Class getJavaClass(); - - /** - * Take a value from the Spark data value and add it to the ORC output. - * @param rowId the row in the ColumnVector - * @param data either an InternalRow or ArrayData - * @param output the ColumnVector to put the value into - */ - void addValue(int rowId, T data, ColumnVector output); - } - - static class BooleanConverter implements Converter { - @Override - public Class getJavaClass() { - return Boolean.class; - } - - @Override - public void addValue(int rowId, Boolean data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; - } - } - } - - static class ByteConverter implements Converter { - @Override - public Class getJavaClass() { - return Byte.class; - } - - @Override - public void addValue(int rowId, Byte data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class ShortConverter implements Converter { - @Override - public Class getJavaClass() { - return Short.class; - } - - @Override - public void addValue(int rowId, Short data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class IntConverter implements Converter { - @Override - public Class getJavaClass() { - return Integer.class; - } - - @Override - public void addValue(int rowId, Integer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class TimeConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalTime.class; - } - - @Override - public void addValue(int rowId, LocalTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; - } - } - } - - static class LongConverter implements Converter { - @Override - public Class getJavaClass() { - return Long.class; - } - - @Override - public void addValue(int rowId, Long data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class FloatConverter implements Converter { - @Override - public Class getJavaClass() { - return Float.class; - } - - @Override - public void addValue(int rowId, Float data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } - } - } - - static class DoubleConverter implements Converter { - @Override - public Class getJavaClass() { - return Double.class; - } - - @Override - public void addValue(int rowId, Double data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } - } - } - 
- static class StringConverter implements Converter { - @Override - public Class getJavaClass() { - return String.class; - } - - @Override - public void addValue(int rowId, String data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - byte[] value = data.getBytes(StandardCharsets.UTF_8); - ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); - } - } - } - - static class BytesConverter implements Converter { - @Override - public Class getJavaClass() { - return ByteBuffer.class; - } - - @Override - public void addValue(int rowId, ByteBuffer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); - } - } - } - - static class UUIDConverter implements Converter { - @Override - public Class getJavaClass() { - return UUID.class; - } - - @Override - public void addValue(int rowId, UUID data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.putLong(data.getMostSignificantBits()); - buffer.putLong(data.getLeastSignificantBits()); - ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); - } - } - } + public void write(Record value, VectorizedRowBatch output) { + Preconditions.checkArgument(converter instanceof GenericOrcWriters.RecordConverter, + "Converter must be a RecordConverter."); - static class FixedConverter implements Converter { - @Override - public Class getJavaClass() { - return byte[].class; - } - - @Override - public void addValue(int rowId, byte[] data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); - } - } - } - - static class DateConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalDate.class; - } - - @Override - public void addValue(int rowId, LocalDate data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); - } - } - } - - static class TimestampTzConverter implements Converter { - @Override - public Class getJavaClass() { - return OffsetDateTime.class; - } - - @Override - public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.time[rowId] = data.toInstant().toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } - } - } - - static class TimestampConverter implements Converter { - - @Override - public Class getJavaClass() { - return LocalDateTime.class; - } - - @Override - public void addValue(int rowId, LocalDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.setIsUTC(true); - 
cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } - } - } - - static class Decimal18Converter implements Converter { - private final int scale; - - Decimal18Converter(TypeDescription schema) { - this.scale = schema.getScale(); - } - - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId] - .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); - } - } - } - - static class Decimal38Converter implements Converter { - Decimal38Converter(TypeDescription schema) { - } - - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); - } - } - } - - static class StructConverter implements Converter { - private final Converter[] children; - - StructConverter(TypeDescription schema) { - this.children = new Converter[schema.getChildren().size()]; - for (int c = 0; c < children.length; ++c) { - children[c] = buildConverter(schema.getChildren().get(c)); - } - } - - @Override - public Class getJavaClass() { - return Record.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, Record data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - StructColumnVector cv = (StructColumnVector) output; - for (int c = 0; c < children.length; ++c) { - children[c].addValue(rowId, data.get(c, children[c].getJavaClass()), cv.fields[c]); - } - } - } - } - - static class ListConverter implements Converter { - private final Converter children; - - ListConverter(TypeDescription schema) { - this.children = buildConverter(schema.getChildren().get(0)); - } - - @Override - public Class getJavaClass() { - return List.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, List data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - List value = (List) data; - ListColumnVector cv = (ListColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = value.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.child.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); - } - } - } - } - - static class MapConverter implements Converter { - private final Converter keyConverter; - private final Converter valueConverter; - - MapConverter(TypeDescription schema) { - this.keyConverter = buildConverter(schema.getChildren().get(0)); - this.valueConverter = buildConverter(schema.getChildren().get(1)); - } - - @Override - public 
Class getJavaClass() { - return Map.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, Map data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - Map map = (Map) data; - List keys = Lists.newArrayListWithExpectedSize(map.size()); - List values = Lists.newArrayListWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - values.add(entry.getValue()); - } - MapColumnVector cv = (MapColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = map.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.keys.ensureSize(cv.childCount, true); - cv.values.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - int pos = (int) (e + cv.offsets[rowId]); - keyConverter.addValue(pos, keys.get(e), cv.keys); - valueConverter.addValue(pos, values.get(e), cv.values); - } - } - } - } - - private static Converter buildConverter(TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return new BooleanConverter(); - case BYTE: - return new ByteConverter(); - case SHORT: - return new ShortConverter(); - case DATE: - return new DateConverter(); - case INT: - return new IntConverter(); - case LONG: - String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); - ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG : - ORCSchemaUtil.LongType.valueOf(longAttributeValue); - switch (longType) { - case TIME: - return new TimeConverter(); - case LONG: - return new LongConverter(); - default: - throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); - } - case FLOAT: - return new FloatConverter(); - case DOUBLE: - return new DoubleConverter(); - case BINARY: - String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); - ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : - ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); - switch (binaryType) { - case UUID: - return new UUIDConverter(); - case FIXED: - return new FixedConverter(); - case BINARY: - return new BytesConverter(); - default: - throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); - } - case STRING: - case CHAR: - case VARCHAR: - return new StringConverter(); - case DECIMAL: - return schema.getPrecision() <= 18 ? 
new Decimal18Converter(schema) : new Decimal38Converter(schema); - case TIMESTAMP: - return new TimestampConverter(); - case TIMESTAMP_INSTANT: - return new TimestampTzConverter(); - case STRUCT: - return new StructConverter(schema); - case LIST: - return new ListConverter(schema); - case MAP: - return new MapConverter(schema); - } - throw new IllegalArgumentException("Unhandled type " + schema); - } - - private static Converter[] buildConverters(TypeDescription schema) { - if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Top level must be a struct " + schema); - } - - List children = schema.getChildren(); - Converter[] result = new Converter[children.size()]; - for (int c = 0; c < children.size(); ++c) { - result[c] = buildConverter(children.get(c)); + int row = output.size++; + List converters = ((GenericOrcWriters.RecordConverter) converter).converters(); + for (int c = 0; c < converters.size(); ++c) { + converters.get(c).addValue(row, value.get(c, converters.get(c).getJavaClass()), output.cols[c]); } - return result; } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java new file mode 100644 index 000000000000..bf847aabc546 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.data.orc; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; + +public class GenericOrcWriters { + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private GenericOrcWriters() { + } + + /** + * The interface for the conversion from Spark's SpecializedGetters to + * ORC's ColumnVectors. + */ + interface Converter { + + Class getJavaClass(); + + /** + * Take a value from the Spark data value and add it to the ORC output. + * + * @param rowId the row in the ColumnVector + * @param data either an InternalRow or ArrayData + * @param output the ColumnVector to put the value into + */ + void addValue(int rowId, T data, ColumnVector output); + } + + public static Converter booleans() { + return BooleanConverter.INSTANCE; + } + + public static Converter bytes() { + return ByteConverter.INSTANCE; + } + + public static Converter shorts() { + return ShortConverter.INSTANCE; + } + + public static Converter ints() { + return IntConverter.INSTANCE; + } + + public static Converter times() { + return TimeConverter.INSTANCE; + } + + public static Converter longs() { + return LongConverter.INSTANCE; + } + + public static Converter floats() { + return FloatConverter.INSTANCE; + } + + public static Converter doubles() { + return DoubleConverter.INSTANCE; + } + + public static Converter strings() { + return StringConverter.INSTANCE; + } + + public static Converter binary() { + return BytesConverter.INSTANCE; + } + + public static Converter uuids() { + return UUIDConverter.INSTANCE; + } + + public static Converter fixed() { + return FixedConverter.INSTANCE; + } + + public static Converter dates() { + return DateConverter.INSTANCE; + } + + public static Converter timestampTz() { + return TimestampTzConverter.INSTANCE; + } + + public static Converter timestamp() { + return TimestampConverter.INSTANCE; + } + + public static Converter decimal18(TypeDescription schema) { + return new Decimal18Converter(schema); + } + + public static Converter decimal38(TypeDescription schema) { + return Decimal38Converter.INSTANCE; + } + + private static class BooleanConverter implements Converter { + private static final Converter INSTANCE = new BooleanConverter(); + + @Override + public Class getJavaClass() { + return Boolean.class; + } + + @Override + public void 
addValue(int rowId, Boolean data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; + } + } + } + + private static class ByteConverter implements Converter { + private static final Converter INSTANCE = new ByteConverter(); + + @Override + public Class getJavaClass() { + return Byte.class; + } + + @Override + public void addValue(int rowId, Byte data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + private static class ShortConverter implements Converter { + private static final Converter INSTANCE = new ShortConverter(); + + @Override + public Class getJavaClass() { + return Short.class; + } + + @Override + public void addValue(int rowId, Short data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + private static class IntConverter implements Converter { + private static final Converter INSTANCE = new IntConverter(); + + @Override + public Class getJavaClass() { + return Integer.class; + } + + @Override + public void addValue(int rowId, Integer data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + private static class TimeConverter implements Converter { + private static final Converter INSTANCE = new TimeConverter(); + + @Override + public Class getJavaClass() { + return LocalTime.class; + } + + @Override + public void addValue(int rowId, LocalTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; + } + } + } + + private static class LongConverter implements Converter { + private static final Converter INSTANCE = new LongConverter(); + + @Override + public Class getJavaClass() { + return Long.class; + } + + @Override + public void addValue(int rowId, Long data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + private static class FloatConverter implements Converter { + private static final Converter INSTANCE = new FloatConverter(); + + @Override + public Class getJavaClass() { + return Float.class; + } + + @Override + public void addValue(int rowId, Float data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + } + + private static class DoubleConverter implements Converter { + private static final Converter INSTANCE = new DoubleConverter(); + + @Override + public Class getJavaClass() { + return Double.class; + } + + @Override + public void addValue(int rowId, Double data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = 
data; + } + } + } + + private static class StringConverter implements Converter { + private static final Converter INSTANCE = new StringConverter(); + + @Override + public Class getJavaClass() { + return String.class; + } + + @Override + public void addValue(int rowId, String data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + byte[] value = data.getBytes(StandardCharsets.UTF_8); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + } + + private static class BytesConverter implements Converter { + private static final Converter INSTANCE = new BytesConverter(); + + @Override + public Class getJavaClass() { + return ByteBuffer.class; + } + + @Override + public void addValue(int rowId, ByteBuffer data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); + } + } + } + + private static class UUIDConverter implements Converter { + private static final Converter INSTANCE = new UUIDConverter(); + + @Override + public Class getJavaClass() { + return UUID.class; + } + + @Override + public void addValue(int rowId, UUID data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(data.getMostSignificantBits()); + buffer.putLong(data.getLeastSignificantBits()); + ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); + } + } + } + + private static class FixedConverter implements Converter { + private static final Converter INSTANCE = new FixedConverter(); + + @Override + public Class getJavaClass() { + return byte[].class; + } + + @Override + public void addValue(int rowId, byte[] data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); + } + } + } + + private static class DateConverter implements Converter { + private static final Converter INSTANCE = new DateConverter(); + + @Override + public Class getJavaClass() { + return LocalDate.class; + } + + @Override + public void addValue(int rowId, LocalDate data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); + } + } + } + + private static class TimestampTzConverter implements Converter { + private static final Converter INSTANCE = new TimestampTzConverter(); + + @Override + public Class getJavaClass() { + return OffsetDateTime.class; + } + + @Override + public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.time[rowId] = data.toInstant().toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + } + + private static class TimestampConverter implements Converter { + private static final Converter INSTANCE = new TimestampConverter(); + + @Override 
+ public Class getJavaClass() { + return LocalDateTime.class; + } + + @Override + public void addValue(int rowId, LocalDateTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.setIsUTC(true); + cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + } + + private static class Decimal18Converter implements Converter { + private final int scale; + + Decimal18Converter(TypeDescription schema) { + this.scale = schema.getScale(); + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId] + .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); + } + } + } + + private static class Decimal38Converter implements Converter { + private static final Converter INSTANCE = new Decimal38Converter(); + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); + } + } + } + + public static class RecordConverter implements Converter { + private final List converters; + + RecordConverter(List converters) { + this.converters = converters; + } + + public List converters() { + return converters; + } + + @Override + public Class getJavaClass() { + return Record.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, Record data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < converters.size(); ++c) { + converters.get(c).addValue(rowId, data.get(c, converters.get(c).getJavaClass()), cv.fields[c]); + } + } + } + } + + public static class ListConverter implements Converter { + private final Converter children; + + ListConverter(Converter children) { + this.children = children; + } + + @Override + public Class getJavaClass() { + return List.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, List data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + List value = (List) data; + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.child.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); + } + } + } + } + + public static class MapConverter implements 
Converter { + private final Converter keyConverter; + private final Converter valueConverter; + + MapConverter(Converter keyConverter, Converter valueConverter) { + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + } + + @Override + public Class getJavaClass() { + return Map.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, Map data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + Map map = (Map) data; + List keys = Lists.newArrayListWithExpectedSize(map.size()); + List values = Lists.newArrayListWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + keys.add(entry.getKey()); + values.add(entry.getValue()); + } + MapColumnVector cv = (MapColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = map.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyConverter.addValue(pos, keys.get(e), cv.keys); + valueConverter.addValue(pos, values.get(e), cv.values); + } + } + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index 84a58c505ecc..1d6516a501bf 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -436,7 +436,7 @@ private DataFile writeFile(String location, String filename, Schema schema, List case ORC: FileAppender orcAppender = ORC.write(fromPath(path, CONF)) .schema(schema) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc)) .build(); try { orcAppender.addAll(records); diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java index a0e37e7f6637..1bf6ebf045c5 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java +++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java @@ -172,7 +172,7 @@ public void createOrcInputFile() throws IOException { OutputFile outFile = Files.localOutput(orcFile); try (FileAppender appender = ORC.write(outFile) .schema(FILE_SCHEMA) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(FILE_SCHEMA, typeDesc)) .build()) { GenericRecord record = GenericRecord.create(FILE_SCHEMA); // create 50 records diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java index 3884a411f85e..12a0695d895c 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java +++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java @@ -180,7 +180,7 @@ public void createOrcInputFile(List records) throws IOException { OutputFile outFile = Files.localOutput(ORC_FILE); try (FileAppender appender = ORC.write(outFile) .schema(FILE_SCHEMA) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> 
GenericOrcWriter.buildWriter(FILE_SCHEMA, typeDesc)) .build()) { appender.addAll(records); } diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java index a18ef5f9d427..7cfd6379c710 100644 --- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java @@ -92,7 +92,7 @@ public void writeAndValidateTimestamps() throws IOException { try (FileAppender writer = ORC.write(Files.localOutput(testFile)) .schema(timestampSchema) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(timestampSchema, typeDesc)) .build()) { writer.add(record1); writer.add(record2); @@ -129,7 +129,7 @@ private void writeAndValidateRecords(Schema schema, List expected) throw try (FileAppender writer = ORC.write(Files.localOutput(testFile)) .schema(schema) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc)) .build()) { for (Record rec : expected) { writer.add(rec); diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java index 1aab27dbedb4..8809b1ca5bfa 100644 --- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java @@ -40,7 +40,7 @@ protected Record writeAndRead(String desc, try (FileAppender appender = ORC.write(Files.localOutput(file)) .schema(writeSchema) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(writeSchema, typeDesc)) .build()) { appender.add(record); } diff --git a/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java index 82ef54b58be1..6e352ec4297b 100644 --- a/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java +++ b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java @@ -79,7 +79,7 @@ private InputFile writeRecords(Schema schema, Map properties, Re try (FileAppender writer = ORC.write(file) .schema(schema) .setAll(properties) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc)) .build()) { writer.addAll(Lists.newArrayList(records)); } diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java index 0d8cdc036e7f..c563ab02d5ef 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java +++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java @@ -74,7 +74,7 @@ public static DataFile writeFile(File targetFile, Table table, StructLike partit case ORC: appender = ORC.write(Files.localOutput(targetFile)) .schema(table.schema()) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(table.schema(), typeDesc)) .build(); break; default: diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java index 05842f80e6bd..fdf4be3e08e5 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java +++ 
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java @@ -531,7 +531,7 @@ private DataFile writeFile( case ORC: appender = ORC.write(Files.localOutput(file)) .schema(table.schema()) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(table.schema(), typeDesc)) .build(); break; default: diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java index ac64fa952c71..8e74e56a45b1 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -132,7 +132,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema case ORC: try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc)) .schema(tableSchema) .build()) { writer.add(record); diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 0d45179b315c..72cb7d30de17 100644 --- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -212,7 +212,7 @@ public void writeUnpartitionedTable() throws IOException { case ORC: try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc)) .schema(tableSchema) .build()) { writer.addAll(records); From e21eadeacebeaa0a9744bda186f411b650c31e43 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 10:44:25 +0800 Subject: [PATCH 02/15] Refactor the OrcValueWriter to OrcRowWriter and refactor converter to OrcValueWriter. 
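
The previous OrcValueWriter was really a row writer: it wrote a whole
Record into a VectorizedRowBatch. This patch renames that role to
OrcRowWriter and reuses the OrcValueWriter name for the per-value
converters (formerly GenericOrcWriters.Converter), which write a single
value into one ColumnVector. An abbreviated sketch of the two interfaces
after this change (generics and visibility are elided in places in this
patch text, so treat the exact signatures as approximate):

    import org.apache.orc.storage.ql.exec.vector.ColumnVector;
    import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

    interface OrcRowWriter<T> {
      // Writes one row into the current position of the batch.
      void write(T row, VectorizedRowBatch output);
    }

    interface OrcValueWriter<T> {
      // The Java class accepted by this writer, e.g. Boolean.class.
      Class<T> getJavaClass();

      // Writes data into row rowId of the vector, marking nulls.
      void addValue(int rowId, T data, ColumnVector output);
    }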
--- .../iceberg/data/orc/GenericOrcWriter.java | 132 +++++++---- .../iceberg/data/orc/GenericOrcWriters.java | 221 +++++++----------- .../main/java/org/apache/iceberg/orc/ORC.java | 4 +- .../apache/iceberg/orc/OrcFileAppender.java | 10 +- .../org/apache/iceberg/orc/OrcRowWriter.java | 38 +++ .../apache/iceberg/orc/OrcValueWriter.java | 18 +- .../iceberg/spark/data/SparkOrcWriter.java | 9 +- 7 files changed, 231 insertions(+), 201 deletions(-) create mode 100644 orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index bc9f0198992e..c178bf2bb679 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -20,83 +20,95 @@ package org.apache.iceberg.data.orc; import java.util.List; +import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; -import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -public class GenericOrcWriter implements OrcValueWriter { - private final GenericOrcWriters.Converter converter; +public class GenericOrcWriter implements OrcRowWriter { + private final OrcValueWriter orcValueWriter; private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "Top level must be a struct " + orcSchema); - converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); + orcValueWriter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); } - public static OrcValueWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { + public static OrcRowWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { return new GenericOrcWriter(expectedSchema, fileSchema); } - private static class WriteBuilder extends OrcSchemaWithTypeVisitor { + private static class WriteBuilder extends OrcSchemaWithTypeVisitor { private WriteBuilder() { } - public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record, - List names, List fields) { - return new GenericOrcWriters.RecordConverter(fields); + @Override + public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, + List names, List fields) { + return new RecordOrcValueWriter(fields); } - public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array, - GenericOrcWriters.Converter element) { - return new GenericOrcWriters.ListConverter(element); + @Override + public OrcValueWriter list(Types.ListType iList, TypeDescription array, + OrcValueWriter element) { + return GenericOrcWriters.list(element); } - public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map, - GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) { - return new GenericOrcWriters.MapConverter(key, value); 
+ @Override + public OrcValueWriter map(Types.MapType iMap, TypeDescription map, + OrcValueWriter key, OrcValueWriter value) { + return GenericOrcWriters.map(key, value); } - public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) { - switch (schema.getCategory()) { + @Override + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + switch (primitive.getCategory()) { case BOOLEAN: return GenericOrcWriters.booleans(); case BYTE: - return GenericOrcWriters.bytes(); + throw new IllegalArgumentException("Iceberg does not have a byte type"); case SHORT: - return GenericOrcWriters.shorts(); - case DATE: - return GenericOrcWriters.dates(); + throw new IllegalArgumentException("Iceberg does not have a short type."); case INT: return GenericOrcWriters.ints(); case LONG: - String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); - ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG : - ORCSchemaUtil.LongType.valueOf(longAttributeValue); - switch (longType) { + switch (iPrimitive.typeId()) { case TIME: return GenericOrcWriters.times(); case LONG: return GenericOrcWriters.longs(); default: - throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); + throw new IllegalStateException( + String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); } case FLOAT: return GenericOrcWriters.floats(); case DOUBLE: return GenericOrcWriters.doubles(); + case DATE: + return GenericOrcWriters.dates(); + case TIMESTAMP: + return GenericOrcWriters.timestamp(); + case TIMESTAMP_INSTANT: + return GenericOrcWriters.timestampTz(); + case DECIMAL: + return GenericOrcWriters.decimal(primitive.getScale(), primitive.getPrecision()); + case CHAR: + case VARCHAR: + case STRING: + return GenericOrcWriters.strings(); case BINARY: - String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); - ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : - ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); - switch (binaryType) { + switch (iPrimitive.typeId()) { case UUID: return GenericOrcWriters.uuids(); case FIXED: @@ -104,34 +116,58 @@ public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, Type case BINARY: return GenericOrcWriters.binary(); default: - throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); + throw new IllegalStateException( + String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); } - case STRING: - case CHAR: - case VARCHAR: - return GenericOrcWriters.strings(); - case DECIMAL: - return schema.getPrecision() <= 18 ? 
GenericOrcWriters.decimal18(schema) : - GenericOrcWriters.decimal38(schema); - case TIMESTAMP: - return GenericOrcWriters.timestamp(); - case TIMESTAMP_INSTANT: - return GenericOrcWriters.timestampTz(); + default: + throw new IllegalArgumentException("Unhandled type " + primitive); } - throw new IllegalArgumentException("Unhandled type " + schema); } } @SuppressWarnings("unchecked") @Override public void write(Record value, VectorizedRowBatch output) { - Preconditions.checkArgument(converter instanceof GenericOrcWriters.RecordConverter, + Preconditions.checkArgument(orcValueWriter instanceof RecordOrcValueWriter, "Converter must be a RecordConverter."); - int row = output.size++; - List converters = ((GenericOrcWriters.RecordConverter) converter).converters(); - for (int c = 0; c < converters.size(); ++c) { - converters.get(c).addValue(row, value.get(c, converters.get(c).getJavaClass()), output.cols[c]); + int row = output.size; + output.size += 1; + List orcValueWriters = ((RecordOrcValueWriter) orcValueWriter).converters(); + for (int c = 0; c < orcValueWriters.size(); ++c) { + orcValueWriters.get(c).addValue(row, value.get(c, orcValueWriters.get(c).getJavaClass()), output.cols[c]); + } + } + + private static class RecordOrcValueWriter implements OrcValueWriter { + private final List orcValueWriters; + + RecordOrcValueWriter(List orcValueWriters) { + this.orcValueWriters = orcValueWriters; + } + + List converters() { + return orcValueWriters; + } + + @Override + public Class getJavaClass() { + return Record.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, Record data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < orcValueWriters.size(); ++c) { + orcValueWriters.get(c).addValue(rowId, data.get(c, orcValueWriters.get(c).getJavaClass()), cv.fields[c]); + } + } } } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index bf847aabc546..a8f35121fd05 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.iceberg.data.Record; +import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; @@ -43,7 +43,6 @@ import org.apache.orc.storage.ql.exec.vector.ListColumnVector; import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; -import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; public class GenericOrcWriters { @@ -53,94 +52,84 @@ public class GenericOrcWriters { private GenericOrcWriters() { } - /** - * The interface for the conversion from Spark's SpecializedGetters to - * ORC's ColumnVectors. - */ - interface Converter { - - Class getJavaClass(); - - /** - * Take a value from the Spark data value and add it to the ORC output. 
- * - * @param rowId the row in the ColumnVector - * @param data either an InternalRow or ArrayData - * @param output the ColumnVector to put the value into - */ - void addValue(int rowId, T data, ColumnVector output); + public static OrcValueWriter booleans() { + return BooleanOrcValueWriter.INSTANCE; } - public static Converter booleans() { - return BooleanConverter.INSTANCE; + public static OrcValueWriter bytes() { + return ByteOrcValueWriter.INSTANCE; } - public static Converter bytes() { - return ByteConverter.INSTANCE; + public static OrcValueWriter shorts() { + return ShortOrcValueWriter.INSTANCE; } - public static Converter shorts() { - return ShortConverter.INSTANCE; + public static OrcValueWriter ints() { + return IntOrcValueWriter.INSTANCE; } - public static Converter ints() { - return IntConverter.INSTANCE; + public static OrcValueWriter times() { + return TimeOrcValueWriter.INSTANCE; } - public static Converter times() { - return TimeConverter.INSTANCE; + public static OrcValueWriter longs() { + return LongOrcValueWriter.INSTANCE; } - public static Converter longs() { - return LongConverter.INSTANCE; + public static OrcValueWriter floats() { + return FloatOrcValueWriter.INSTANCE; } - public static Converter floats() { - return FloatConverter.INSTANCE; + public static OrcValueWriter doubles() { + return DoubleOrcValueWriter.INSTANCE; } - public static Converter doubles() { - return DoubleConverter.INSTANCE; + public static OrcValueWriter strings() { + return StringOrcValueWriter.INSTANCE; } - public static Converter strings() { - return StringConverter.INSTANCE; + public static OrcValueWriter binary() { + return BytesOrcValueWriter.INSTANCE; } - public static Converter binary() { - return BytesConverter.INSTANCE; + public static OrcValueWriter uuids() { + return UUIDOrcValueWriter.INSTANCE; } - public static Converter uuids() { - return UUIDConverter.INSTANCE; + public static OrcValueWriter fixed() { + return FixedOrcValueWriter.INSTANCE; } - public static Converter fixed() { - return FixedConverter.INSTANCE; + public static OrcValueWriter dates() { + return DateOrcValueWriter.INSTANCE; } - public static Converter dates() { - return DateConverter.INSTANCE; + public static OrcValueWriter timestampTz() { + return TimestampTzOrcValueWriter.INSTANCE; } - public static Converter timestampTz() { - return TimestampTzConverter.INSTANCE; + public static OrcValueWriter timestamp() { + return TimestampOrcValueWriter.INSTANCE; } - public static Converter timestamp() { - return TimestampConverter.INSTANCE; + public static OrcValueWriter decimal(int scala, int precision) { + if (precision <= 18) { + return new Decimal18OrcValueWriter(scala); + } else { + return Decimal38OrcValueWriter.INSTANCE; + } } - public static Converter decimal18(TypeDescription schema) { - return new Decimal18Converter(schema); + public static OrcValueWriter list(OrcValueWriter element) { + return new ListOrcValueWriter(element); } - public static Converter decimal38(TypeDescription schema) { - return Decimal38Converter.INSTANCE; + public static OrcValueWriter map(OrcValueWriter key, OrcValueWriter value) { + return new MapOrcValueWriter(key, value); } - private static class BooleanConverter implements Converter { - private static final Converter INSTANCE = new BooleanConverter(); + private static class BooleanOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new BooleanOrcValueWriter(); @Override public Class getJavaClass() { @@ -159,8 +148,8 @@ public void addValue(int 
rowId, Boolean data, ColumnVector output) { } } - private static class ByteConverter implements Converter { - private static final Converter INSTANCE = new ByteConverter(); + private static class ByteOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteOrcValueWriter(); @Override public Class getJavaClass() { @@ -179,8 +168,8 @@ public void addValue(int rowId, Byte data, ColumnVector output) { } } - private static class ShortConverter implements Converter { - private static final Converter INSTANCE = new ShortConverter(); + private static class ShortOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ShortOrcValueWriter(); @Override public Class getJavaClass() { @@ -199,8 +188,8 @@ public void addValue(int rowId, Short data, ColumnVector output) { } } - private static class IntConverter implements Converter { - private static final Converter INSTANCE = new IntConverter(); + private static class IntOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new IntOrcValueWriter(); @Override public Class getJavaClass() { @@ -219,8 +208,8 @@ public void addValue(int rowId, Integer data, ColumnVector output) { } } - private static class TimeConverter implements Converter { - private static final Converter INSTANCE = new TimeConverter(); + private static class TimeOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimeOrcValueWriter(); @Override public Class getJavaClass() { @@ -239,8 +228,8 @@ public void addValue(int rowId, LocalTime data, ColumnVector output) { } } - private static class LongConverter implements Converter { - private static final Converter INSTANCE = new LongConverter(); + private static class LongOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new LongOrcValueWriter(); @Override public Class getJavaClass() { @@ -259,8 +248,8 @@ public void addValue(int rowId, Long data, ColumnVector output) { } } - private static class FloatConverter implements Converter { - private static final Converter INSTANCE = new FloatConverter(); + private static class FloatOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FloatOrcValueWriter(); @Override public Class getJavaClass() { @@ -279,8 +268,8 @@ public void addValue(int rowId, Float data, ColumnVector output) { } } - private static class DoubleConverter implements Converter { - private static final Converter INSTANCE = new DoubleConverter(); + private static class DoubleOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DoubleOrcValueWriter(); @Override public Class getJavaClass() { @@ -299,8 +288,8 @@ public void addValue(int rowId, Double data, ColumnVector output) { } } - private static class StringConverter implements Converter { - private static final Converter INSTANCE = new StringConverter(); + private static class StringOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new StringOrcValueWriter(); @Override public Class getJavaClass() { @@ -320,8 +309,8 @@ public void addValue(int rowId, String data, ColumnVector output) { } } - private static class BytesConverter implements Converter { - private static final Converter INSTANCE = new BytesConverter(); + private static class BytesOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new 
BytesOrcValueWriter(); @Override public Class getJavaClass() { @@ -340,8 +329,8 @@ public void addValue(int rowId, ByteBuffer data, ColumnVector output) { } } - private static class UUIDConverter implements Converter { - private static final Converter INSTANCE = new UUIDConverter(); + private static class UUIDOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new UUIDOrcValueWriter(); @Override public Class getJavaClass() { @@ -363,8 +352,8 @@ public void addValue(int rowId, UUID data, ColumnVector output) { } } - private static class FixedConverter implements Converter { - private static final Converter INSTANCE = new FixedConverter(); + private static class FixedOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FixedOrcValueWriter(); @Override public Class getJavaClass() { @@ -383,8 +372,8 @@ public void addValue(int rowId, byte[] data, ColumnVector output) { } } - private static class DateConverter implements Converter { - private static final Converter INSTANCE = new DateConverter(); + private static class DateOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DateOrcValueWriter(); @Override public Class getJavaClass() { @@ -403,8 +392,8 @@ public void addValue(int rowId, LocalDate data, ColumnVector output) { } } - private static class TimestampTzConverter implements Converter { - private static final Converter INSTANCE = new TimestampTzConverter(); + private static class TimestampTzOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampTzOrcValueWriter(); @Override public Class getJavaClass() { @@ -425,8 +414,8 @@ public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { } } - private static class TimestampConverter implements Converter { - private static final Converter INSTANCE = new TimestampConverter(); + private static class TimestampOrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampOrcValueWriter(); @Override public Class getJavaClass() { @@ -448,11 +437,11 @@ public void addValue(int rowId, LocalDateTime data, ColumnVector output) { } } - private static class Decimal18Converter implements Converter { + private static class Decimal18OrcValueWriter implements OrcValueWriter { private final int scale; - Decimal18Converter(TypeDescription schema) { - this.scale = schema.getScale(); + Decimal18OrcValueWriter(int scale) { + this.scale = scale; } @Override @@ -474,8 +463,8 @@ public void addValue(int rowId, BigDecimal data, ColumnVector output) { } } - private static class Decimal38Converter implements Converter { - private static final Converter INSTANCE = new Decimal38Converter(); + private static class Decimal38OrcValueWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new Decimal38OrcValueWriter(); @Override public Class getJavaClass() { @@ -495,43 +484,11 @@ public void addValue(int rowId, BigDecimal data, ColumnVector output) { } } - public static class RecordConverter implements Converter { - private final List converters; - - RecordConverter(List converters) { - this.converters = converters; - } - - public List converters() { - return converters; - } - - @Override - public Class getJavaClass() { - return Record.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, Record data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - 
output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - StructColumnVector cv = (StructColumnVector) output; - for (int c = 0; c < converters.size(); ++c) { - converters.get(c).addValue(rowId, data.get(c, converters.get(c).getJavaClass()), cv.fields[c]); - } - } - } - } - - public static class ListConverter implements Converter { - private final Converter children; + private static class ListOrcValueWriter implements OrcValueWriter { + private final OrcValueWriter element; - ListConverter(Converter children) { - this.children = children; + ListOrcValueWriter(OrcValueWriter element) { + this.element = element; } @Override @@ -557,19 +514,19 @@ public void addValue(int rowId, List data, ColumnVector output) { cv.child.ensureSize(cv.childCount, true); // Add each element for (int e = 0; e < cv.lengths[rowId]; ++e) { - children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); + element.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); } } } } - public static class MapConverter implements Converter { - private final Converter keyConverter; - private final Converter valueConverter; + private static class MapOrcValueWriter implements OrcValueWriter { + private final OrcValueWriter keyWriter; + private final OrcValueWriter valueWriter; - MapConverter(Converter keyConverter, Converter valueConverter) { - this.keyConverter = keyConverter; - this.valueConverter = valueConverter; + MapOrcValueWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter) { + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; } @Override @@ -603,8 +560,8 @@ public void addValue(int rowId, Map data, ColumnVector output) { // Add each element for (int e = 0; e < cv.lengths[rowId]; ++e) { int pos = (int) (e + cv.offsets[rowId]); - keyConverter.addValue(pos, keys.get(e), cv.keys); - valueConverter.addValue(pos, values.get(e), cv.values); + keyWriter.addValue(pos, keys.get(e), cv.keys); + valueWriter.addValue(pos, values.get(e), cv.values); } } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 38dc522fa5cf..8e67aca15c4d 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -59,7 +59,7 @@ public static class WriteBuilder { private final OutputFile file; private final Configuration conf; private Schema schema = null; - private Function> createWriterFunc; + private Function> createWriterFunc; private Map metadata = new HashMap<>(); private WriteBuilder(OutputFile file) { @@ -81,7 +81,7 @@ public WriteBuilder config(String property, String value) { return this; } - public WriteBuilder createWriterFunc(Function> writerFunction) { + public WriteBuilder createWriterFunc(Function> writerFunction) { this.createWriterFunc = writerFunction; return this; } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index acc48550688b..6c457ec77b3c 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -51,12 +51,12 @@ class OrcFileAppender implements FileAppender { private final OutputFile file; private final Writer writer; private final VectorizedRowBatch batch; - private final OrcValueWriter valueWriter; + private final OrcRowWriter valueWriter; private boolean isClosed = false; private final Configuration conf; OrcFileAppender(Schema schema, OutputFile file, - Function> 
createWriterFunc, + Function> createWriterFunc, Configuration conf, Map metadata, int batchSize) { this.conf = conf; @@ -146,8 +146,8 @@ private static Writer newOrcWriter(OutputFile file, } @SuppressWarnings("unchecked") - private static OrcValueWriter newOrcValueWriter( - TypeDescription schema, Function> createWriterFunc) { - return (OrcValueWriter) createWriterFunc.apply(schema); + private static OrcRowWriter newOrcValueWriter( + TypeDescription schema, Function> createWriterFunc) { + return (OrcRowWriter) createWriterFunc.apply(schema); } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java new file mode 100644 index 000000000000..ddcd4280bf0d --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import java.io.IOException; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + * Write data value of a schema. + */ +public interface OrcRowWriter { + + /** + * Writes the row data. + * + * @param row the row data value to write. + * @param output the VectorizedRowBatch to which the output will be written. + * @throws IOException if there's any IO error while writing the data value. + */ + void write(T row, VectorizedRowBatch output) throws IOException; +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java index 74d2fddddcb0..3a6cac644302 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -19,20 +19,18 @@ package org.apache.iceberg.orc; -import java.io.IOException; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; -/** - * Write data value of a schema. - */ public interface OrcValueWriter { + Class getJavaClass(); + /** - * Writes the data. + * Take a value from the data value and add it to the ORC output. * - * @param value the data value to write. - * @param output the VectorizedRowBatch to which the output will be written. - * @throws IOException if there's any IO error while writing the data value. + * @param rowId the row in the ColumnVector + * @param data the data value to write. 
+ * @param output the ColumnVector to put the value into */ - void write(T value, VectorizedRowBatch output) throws IOException; + void addValue(int rowId, T data, ColumnVector output); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index c27f95898812..e797bdddccdb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -20,7 +20,7 @@ package org.apache.iceberg.spark.data; import java.util.List; -import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.orc.OrcRowWriter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; @@ -42,7 +42,7 @@ * This class acts as an adaptor from an OrcFileAppender to a * FileAppender<InternalRow>. */ -public class SparkOrcWriter implements OrcValueWriter { +public class SparkOrcWriter implements OrcRowWriter { private final Converter[] converters; @@ -65,9 +65,10 @@ public void write(InternalRow value, VectorizedRowBatch output) { interface Converter { /** * Take a value from the Spark data value and add it to the ORC output. - * @param rowId the row in the ColumnVector + * + * @param rowId the row in the ColumnVector * @param column either the column number or element number - * @param data either an InternalRow or ArrayData + * @param data either an InternalRow or ArrayData * @param output the ColumnVector to put the value into */ void addValue(int rowId, int column, SpecializedGetters data, From e88f6786fafd6993d057292b047354e37daf4c78 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 11:22:43 +0800 Subject: [PATCH 03/15] Remove the `OrcValue` from all the writer names. 
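
For reviewers following the series: after this rename a leaf writer is a small
class that drops a single value into one slot of a ColumnVector. The sketch
below is illustrative only (ExampleIntWriter is not part of this patch); it
shows the value-level contract the renamed writers implement, including the
per-writer null handling they still carry at this point in the series.

    import org.apache.iceberg.orc.OrcValueWriter;
    import org.apache.orc.storage.ql.exec.vector.ColumnVector;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

    // Sketch only: a minimal leaf writer under the value-level contract.
    class ExampleIntWriter implements OrcValueWriter<Integer> {
      @Override
      public Class<Integer> getJavaClass() {
        return Integer.class; // tells the row writer which accessor type to use
      }

      @Override
      public void addValue(int rowId, Integer data, ColumnVector output) {
        if (data == null) {
          output.noNulls = false;      // the vector now contains at least one null
          output.isNull[rowId] = true; // mark this row as null
        } else {
          output.isNull[rowId] = false;
          // ORC stores all integral types in a LongColumnVector
          ((LongColumnVector) output).vector[rowId] = data;
        }
      }
    }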
--- .../iceberg/data/orc/GenericOrcWriter.java | 2 +- .../iceberg/data/orc/GenericOrcWriters.java | 109 +++++++++--------- 2 files changed, 55 insertions(+), 56 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index c178bf2bb679..2aa972f82251 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -23,8 +23,8 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; -import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcRowWriter; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index a8f35121fd05..51717f62fb13 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -34,7 +34,6 @@ import java.util.UUID; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; @@ -53,59 +52,59 @@ private GenericOrcWriters() { } public static OrcValueWriter booleans() { - return BooleanOrcValueWriter.INSTANCE; + return BooleanWriter.INSTANCE; } public static OrcValueWriter bytes() { - return ByteOrcValueWriter.INSTANCE; + return ByteWriter.INSTANCE; } public static OrcValueWriter shorts() { - return ShortOrcValueWriter.INSTANCE; + return ShortWriter.INSTANCE; } public static OrcValueWriter ints() { - return IntOrcValueWriter.INSTANCE; + return IntWriter.INSTANCE; } public static OrcValueWriter times() { - return TimeOrcValueWriter.INSTANCE; + return TimeWriter.INSTANCE; } public static OrcValueWriter longs() { - return LongOrcValueWriter.INSTANCE; + return LongWriter.INSTANCE; } public static OrcValueWriter floats() { - return FloatOrcValueWriter.INSTANCE; + return FloatWriter.INSTANCE; } public static OrcValueWriter doubles() { - return DoubleOrcValueWriter.INSTANCE; + return DoubleWriter.INSTANCE; } public static OrcValueWriter strings() { - return StringOrcValueWriter.INSTANCE; + return StringWriter.INSTANCE; } public static OrcValueWriter binary() { - return BytesOrcValueWriter.INSTANCE; + return BytesWriter.INSTANCE; } public static OrcValueWriter uuids() { - return UUIDOrcValueWriter.INSTANCE; + return UUIDWriter.INSTANCE; } public static OrcValueWriter fixed() { - return FixedOrcValueWriter.INSTANCE; + return FixedWriter.INSTANCE; } public static OrcValueWriter dates() { - return DateOrcValueWriter.INSTANCE; + return DateWriter.INSTANCE; } public static OrcValueWriter timestampTz() { - return TimestampTzOrcValueWriter.INSTANCE; + return TimestampTzWriter.INSTANCE; } public static OrcValueWriter timestamp() { @@ -114,22 +113,22 @@ public static OrcValueWriter timestamp() { public static OrcValueWriter decimal(int scala, int precision) { if (precision <= 18) { - return new Decimal18OrcValueWriter(scala); + return new 
DecimalWriter(scala); } else { - return Decimal38OrcValueWriter.INSTANCE; + return Decimal38Writer.INSTANCE; } } public static OrcValueWriter list(OrcValueWriter element) { - return new ListOrcValueWriter(element); + return new ListWriter(element); } public static OrcValueWriter map(OrcValueWriter key, OrcValueWriter value) { - return new MapOrcValueWriter(key, value); + return new MapWriter(key, value); } - private static class BooleanOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new BooleanOrcValueWriter(); + private static class BooleanWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new BooleanWriter(); @Override public Class getJavaClass() { @@ -148,8 +147,8 @@ public void addValue(int rowId, Boolean data, ColumnVector output) { } } - private static class ByteOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new ByteOrcValueWriter(); + private static class ByteWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteWriter(); @Override public Class getJavaClass() { @@ -168,8 +167,8 @@ public void addValue(int rowId, Byte data, ColumnVector output) { } } - private static class ShortOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new ShortOrcValueWriter(); + private static class ShortWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ShortWriter(); @Override public Class getJavaClass() { @@ -188,8 +187,8 @@ public void addValue(int rowId, Short data, ColumnVector output) { } } - private static class IntOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new IntOrcValueWriter(); + private static class IntWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new IntWriter(); @Override public Class getJavaClass() { @@ -208,8 +207,8 @@ public void addValue(int rowId, Integer data, ColumnVector output) { } } - private static class TimeOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new TimeOrcValueWriter(); + private static class TimeWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimeWriter(); @Override public Class getJavaClass() { @@ -228,8 +227,8 @@ public void addValue(int rowId, LocalTime data, ColumnVector output) { } } - private static class LongOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new LongOrcValueWriter(); + private static class LongWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new LongWriter(); @Override public Class getJavaClass() { @@ -248,8 +247,8 @@ public void addValue(int rowId, Long data, ColumnVector output) { } } - private static class FloatOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new FloatOrcValueWriter(); + private static class FloatWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FloatWriter(); @Override public Class getJavaClass() { @@ -268,8 +267,8 @@ public void addValue(int rowId, Float data, ColumnVector output) { } } - private static class DoubleOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new DoubleOrcValueWriter(); + private static class DoubleWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DoubleWriter(); 
@Override public Class getJavaClass() { @@ -288,8 +287,8 @@ public void addValue(int rowId, Double data, ColumnVector output) { } } - private static class StringOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new StringOrcValueWriter(); + private static class StringWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new StringWriter(); @Override public Class getJavaClass() { @@ -309,8 +308,8 @@ public void addValue(int rowId, String data, ColumnVector output) { } } - private static class BytesOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new BytesOrcValueWriter(); + private static class BytesWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new BytesWriter(); @Override public Class getJavaClass() { @@ -329,8 +328,8 @@ public void addValue(int rowId, ByteBuffer data, ColumnVector output) { } } - private static class UUIDOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new UUIDOrcValueWriter(); + private static class UUIDWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new UUIDWriter(); @Override public Class getJavaClass() { @@ -352,8 +351,8 @@ public void addValue(int rowId, UUID data, ColumnVector output) { } } - private static class FixedOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new FixedOrcValueWriter(); + private static class FixedWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FixedWriter(); @Override public Class getJavaClass() { @@ -372,8 +371,8 @@ public void addValue(int rowId, byte[] data, ColumnVector output) { } } - private static class DateOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new DateOrcValueWriter(); + private static class DateWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DateWriter(); @Override public Class getJavaClass() { @@ -392,8 +391,8 @@ public void addValue(int rowId, LocalDate data, ColumnVector output) { } } - private static class TimestampTzOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new TimestampTzOrcValueWriter(); + private static class TimestampTzWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampTzWriter(); @Override public Class getJavaClass() { @@ -437,10 +436,10 @@ public void addValue(int rowId, LocalDateTime data, ColumnVector output) { } } - private static class Decimal18OrcValueWriter implements OrcValueWriter { + private static class DecimalWriter implements OrcValueWriter { private final int scale; - Decimal18OrcValueWriter(int scale) { + DecimalWriter(int scale) { this.scale = scale; } @@ -463,8 +462,8 @@ public void addValue(int rowId, BigDecimal data, ColumnVector output) { } } - private static class Decimal38OrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new Decimal38OrcValueWriter(); + private static class Decimal38Writer implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new Decimal38Writer(); @Override public Class getJavaClass() { @@ -484,10 +483,10 @@ public void addValue(int rowId, BigDecimal data, ColumnVector output) { } } - private static class ListOrcValueWriter implements OrcValueWriter { + private static class ListWriter implements OrcValueWriter { private final 
OrcValueWriter element; - ListOrcValueWriter(OrcValueWriter element) { + ListWriter(OrcValueWriter element) { this.element = element; } @@ -520,11 +519,11 @@ public void addValue(int rowId, List data, ColumnVector output) { } } - private static class MapOrcValueWriter implements OrcValueWriter { + private static class MapWriter implements OrcValueWriter { private final OrcValueWriter keyWriter; private final OrcValueWriter valueWriter; - MapOrcValueWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter) { + MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter) { this.keyWriter = keyWriter; this.valueWriter = valueWriter; } From afe95c7b73d8e5dfc9f82bf59e712d4fc2dc7a36 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 11:34:15 +0800 Subject: [PATCH 04/15] Addressing serveral minor issues --- .../iceberg/data/orc/GenericOrcWriter.java | 42 ++- .../iceberg/data/orc/GenericOrcWriters.java | 296 +++++------------- .../apache/iceberg/orc/OrcValueWriter.java | 12 +- 3 files changed, 103 insertions(+), 247 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 2aa972f82251..6ed77936c9a9 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -35,13 +35,13 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; public class GenericOrcWriter implements OrcRowWriter { - private final OrcValueWriter orcValueWriter; + private final OrcValueWriter writer; private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) { Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "Top level must be a struct " + orcSchema); - orcValueWriter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); + writer = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); } public static OrcRowWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { @@ -88,7 +88,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p case LONG: return GenericOrcWriters.longs(); default: - throw new IllegalStateException( + throw new IllegalArgumentException( String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); } case FLOAT: @@ -116,7 +116,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p case BINARY: return GenericOrcWriters.binary(); default: - throw new IllegalStateException( + throw new IllegalArgumentException( String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); } default: @@ -128,26 +128,27 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p @SuppressWarnings("unchecked") @Override public void write(Record value, VectorizedRowBatch output) { - Preconditions.checkArgument(orcValueWriter instanceof RecordOrcValueWriter, + Preconditions.checkArgument(writer instanceof RecordOrcValueWriter, "Converter must be a RecordConverter."); int row = output.size; output.size += 1; - List orcValueWriters = ((RecordOrcValueWriter) orcValueWriter).converters(); - for (int c = 0; c < orcValueWriters.size(); ++c) { - orcValueWriters.get(c).addValue(row, value.get(c, orcValueWriters.get(c).getJavaClass()), output.cols[c]); + List writers = ((RecordOrcValueWriter) writer).writers(); + for (int c = 0; c < 
writers.size(); ++c) { + OrcValueWriter child = writers.get(c); + child.write(row, value.get(c, child.getJavaClass()), output.cols[c]); } } private static class RecordOrcValueWriter implements OrcValueWriter { - private final List orcValueWriters; + private final List writers; - RecordOrcValueWriter(List orcValueWriters) { - this.orcValueWriters = orcValueWriters; + RecordOrcValueWriter(List writers) { + this.writers = writers; } - List converters() { - return orcValueWriters; + List writers() { + return writers; } @Override @@ -157,16 +158,11 @@ public Class getJavaClass() { @Override @SuppressWarnings("unchecked") - public void addValue(int rowId, Record data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - StructColumnVector cv = (StructColumnVector) output; - for (int c = 0; c < orcValueWriters.size(); ++c) { - orcValueWriters.get(c).addValue(rowId, data.get(c, orcValueWriters.get(c).getJavaClass()), cv.fields[c]); - } + public void nonNullWrite(int rowId, Record data, ColumnVector output) { + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter child = writers.get(c); + child.write(rowId, data.get(c, child.getJavaClass()), cv.fields[c]); } } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 51717f62fb13..62185b0e4f15 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -55,14 +55,6 @@ public static OrcValueWriter booleans() { return BooleanWriter.INSTANCE; } - public static OrcValueWriter bytes() { - return ByteWriter.INSTANCE; - } - - public static OrcValueWriter shorts() { - return ShortWriter.INSTANCE; - } - public static OrcValueWriter ints() { return IntWriter.INSTANCE; } @@ -136,54 +128,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, Boolean data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; - } - } - } - - private static class ByteWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new ByteWriter(); - - @Override - public Class getJavaClass() { - return Byte.class; - } - - @Override - public void addValue(int rowId, Byte data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - private static class ShortWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new ShortWriter(); - - @Override - public Class getJavaClass() { - return Short.class; - } - - @Override - public void addValue(int rowId, Short data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } + public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data ? 
1 : 0; } } @@ -196,14 +142,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, Integer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } + public void nonNullWrite(int rowId, Integer data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; } } @@ -216,14 +156,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, LocalTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; - } + public void nonNullWrite(int rowId, LocalTime data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; } } @@ -236,14 +170,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, Long data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } + public void nonNullWrite(int rowId, Long data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; } } @@ -256,14 +184,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, Float data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } + public void nonNullWrite(int rowId, Float data, ColumnVector output) { + ((DoubleColumnVector) output).vector[rowId] = data; } } @@ -276,14 +198,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, Double data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } + public void nonNullWrite(int rowId, Double data, ColumnVector output) { + ((DoubleColumnVector) output).vector[rowId] = data; } } @@ -296,15 +212,9 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, String data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - byte[] value = data.getBytes(StandardCharsets.UTF_8); - ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); - } + public void nonNullWrite(int rowId, String data, ColumnVector output) { + byte[] value = data.getBytes(StandardCharsets.UTF_8); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); } } @@ -317,14 +227,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, ByteBuffer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); - } + public void nonNullWrite(int rowId, ByteBuffer data, ColumnVector output) { + ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); } } @@ -337,17 +241,11 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, UUID data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - 
output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.putLong(data.getMostSignificantBits()); - buffer.putLong(data.getLeastSignificantBits()); - ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); - } + public void nonNullWrite(int rowId, UUID data, ColumnVector output) { + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(data.getMostSignificantBits()); + buffer.putLong(data.getLeastSignificantBits()); + ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); } } @@ -360,14 +258,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, byte[] data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); - } + public void nonNullWrite(int rowId, byte[] data, ColumnVector output) { + ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); } } @@ -380,14 +272,8 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, LocalDate data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); - } + public void nonNullWrite(int rowId, LocalDate data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); } } @@ -400,16 +286,10 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.time[rowId] = data.toInstant().toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } + public void nonNullWrite(int rowId, OffsetDateTime data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.time[rowId] = data.toInstant().toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision } } @@ -422,17 +302,11 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, LocalDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.setIsUTC(true); - cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } + public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.setIsUTC(true); + cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision } } @@ -449,16 +323,10 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { + public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { // TODO: validate precision 
and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId] - .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); - } + ((DecimalColumnVector) output).vector[rowId] + .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); } } @@ -471,15 +339,9 @@ public Class getJavaClass() { } @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { + public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); - } + ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); } } @@ -497,24 +359,18 @@ public Class getJavaClass() { @Override @SuppressWarnings("unchecked") - public void addValue(int rowId, List data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - List value = (List) data; - ListColumnVector cv = (ListColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = value.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.child.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - element.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); - } + public void nonNullWrite(int rowId, List data, ColumnVector output) { + List value = (List) data; + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.child.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child); } } } @@ -535,33 +391,27 @@ public Class getJavaClass() { @Override @SuppressWarnings("unchecked") - public void addValue(int rowId, Map data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - Map map = (Map) data; - List keys = Lists.newArrayListWithExpectedSize(map.size()); - List values = Lists.newArrayListWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - values.add(entry.getValue()); - } - MapColumnVector cv = (MapColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = map.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.keys.ensureSize(cv.childCount, true); - cv.values.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - int pos = (int) (e + cv.offsets[rowId]); - keyWriter.addValue(pos, keys.get(e), cv.keys); - valueWriter.addValue(pos, values.get(e), cv.values); - } + public void nonNullWrite(int rowId, Map data, ColumnVector output) { + Map map = (Map) data; + List keys = Lists.newArrayListWithExpectedSize(map.size()); + List values = 
Lists.newArrayListWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + keys.add(entry.getKey()); + values.add(entry.getValue()); + } + MapColumnVector cv = (MapColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = map.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyWriter.write(pos, keys.get(e), cv.keys); + valueWriter.write(pos, values.get(e), cv.values); } } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java index 3a6cac644302..4151e12f302e 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -32,5 +32,15 @@ public interface OrcValueWriter { * @param data the data value to write. * @param output the ColumnVector to put the value into */ - void addValue(int rowId, T data, ColumnVector output); + default void write(int rowId, T data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + nonNullWrite(rowId, data, output); + } + } + + void nonNullWrite(int rowId, T data, ColumnVector output); } From cd5580b2b62c1f70eef34749a3a6235f1d4bc11f Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 15:00:19 +0800 Subject: [PATCH 05/15] Fixed broken unit tests. --- .../java/org/apache/iceberg/spark/source/TestFilteredScan.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 9be99383873f..7ec49890576d 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -209,7 +209,7 @@ public void writeUnpartitionedTable() throws IOException { case ORC: try (FileAppender writer = ORC.write(localOutput(testFile)) - .createWriterFunc(GenericOrcWriter::buildWriter) + .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc)) .schema(tableSchema) .build()) { writer.addAll(records); From e29467aaa1644ea3ec33bc0fe4455e9a48bffda0 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 16:12:21 +0800 Subject: [PATCH 06/15] Minor fix: rename the OrcFileAppender#newOrcValueWriter to OrcFileAppender#newOrcRowWriter. 
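
For context on how the renamed factory method is reached: the appender never
builds a row writer itself, it applies the createWriterFunc supplied by the
caller. A usage sketch follows, mirroring the test touched in PATCH 05;
testFile, tableSchema, records and the localOutput(...) helper are assumed to
exist in the calling test class.

    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.orc.ORC;

    // Sketch only: the lambda passed to createWriterFunc is what
    // OrcFileAppender#newOrcRowWriter eventually applies to the ORC schema.
    try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc))
        .schema(tableSchema)
        .build()) {
      writer.addAll(records);
    }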
--- orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index 6c457ec77b3c..35cde45b87f7 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -73,7 +73,7 @@ class OrcFileAppender implements FileAppender { } options.setSchema(orcSchema); this.writer = newOrcWriter(file, options, metadata); - this.valueWriter = newOrcValueWriter(orcSchema, createWriterFunc); + this.valueWriter = newOrcRowWriter(orcSchema, createWriterFunc); } @Override @@ -146,7 +146,7 @@ private static Writer newOrcWriter(OutputFile file, } @SuppressWarnings("unchecked") - private static OrcRowWriter newOrcValueWriter( + private static OrcRowWriter newOrcRowWriter( TypeDescription schema, Function> createWriterFunc) { return (OrcRowWriter) createWriterFunc.apply(schema); } From 4fce2d06b9dce13b5759d0686c9cec5b0ab2c852 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 19:14:19 +0800 Subject: [PATCH 07/15] align the class name of writers and use the iceberg types in switch-cases. --- .../iceberg/data/orc/GenericOrcWriter.java | 64 ++++++++----------- .../iceberg/data/orc/GenericOrcWriters.java | 20 +++--- 2 files changed, 35 insertions(+), 49 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 6ed77936c9a9..4f1f7bec11f5 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -55,7 +55,7 @@ private WriteBuilder() { @Override public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, List names, List fields) { - return new RecordOrcValueWriter(fields); + return new RecordWriter(fields); } @Override @@ -72,55 +72,41 @@ public OrcValueWriter map(Types.MapType iMap, TypeDescription map, @Override public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { - switch (primitive.getCategory()) { + switch (iPrimitive.typeId()) { case BOOLEAN: return GenericOrcWriters.booleans(); - case BYTE: - throw new IllegalArgumentException("Iceberg does not have a byte type"); - case SHORT: - throw new IllegalArgumentException("Iceberg does not have a short type."); - case INT: + case INTEGER: return GenericOrcWriters.ints(); case LONG: - switch (iPrimitive.typeId()) { - case TIME: - return GenericOrcWriters.times(); - case LONG: - return GenericOrcWriters.longs(); - default: - throw new IllegalArgumentException( - String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); - } + return GenericOrcWriters.longs(); case FLOAT: return GenericOrcWriters.floats(); case DOUBLE: return GenericOrcWriters.doubles(); case DATE: return GenericOrcWriters.dates(); + case TIME: + return GenericOrcWriters.times(); case TIMESTAMP: - return GenericOrcWriters.timestamp(); - case TIMESTAMP_INSTANT: - return GenericOrcWriters.timestampTz(); - case DECIMAL: - return GenericOrcWriters.decimal(primitive.getScale(), primitive.getPrecision()); - case CHAR: - case VARCHAR: + Types.TimestampType timestampType = (Types.TimestampType) iPrimitive; + if (timestampType.shouldAdjustToUTC()) { + return GenericOrcWriters.timestampTz(); + } else { + return 
GenericOrcWriters.timestamp(); + } case STRING: return GenericOrcWriters.strings(); + case UUID: + return GenericOrcWriters.uuids(); + case FIXED: + return GenericOrcWriters.fixed(); case BINARY: - switch (iPrimitive.typeId()) { - case UUID: - return GenericOrcWriters.uuids(); - case FIXED: - return GenericOrcWriters.fixed(); - case BINARY: - return GenericOrcWriters.binary(); - default: - throw new IllegalArgumentException( - String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); - } + return GenericOrcWriters.byteBuffers(); + case DECIMAL: + Types.DecimalType decimalType = (Types.DecimalType) iPrimitive; + return GenericOrcWriters.decimal(decimalType.scale(), decimalType.precision()); default: - throw new IllegalArgumentException("Unhandled type " + primitive); + throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); } } } @@ -128,22 +114,22 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p @SuppressWarnings("unchecked") @Override public void write(Record value, VectorizedRowBatch output) { - Preconditions.checkArgument(writer instanceof RecordOrcValueWriter, + Preconditions.checkArgument(writer instanceof RecordWriter, "Converter must be a RecordConverter."); int row = output.size; output.size += 1; - List writers = ((RecordOrcValueWriter) writer).writers(); + List writers = ((RecordWriter) writer).writers(); for (int c = 0; c < writers.size(); ++c) { OrcValueWriter child = writers.get(c); child.write(row, value.get(c, child.getJavaClass()), output.cols[c]); } } - private static class RecordOrcValueWriter implements OrcValueWriter { + private static class RecordWriter implements OrcValueWriter { private final List writers; - RecordOrcValueWriter(List writers) { + RecordWriter(List writers) { this.writers = writers; } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 62185b0e4f15..5a7524cbb176 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -79,8 +79,8 @@ public static OrcValueWriter strings() { return StringWriter.INSTANCE; } - public static OrcValueWriter binary() { - return BytesWriter.INSTANCE; + public static OrcValueWriter byteBuffers() { + return ByteBufferWriter.INSTANCE; } public static OrcValueWriter uuids() { @@ -100,12 +100,12 @@ public static OrcValueWriter timestampTz() { } public static OrcValueWriter timestamp() { - return TimestampOrcValueWriter.INSTANCE; + return TimestampWriter.INSTANCE; } public static OrcValueWriter decimal(int scala, int precision) { if (precision <= 18) { - return new DecimalWriter(scala); + return new Decimal18Writer(scala); } else { return Decimal38Writer.INSTANCE; } @@ -218,8 +218,8 @@ public void nonNullWrite(int rowId, String data, ColumnVector output) { } } - private static class BytesWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new BytesWriter(); + private static class ByteBufferWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteBufferWriter(); @Override public Class getJavaClass() { @@ -293,8 +293,8 @@ public void nonNullWrite(int rowId, OffsetDateTime data, ColumnVector output) { } } - private static class TimestampOrcValueWriter implements OrcValueWriter { - private static final OrcValueWriter 
INSTANCE = new TimestampOrcValueWriter(); + private static class TimestampWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampWriter(); @Override public Class getJavaClass() { @@ -310,10 +310,10 @@ public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) { } } - private static class DecimalWriter implements OrcValueWriter { + private static class Decimal18Writer implements OrcValueWriter { private final int scale; - DecimalWriter(int scale) { + Decimal18Writer(int scale) { this.scale = scale; } From c6db2270aee00e77226c210aa8b45ff97035d153 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 14 Jul 2020 20:11:45 +0800 Subject: [PATCH 08/15] Fix checkstyle --- .../java/org/apache/iceberg/data/orc/GenericOrcWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 4f1f7bec11f5..7c83c02eb83d 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -106,7 +106,8 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p Types.DecimalType decimalType = (Types.DecimalType) iPrimitive; return GenericOrcWriters.decimal(decimalType.scale(), decimalType.precision()); default: - throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); + throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s", + iPrimitive, primitive)); } } } From a19855fc3f34944eabe5d79741297885573d0383 Mon Sep 17 00:00:00 2001 From: openinx Date: Thu, 16 Jul 2020 10:49:32 +0800 Subject: [PATCH 09/15] Addressing the comment. --- .../java/org/apache/iceberg/spark/data/SparkOrcWriter.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index e797bdddccdb..0361fdc1c0c8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -65,10 +65,9 @@ public void write(InternalRow value, VectorizedRowBatch output) { interface Converter { /** * Take a value from the Spark data value and add it to the ORC output. - * - * @param rowId the row in the ColumnVector + * @param rowId the row in the ColumnVector * @param column either the column number or element number - * @param data either an InternalRow or ArrayData + * @param data either an InternalRow or ArrayData * @param output the ColumnVector to put the value into */ void addValue(int rowId, int column, SpecializedGetters data, From c147747e3a8e3193841c7a2a7bdafaf682083824 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 21 Jul 2020 11:05:38 +0800 Subject: [PATCH 10/15] return the OrcValueWriter with a type. 
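
With the type parameter carried through the visitor, a leaf writer is fully
typed and only has to implement the non-null path; the default write(...)
introduced in PATCH 04 centralizes the null bookkeeping. An illustrative
sketch (ExampleStringWriter is not part of this patch; its body mirrors the
existing StringWriter):

    import java.nio.charset.StandardCharsets;
    import org.apache.iceberg.orc.OrcValueWriter;
    import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
    import org.apache.orc.storage.ql.exec.vector.ColumnVector;

    // Sketch only: null checks live in the default write(...), so a typed
    // leaf writer reduces to the type token plus the non-null write path.
    class ExampleStringWriter implements OrcValueWriter<String> {
      @Override
      public Class<String> getJavaClass() {
        return String.class;
      }

      @Override
      public void nonNullWrite(int rowId, String data, ColumnVector output) {
        byte[] value = data.getBytes(StandardCharsets.UTF_8);
        // setRef keeps a reference to the buffer instead of copying it
        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
      }
    }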
--- .../iceberg/data/orc/GenericOrcWriter.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 7c83c02eb83d..d345563392b8 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -48,13 +48,13 @@ public static OrcRowWriter buildWriter(Schema expectedSchema, TypeDescri return new GenericOrcWriter(expectedSchema, fileSchema); } - private static class WriteBuilder extends OrcSchemaWithTypeVisitor { + private static class WriteBuilder extends OrcSchemaWithTypeVisitor> { private WriteBuilder() { } @Override public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, - List names, List fields) { + List names, List> fields) { return new RecordWriter(fields); } @@ -71,7 +71,7 @@ public OrcValueWriter map(Types.MapType iMap, TypeDescription map, } @Override - public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { switch (iPrimitive.typeId()) { case BOOLEAN: return GenericOrcWriters.booleans(); @@ -112,15 +112,15 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription p } } - @SuppressWarnings("unchecked") @Override + @SuppressWarnings("unchecked") public void write(Record value, VectorizedRowBatch output) { Preconditions.checkArgument(writer instanceof RecordWriter, "Converter must be a RecordConverter."); int row = output.size; output.size += 1; - List writers = ((RecordWriter) writer).writers(); + List> writers = ((RecordWriter) writer).writers(); for (int c = 0; c < writers.size(); ++c) { OrcValueWriter child = writers.get(c); child.write(row, value.get(c, child.getJavaClass()), output.cols[c]); @@ -128,13 +128,13 @@ public void write(Record value, VectorizedRowBatch output) { } private static class RecordWriter implements OrcValueWriter { - private final List writers; + private final List> writers; - RecordWriter(List writers) { + RecordWriter(List> writers) { this.writers = writers; } - List writers() { + List> writers() { return writers; } From 6b6bb4ecac273bfbadf8d3eebdb1fb7759149c30 Mon Sep 17 00:00:00 2001 From: openinx Date: Tue, 21 Jul 2020 11:22:23 +0800 Subject: [PATCH 11/15] typo --- .../java/org/apache/iceberg/data/orc/GenericOrcWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index d345563392b8..1bf0614eeb41 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -115,8 +115,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio @Override @SuppressWarnings("unchecked") public void write(Record value, VectorizedRowBatch output) { - Preconditions.checkArgument(writer instanceof RecordWriter, - "Converter must be a RecordConverter."); + Preconditions.checkArgument(writer instanceof RecordWriter, "writer must be a RecordWriter."); int row = output.size; output.size += 1; From 7a021e318ed089f5dac9143e97e48ab444e0af02 Mon Sep 17 00:00:00 2001 From: openinx Date: Wed, 22 Jul 2020 11:25:24 +0800 Subject: [PATCH 12/15] Provide the parameter 
 type for ListWriter and MapWriter

---
 .../iceberg/data/orc/GenericOrcWriter.java  |  9 ++---
 .../iceberg/data/orc/GenericOrcWriters.java | 40 +++++++++----------
 .../apache/iceberg/orc/OrcValueWriter.java  |  2 +-
 3 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 1bf0614eeb41..1ed013abe894 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.data.orc;
 
 import java.util.List;
-import java.util.Map;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.OrcRowWriter;
@@ -59,14 +58,14 @@ public OrcValueWriter record(Types.StructType iStruct, TypeDescription r
     }
 
     @Override
-    public OrcValueWriter list(Types.ListType iList, TypeDescription array,
-                               OrcValueWriter element) {
+    public OrcValueWriter<?> list(Types.ListType iList, TypeDescription array,
+                                  OrcValueWriter<?> element) {
       return GenericOrcWriters.list(element);
     }
 
     @Override
-    public OrcValueWriter map(Types.MapType iMap, TypeDescription map,
-                              OrcValueWriter key, OrcValueWriter value) {
+    public OrcValueWriter<?> map(Types.MapType iMap, TypeDescription map,
+                                 OrcValueWriter<?> key, OrcValueWriter<?> value) {
       return GenericOrcWriters.map(key, value);
     }
 
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 5a7524cbb176..5a1f6914ee04 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -111,12 +111,12 @@ public static OrcValueWriter<BigDecimal> decimal(int scala, int precision) {
     }
   }
 
-  public static OrcValueWriter<List> list(OrcValueWriter element) {
-    return new ListWriter(element);
+  public static <T> OrcValueWriter<List<T>> list(OrcValueWriter<T> element) {
+    return new ListWriter<>(element);
   }
 
-  public static OrcValueWriter<Map> map(OrcValueWriter key, OrcValueWriter value) {
-    return new MapWriter(key, value);
+  public static <K, V> OrcValueWriter<Map<K, V>> map(OrcValueWriter<K> key, OrcValueWriter<V> value) {
+    return new MapWriter<>(key, value);
   }
 
   private static class BooleanWriter implements OrcValueWriter<Boolean> {
@@ -345,22 +345,20 @@ public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
     }
   }
 
-  private static class ListWriter implements OrcValueWriter<List> {
-    private final OrcValueWriter element;
+  private static class ListWriter<T> implements OrcValueWriter<List<T>> {
+    private final OrcValueWriter<T> element;
 
-    ListWriter(OrcValueWriter element) {
+    ListWriter(OrcValueWriter<T> element) {
      this.element = element;
    }
 
     @Override
-    public Class getJavaClass() {
+    public Class<?> getJavaClass() {
       return List.class;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public void nonNullWrite(int rowId, List data, ColumnVector output) {
-      List value = (List) data;
+    public void nonNullWrite(int rowId, List<T> value, ColumnVector output) {
       ListColumnVector cv = (ListColumnVector) output;
       // record the length and start of the list elements
       cv.lengths[rowId] = value.size();
@@ -375,27 +373,25 @@ public void nonNullWrite(int rowId, List data, ColumnVector output) {
     }
   }
 
-  private static class MapWriter implements OrcValueWriter<Map> {
-    private final OrcValueWriter keyWriter;
-    private final OrcValueWriter valueWriter;
+  private static class MapWriter<K, V> implements OrcValueWriter<Map<K, V>> {
+    private final OrcValueWriter<K> keyWriter;
+    private final OrcValueWriter<V> valueWriter;
 
-    MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter) {
+    MapWriter(OrcValueWriter<K> keyWriter, OrcValueWriter<V> valueWriter) {
       this.keyWriter = keyWriter;
       this.valueWriter = valueWriter;
     }
 
     @Override
-    public Class getJavaClass() {
+    public Class<?> getJavaClass() {
       return Map.class;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public void nonNullWrite(int rowId, Map data, ColumnVector output) {
-      Map map = (Map) data;
-      List keys = Lists.newArrayListWithExpectedSize(map.size());
-      List values = Lists.newArrayListWithExpectedSize(map.size());
-      for (Map.Entry entry : map.entrySet()) {
+    public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
+      List<K> keys = Lists.newArrayListWithExpectedSize(map.size());
+      List<V> values = Lists.newArrayListWithExpectedSize(map.size());
+      for (Map.Entry<K, V> entry : map.entrySet()) {
         keys.add(entry.getKey());
         values.add(entry.getValue());
       }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
index 4151e12f302e..9bbc1ddc6f0c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -23,7 +23,7 @@
 
 public interface OrcValueWriter<T> {
 
-  Class<T> getJavaClass();
+  Class<?> getJavaClass();
 
   /**
    * Take a value from the data value and add it to the ORC output.

From c37a207308993219960a7867d26428a42a38f585 Mon Sep 17 00:00:00 2001
From: openinx
Date: Wed, 22 Jul 2020 11:46:22 +0800
Subject: [PATCH 13/15] Use the BiFunction instead of Function to reduce code
 changes.

---
 .../org/apache/iceberg/data/TestLocalScan.java  |  2 +-
 .../data/TestMetricsRowGroupFilter.java         |  2 +-
 .../data/TestMetricsRowGroupFilterTypes.java    |  2 +-
 .../iceberg/data/orc/TestGenericData.java       |  4 ++--
 .../data/orc/TestGenericReadProjection.java     |  2 +-
 .../org/apache/iceberg/orc/TestOrcMetrics.java  |  2 +-
 .../apache/iceberg/mr/mapred/TestHelpers.java   |  2 +-
 .../mr/mapreduce/TestIcebergInputFormat.java    |  2 +-
 .../main/java/org/apache/iceberg/orc/ORC.java   |  7 ++++---
 .../apache/iceberg/orc/OrcFileAppender.java     | 18 +++++++++---------
 .../spark/source/SparkAppenderFactory.java      |  2 +-
 .../iceberg/spark/data/TestOrcWrite.java        |  2 +-
 .../iceberg/spark/data/TestSparkOrcReader.java  |  2 +-
 .../spark/source/TestSparkReadProjection.java   |  2 +-
 .../iceberg/spark/source/TestFilteredScan.java  |  2 +-
 .../iceberg/spark/source/TestFilteredScan.java  |  2 +-
 16 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 1d6516a501bf..84a58c505ecc 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -436,7 +436,7 @@ private DataFile writeFile(String location, String filename, Schema schema, List
       case ORC:
         FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
             .schema(schema)
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
             .build();
         try {
           orcAppender.addAll(records);
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index 1bf6ebf045c5..a0e37e7f6637 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -172,7 +172,7 @@ public void createOrcInputFile() throws IOException {
     OutputFile outFile = Files.localOutput(orcFile);
     try (FileAppender<Record> appender = ORC.write(outFile)
         .schema(FILE_SCHEMA)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(FILE_SCHEMA, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
        .build()) {
       GenericRecord record = GenericRecord.create(FILE_SCHEMA);
       // create 50 records
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java
index 12a0695d895c..3884a411f85e 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java
@@ -180,7 +180,7 @@ public void createOrcInputFile(List<Record> records) throws IOException {
     OutputFile outFile = Files.localOutput(ORC_FILE);
     try (FileAppender<Record> appender = ORC.write(outFile)
         .schema(FILE_SCHEMA)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(FILE_SCHEMA, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .build()) {
       appender.addAll(records);
     }
diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
index 7cfd6379c710..a18ef5f9d427 100644
--- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java
@@ -92,7 +92,7 @@ public void writeAndValidateTimestamps() throws IOException {
 
     try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
         .schema(timestampSchema)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(timestampSchema, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .build()) {
       writer.add(record1);
       writer.add(record2);
@@ -129,7 +129,7 @@ private void writeAndValidateRecords(Schema schema, List<Record> expected) throw
 
     try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
         .schema(schema)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .build()) {
       for (Record rec : expected) {
         writer.add(rec);
diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java
index 8809b1ca5bfa..1aab27dbedb4 100644
--- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericReadProjection.java
@@ -40,7 +40,7 @@ protected Record writeAndRead(String desc,
 
     try (FileAppender<Record> appender = ORC.write(Files.localOutput(file))
         .schema(writeSchema)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(writeSchema, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .build()) {
       appender.add(record);
     }
diff --git a/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
index 6e352ec4297b..82ef54b58be1 100644
--- a/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java
@@ -79,7 +79,7 @@ private InputFile writeRecords(Schema schema, Map<String, String> properties, Re
     try (FileAppender<Record> writer = ORC.write(file)
         .schema(schema)
         .setAll(properties)
-        .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(schema, typeDesc))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
        .build()) {
       writer.addAll(Lists.newArrayList(records));
     }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
index c563ab02d5ef..0d8cdc036e7f 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
@@ -74,7 +74,7 @@ public static DataFile writeFile(File targetFile, Table table, StructLike partit
       case ORC:
         appender = ORC.write(Files.localOutput(targetFile))
             .schema(table.schema())
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(table.schema(), typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build();
         break;
       default:
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
index fdf4be3e08e5..05842f80e6bd 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
@@ -531,7 +531,7 @@ private DataFile writeFile(
       case ORC:
         appender = ORC.write(Files.localOutput(file))
            .schema(table.schema())
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(table.schema(), typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
             .build();
         break;
       default:
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 8e67aca15c4d..5b6f6ea6a375 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -23,6 +23,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,7 @@ public static class WriteBuilder {
     private final OutputFile file;
     private final Configuration conf;
     private Schema schema = null;
-    private Function<TypeDescription, OrcRowWriter<?>> createWriterFunc;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc;
     private Map<String, byte[]> metadata = new HashMap<>();
 
     private WriteBuilder(OutputFile file) {
@@ -81,7 +82,7 @@ public WriteBuilder config(String property, String value) {
       return this;
     }
 
-    public WriteBuilder createWriterFunc(Function<TypeDescription, OrcRowWriter<?>> writerFunction) {
+    public WriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> writerFunction) {
       this.createWriterFunc = writerFunction;
       return this;
     }
@@ -143,7 +144,7 @@ private ReadBuilder(InputFile file) {
     /**
     * Restricts the read to the given range: [start, start + length).
     *
-     * @param newStart the start position for this read
+     * @param newStart  the start position for this read
      * @param newLength the length of the range this read should scan
      * @return this builder for method chaining
      */
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 35cde45b87f7..6b544ec6596f 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -24,7 +24,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
@@ -47,7 +47,6 @@
  */
 class OrcFileAppender<D> implements FileAppender<D> {
   private final int batchSize;
-  private final Schema schema;
   private final OutputFile file;
   private final Writer writer;
   private final VectorizedRowBatch batch;
@@ -56,15 +55,14 @@ class OrcFileAppender<D> implements FileAppender<D> {
   private final Configuration conf;
 
   OrcFileAppender(Schema schema, OutputFile file,
-                  Function<TypeDescription, OrcRowWriter<?>> createWriterFunc,
+                  BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc,
                   Configuration conf, Map<String, byte[]> metadata,
                   int batchSize) {
     this.conf = conf;
     this.file = file;
     this.batchSize = batchSize;
-    this.schema = schema;
 
-    TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema);
+    TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
@@ -73,7 +71,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
     }
     options.setSchema(orcSchema);
     this.writer = newOrcWriter(file, options, metadata);
-    this.valueWriter = newOrcRowWriter(orcSchema, createWriterFunc);
+    this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }
 
   @Override
@@ -146,8 +144,10 @@ private static Writer newOrcWriter(OutputFile file,
   }
 
   @SuppressWarnings("unchecked")
-  private static <D> OrcRowWriter<D> newOrcRowWriter(
-      TypeDescription schema, Function<TypeDescription, OrcRowWriter<?>> createWriterFunc) {
-    return (OrcRowWriter<D>) createWriterFunc.apply(schema);
+  private static <D> OrcRowWriter<D> newOrcRowWriter(Schema schema,
+                                                     TypeDescription orcSchema,
+                                                     BiFunction<Schema, TypeDescription, OrcRowWriter<?>>
+                                                         createWriterFunc) {
+    return (OrcRowWriter<D>) createWriterFunc.apply(schema, orcSchema);
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index e29c6eb319dc..d7b271e82396 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -70,7 +70,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
 
       case ORC:
         return ORC.write(file)
-            .createWriterFunc(SparkOrcWriter::new)
+            .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc))
            .setAll(properties)
            .schema(writeSchema)
            .overwrite()
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java
index 7cf9b9c736c6..2c514521da80 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java
@@ -50,7 +50,7 @@ public void splitOffsets() throws IOException {
 
     Iterable<InternalRow> rows = RandomData.generateSpark(SCHEMA, 1, 0L);
     FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc))
        .schema(SCHEMA)
        .build();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
index 5042d1cc1338..5822c2ebe347 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
@@ -67,7 +67,7 @@ private void writeAndValidateRecords(Schema schema, Iterable<InternalRow> expect
     Assert.assertTrue("Delete should succeed", testFile.delete());
 
     try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
         .schema(schema)
         .build()) {
       writer.addAll(expected);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index 8e74e56a45b1..ac64fa952c71 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -132,7 +132,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
 
       case ORC:
         try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
            .schema(tableSchema)
            .build()) {
           writer.add(record);
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 72cb7d30de17..0d45179b315c 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -212,7 +212,7 @@ public void writeUnpartitionedTable() throws IOException {
 
      case ORC:
        try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
            .schema(tableSchema)
            .build()) {
          writer.addAll(records);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 7ec49890576d..9be99383873f 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -209,7 +209,7 @@ public void writeUnpartitionedTable() throws IOException {
 
      case ORC:
        try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(typeDesc -> GenericOrcWriter.buildWriter(tableSchema, typeDesc))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
            .schema(tableSchema)
            .build()) {
          writer.addAll(records);

From d5fcd984ba84ae678e3e12060da092b6c6aaf05b Mon Sep 17 00:00:00 2001
From: openinx
Date: Wed, 22 Jul 2020 12:01:51 +0800
Subject: [PATCH 14/15] Rebase and fix the broken unit tests.

---
 .../iceberg/spark/data/TestSparkOrcReadMetadataColumns.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 03ea3c443df9..fdb378335890 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -119,7 +119,7 @@ public void writeFile() throws IOException {
     Assert.assertTrue("Delete should succeed", testFile.delete());
 
     try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
        .schema(DATA_SCHEMA)
        // write in such a way that the file contains 10 stripes each with 100 rows
        .config("iceberg.orc.vectorbatch.size", "100")

From e820e89183a7b812b3f1899e7cf3128d7228c59f Mon Sep 17 00:00:00 2001
From: openinx
Date: Thu, 23 Jul 2020 15:58:36 +0800
Subject: [PATCH 15/15] Addressing the comments from rdsr

---
 .../java/org/apache/iceberg/data/orc/GenericOrcWriters.java | 4 ++--
 orc/src/main/java/org/apache/iceberg/orc/ORC.java           | 2 +-
 orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java  | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 5a1f6914ee04..6103c1e3e8b7 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -103,9 +103,9 @@ public static OrcValueWriter<LocalDateTime> timestamp() {
     return TimestampWriter.INSTANCE;
   }
 
-  public static OrcValueWriter<BigDecimal> decimal(int scala, int precision) {
+  public static OrcValueWriter<BigDecimal> decimal(int scale, int precision) {
     if (precision <= 18) {
-      return new Decimal18Writer(scala);
+      return new Decimal18Writer(scale);
     } else {
       return Decimal38Writer.INSTANCE;
     }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 5b6f6ea6a375..124cd5f1b61f 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -144,7 +144,7 @@ private ReadBuilder(InputFile file) {
     /**
     * Restricts the read to the given range: [start, start + length).
     *
-     * @param newStart  the start position for this read
+     * @param newStart the start position for this read
      * @param newLength the length of the range this read should scan
      * @return this builder for method chaining
      */
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index ddcd4280bf0d..df494b9cc3e1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -28,7 +28,7 @@
 public interface OrcRowWriter<T> {
 
   /**
-   * Writes the row data.
+   * Writes or appends a row to ORC's VectorizedRowBatch.
   *
    * @param row the row data value to write.
    * @param output the VectorizedRowBatch to which the output will be written.
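
Taken together, the series moves the generic ORC write path onto OrcSchemaWithTypeVisitor and changes ORC.WriteBuilder#createWriterFunc from a Function<TypeDescription, OrcRowWriter<?>> to a BiFunction<Schema, TypeDescription, OrcRowWriter<?>>, which is why GenericOrcWriter::buildWriter can now be passed as a plain method reference. Below is a minimal usage sketch of the resulting API; the one-column schema, the output path, and the class name are hypothetical and are not part of the patch series:

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.orc.ORC;
    import org.apache.iceberg.types.Types;

    public class GenericOrcWriteSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical schema with a single required long column "id".
        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

        Record record = GenericRecord.create(schema);
        record.setField("id", 1L);

        // After PATCH 13, the writer factory receives both the Iceberg Schema and
        // the ORC TypeDescription, so buildWriter(Schema, TypeDescription) fits
        // createWriterFunc directly as a method reference.
        try (FileAppender<Record> writer = ORC.write(Files.localOutput(new File("/tmp/example.orc")))
            .schema(schema)
            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build()) {
          writer.add(record);
        }
      }
    }

For the Spark path, where SparkOrcWriter's constructor still takes only the TypeDescription, the patches adapt with a two-argument lambda instead: .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc)).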