diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 1bf466df5cb3..1ed013abe894 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -19,412 +19,121 @@ package org.apache.iceberg.data.orc; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.Map; -import java.util.UUID; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; -import org.apache.iceberg.orc.ORCSchemaUtil; +import org.apache.iceberg.orc.OrcRowWriter; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; -import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; -import org.apache.orc.storage.ql.exec.vector.ListColumnVector; -import org.apache.orc.storage.ql.exec.vector.LongColumnVector; -import org.apache.orc.storage.ql.exec.vector.MapColumnVector; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; -import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -public class GenericOrcWriter implements OrcValueWriter { - private final Converter[] converters; - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); +public class GenericOrcWriter implements OrcRowWriter { + private final OrcValueWriter writer; - private GenericOrcWriter(TypeDescription schema) { - this.converters = buildConverters(schema); - } - - public static OrcValueWriter buildWriter(TypeDescription fileSchema) { - return new GenericOrcWriter(fileSchema); - } - - @SuppressWarnings("unchecked") - @Override - public void write(Record value, VectorizedRowBatch output) throws IOException { - int row = output.size++; - for (int c = 0; c < converters.length; ++c) { - converters[c].addValue(row, value.get(c, converters[c].getJavaClass()), output.cols[c]); - } - } - - /** - * The interface for the conversion from Spark's SpecializedGetters to - * ORC's ColumnVectors. - */ - interface Converter { - - Class getJavaClass(); - - /** - * Take a value from the Spark data value and add it to the ORC output. 
- * @param rowId the row in the ColumnVector - * @param data either an InternalRow or ArrayData - * @param output the ColumnVector to put the value into - */ - void addValue(int rowId, T data, ColumnVector output); - } - - static class BooleanConverter implements Converter { - @Override - public Class getJavaClass() { - return Boolean.class; - } - - @Override - public void addValue(int rowId, Boolean data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; - } - } - } - - static class ByteConverter implements Converter { - @Override - public Class getJavaClass() { - return Byte.class; - } - - @Override - public void addValue(int rowId, Byte data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class ShortConverter implements Converter { - @Override - public Class getJavaClass() { - return Short.class; - } - - @Override - public void addValue(int rowId, Short data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class IntConverter implements Converter { - @Override - public Class getJavaClass() { - return Integer.class; - } - - @Override - public void addValue(int rowId, Integer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class TimeConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalTime.class; - } + private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) { + Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT, + "Top level must be a struct " + orcSchema); - @Override - public void addValue(int rowId, LocalTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; - } - } + writer = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder()); } - static class LongConverter implements Converter { - @Override - public Class getJavaClass() { - return Long.class; - } - - @Override - public void addValue(int rowId, Long data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } + public static OrcRowWriter buildWriter(Schema expectedSchema, TypeDescription fileSchema) { + return new GenericOrcWriter(expectedSchema, fileSchema); } - static class FloatConverter implements Converter { - @Override - public Class getJavaClass() { - return Float.class; + private static class WriteBuilder extends OrcSchemaWithTypeVisitor> { + private WriteBuilder() { } @Override - public void addValue(int rowId, Float data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } + 
public OrcValueWriter record(Types.StructType iStruct, TypeDescription record, + List names, List> fields) { + return new RecordWriter(fields); } - } - static class DoubleConverter implements Converter { @Override - public Class getJavaClass() { - return Double.class; + public OrcValueWriter list(Types.ListType iList, TypeDescription array, + OrcValueWriter element) { + return GenericOrcWriters.list(element); } @Override - public void addValue(int rowId, Double data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } + public OrcValueWriter map(Types.MapType iMap, TypeDescription map, + OrcValueWriter key, OrcValueWriter value) { + return GenericOrcWriters.map(key, value); } - } - static class StringConverter implements Converter { @Override - public Class getJavaClass() { - return String.class; - } - - @Override - public void addValue(int rowId, String data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - byte[] value = data.getBytes(StandardCharsets.UTF_8); - ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); - } - } - } - - static class BytesConverter implements Converter { - @Override - public Class getJavaClass() { - return ByteBuffer.class; - } - - @Override - public void addValue(int rowId, ByteBuffer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + switch (iPrimitive.typeId()) { + case BOOLEAN: + return GenericOrcWriters.booleans(); + case INTEGER: + return GenericOrcWriters.ints(); + case LONG: + return GenericOrcWriters.longs(); + case FLOAT: + return GenericOrcWriters.floats(); + case DOUBLE: + return GenericOrcWriters.doubles(); + case DATE: + return GenericOrcWriters.dates(); + case TIME: + return GenericOrcWriters.times(); + case TIMESTAMP: + Types.TimestampType timestampType = (Types.TimestampType) iPrimitive; + if (timestampType.shouldAdjustToUTC()) { + return GenericOrcWriters.timestampTz(); + } else { + return GenericOrcWriters.timestamp(); + } + case STRING: + return GenericOrcWriters.strings(); + case UUID: + return GenericOrcWriters.uuids(); + case FIXED: + return GenericOrcWriters.fixed(); + case BINARY: + return GenericOrcWriters.byteBuffers(); + case DECIMAL: + Types.DecimalType decimalType = (Types.DecimalType) iPrimitive; + return GenericOrcWriters.decimal(decimalType.scale(), decimalType.precision()); + default: + throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s", + iPrimitive, primitive)); } } } - static class UUIDConverter implements Converter { - @Override - public Class getJavaClass() { - return UUID.class; - } - - @Override - public void addValue(int rowId, UUID data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.putLong(data.getMostSignificantBits()); - buffer.putLong(data.getLeastSignificantBits()); - ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); - } - } - } - - static 
class FixedConverter implements Converter { - @Override - public Class getJavaClass() { - return byte[].class; - } - - @Override - public void addValue(int rowId, byte[] data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); - } - } - } - - static class DateConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalDate.class; - } - - @Override - public void addValue(int rowId, LocalDate data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); - } - } - } - - static class TimestampTzConverter implements Converter { - @Override - public Class getJavaClass() { - return OffsetDateTime.class; - } - - @Override - public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.time[rowId] = data.toInstant().toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } - } - } - - static class TimestampConverter implements Converter { - - @Override - public Class getJavaClass() { - return LocalDateTime.class; - } + @Override + @SuppressWarnings("unchecked") + public void write(Record value, VectorizedRowBatch output) { + Preconditions.checkArgument(writer instanceof RecordWriter, "writer must be a RecordWriter."); - @Override - public void addValue(int rowId, LocalDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.setIsUTC(true); - cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } + int row = output.size; + output.size += 1; + List> writers = ((RecordWriter) writer).writers(); + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter child = writers.get(c); + child.write(row, value.get(c, child.getJavaClass()), output.cols[c]); } } - static class Decimal18Converter implements Converter { - private final int scale; - - Decimal18Converter(TypeDescription schema) { - this.scale = schema.getScale(); - } - - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId] - .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); - } - } - } + private static class RecordWriter implements OrcValueWriter { + private final List> writers; - static class Decimal38Converter implements Converter { - Decimal38Converter(TypeDescription schema) { + RecordWriter(List> writers) { + this.writers = writers; } - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal 
data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); - } - } - } - - static class StructConverter implements Converter { - private final Converter[] children; - - StructConverter(TypeDescription schema) { - this.children = new Converter[schema.getChildren().size()]; - for (int c = 0; c < children.length; ++c) { - children[c] = buildConverter(schema.getChildren().get(c)); - } + List> writers() { + return writers; } @Override @@ -434,175 +143,12 @@ public Class getJavaClass() { @Override @SuppressWarnings("unchecked") - public void addValue(int rowId, Record data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - StructColumnVector cv = (StructColumnVector) output; - for (int c = 0; c < children.length; ++c) { - children[c].addValue(rowId, data.get(c, children[c].getJavaClass()), cv.fields[c]); - } - } - } - } - - static class ListConverter implements Converter { - private final Converter children; - - ListConverter(TypeDescription schema) { - this.children = buildConverter(schema.getChildren().get(0)); - } - - @Override - public Class getJavaClass() { - return List.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, List data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - List value = (List) data; - ListColumnVector cv = (ListColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = value.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.child.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); - } + public void nonNullWrite(int rowId, Record data, ColumnVector output) { + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter child = writers.get(c); + child.write(rowId, data.get(c, child.getJavaClass()), cv.fields[c]); } } } - - static class MapConverter implements Converter { - private final Converter keyConverter; - private final Converter valueConverter; - - MapConverter(TypeDescription schema) { - this.keyConverter = buildConverter(schema.getChildren().get(0)); - this.valueConverter = buildConverter(schema.getChildren().get(1)); - } - - @Override - public Class getJavaClass() { - return Map.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, Map data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - Map map = (Map) data; - List keys = Lists.newArrayListWithExpectedSize(map.size()); - List values = Lists.newArrayListWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - values.add(entry.getValue()); - } - MapColumnVector cv = (MapColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = map.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big 
enough - cv.keys.ensureSize(cv.childCount, true); - cv.values.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - int pos = (int) (e + cv.offsets[rowId]); - keyConverter.addValue(pos, keys.get(e), cv.keys); - valueConverter.addValue(pos, values.get(e), cv.values); - } - } - } - } - - private static Converter buildConverter(TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return new BooleanConverter(); - case BYTE: - return new ByteConverter(); - case SHORT: - return new ShortConverter(); - case DATE: - return new DateConverter(); - case INT: - return new IntConverter(); - case LONG: - String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); - ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG : - ORCSchemaUtil.LongType.valueOf(longAttributeValue); - switch (longType) { - case TIME: - return new TimeConverter(); - case LONG: - return new LongConverter(); - default: - throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); - } - case FLOAT: - return new FloatConverter(); - case DOUBLE: - return new DoubleConverter(); - case BINARY: - String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); - ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : - ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); - switch (binaryType) { - case UUID: - return new UUIDConverter(); - case FIXED: - return new FixedConverter(); - case BINARY: - return new BytesConverter(); - default: - throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); - } - case STRING: - case CHAR: - case VARCHAR: - return new StringConverter(); - case DECIMAL: - return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : new Decimal38Converter(schema); - case TIMESTAMP: - return new TimestampConverter(); - case TIMESTAMP_INSTANT: - return new TimestampTzConverter(); - case STRUCT: - return new StructConverter(schema); - case LIST: - return new ListConverter(schema); - case MAP: - return new MapConverter(schema); - } - throw new IllegalArgumentException("Unhandled type " + schema); - } - - private static Converter[] buildConverters(TypeDescription schema) { - if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Top level must be a struct " + schema); - } - - List children = schema.getChildren(); - Converter[] result = new Converter[children.size()]; - for (int c = 0; c < children.size(); ++c) { - result[c] = buildConverter(children.get(c)); - } - return result; - } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java new file mode 100644 index 000000000000..6103c1e3e8b7 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data.orc; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; + +public class GenericOrcWriters { + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + private GenericOrcWriters() { + } + + public static OrcValueWriter booleans() { + return BooleanWriter.INSTANCE; + } + + public static OrcValueWriter ints() { + return IntWriter.INSTANCE; + } + + public static OrcValueWriter times() { + return TimeWriter.INSTANCE; + } + + public static OrcValueWriter longs() { + return LongWriter.INSTANCE; + } + + public static OrcValueWriter floats() { + return FloatWriter.INSTANCE; + } + + public static OrcValueWriter doubles() { + return DoubleWriter.INSTANCE; + } + + public static OrcValueWriter strings() { + return StringWriter.INSTANCE; + } + + public static OrcValueWriter byteBuffers() { + return ByteBufferWriter.INSTANCE; + } + + public static OrcValueWriter uuids() { + return UUIDWriter.INSTANCE; + } + + public static OrcValueWriter fixed() { + return FixedWriter.INSTANCE; + } + + public static OrcValueWriter dates() { + return DateWriter.INSTANCE; + } + + public static OrcValueWriter timestampTz() { + return TimestampTzWriter.INSTANCE; + } + + public static OrcValueWriter timestamp() { + return TimestampWriter.INSTANCE; + } + + public static OrcValueWriter decimal(int scale, int precision) { + if (precision <= 18) { + return new Decimal18Writer(scale); + } else { + return Decimal38Writer.INSTANCE; + } + } + + public static OrcValueWriter> list(OrcValueWriter element) { + return new ListWriter<>(element); + } + + public static OrcValueWriter> map(OrcValueWriter key, OrcValueWriter value) { + return new MapWriter<>(key, value); + } + + private static class BooleanWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new BooleanWriter(); + + @Override + public Class getJavaClass() { + return Boolean.class; + } + + @Override + public void nonNullWrite(int rowId, Boolean data, ColumnVector 
output) { + ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; + } + } + + private static class IntWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new IntWriter(); + + @Override + public Class getJavaClass() { + return Integer.class; + } + + @Override + public void nonNullWrite(int rowId, Integer data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; + } + } + + private static class TimeWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimeWriter(); + + @Override + public Class getJavaClass() { + return LocalTime.class; + } + + @Override + public void nonNullWrite(int rowId, LocalTime data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; + } + } + + private static class LongWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new LongWriter(); + + @Override + public Class getJavaClass() { + return Long.class; + } + + @Override + public void nonNullWrite(int rowId, Long data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; + } + } + + private static class FloatWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FloatWriter(); + + @Override + public Class getJavaClass() { + return Float.class; + } + + @Override + public void nonNullWrite(int rowId, Float data, ColumnVector output) { + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + + private static class DoubleWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DoubleWriter(); + + @Override + public Class getJavaClass() { + return Double.class; + } + + @Override + public void nonNullWrite(int rowId, Double data, ColumnVector output) { + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + + private static class StringWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new StringWriter(); + + @Override + public Class getJavaClass() { + return String.class; + } + + @Override + public void nonNullWrite(int rowId, String data, ColumnVector output) { + byte[] value = data.getBytes(StandardCharsets.UTF_8); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + + private static class ByteBufferWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteBufferWriter(); + + @Override + public Class getJavaClass() { + return ByteBuffer.class; + } + + @Override + public void nonNullWrite(int rowId, ByteBuffer data, ColumnVector output) { + ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); + } + } + + private static class UUIDWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new UUIDWriter(); + + @Override + public Class getJavaClass() { + return UUID.class; + } + + @Override + public void nonNullWrite(int rowId, UUID data, ColumnVector output) { + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(data.getMostSignificantBits()); + buffer.putLong(data.getLeastSignificantBits()); + ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); + } + } + + private static class FixedWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new FixedWriter(); + + @Override + public Class getJavaClass() { + return byte[].class; + } + + @Override + public void nonNullWrite(int rowId, byte[] data, ColumnVector output) { + ((BytesColumnVector) 
output).setRef(rowId, data, 0, data.length); + } + } + + private static class DateWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new DateWriter(); + + @Override + public Class getJavaClass() { + return LocalDate.class; + } + + @Override + public void nonNullWrite(int rowId, LocalDate data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); + } + } + + private static class TimestampTzWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampTzWriter(); + + @Override + public Class getJavaClass() { + return OffsetDateTime.class; + } + + @Override + public void nonNullWrite(int rowId, OffsetDateTime data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.time[rowId] = data.toInstant().toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + + private static class TimestampWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new TimestampWriter(); + + @Override + public Class getJavaClass() { + return LocalDateTime.class; + } + + @Override + public void nonNullWrite(int rowId, LocalDateTime data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.setIsUTC(true); + cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + + private static class Decimal18Writer implements OrcValueWriter { + private final int scale; + + Decimal18Writer(int scale) { + this.scale = scale; + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + ((DecimalColumnVector) output).vector[rowId] + .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); + } + } + + private static class Decimal38Writer implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new Decimal38Writer(); + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); + } + } + + private static class ListWriter implements OrcValueWriter> { + private final OrcValueWriter element; + + ListWriter(OrcValueWriter element) { + this.element = element; + } + + @Override + public Class getJavaClass() { + return List.class; + } + + @Override + public void nonNullWrite(int rowId, List value, ColumnVector output) { + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.child.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child); + } + } + } + + private static class MapWriter implements OrcValueWriter> { + private final OrcValueWriter keyWriter; + private final OrcValueWriter valueWriter; + + MapWriter(OrcValueWriter keyWriter, OrcValueWriter 
valueWriter) { + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + } + + @Override + public Class getJavaClass() { + return Map.class; + } + + @Override + public void nonNullWrite(int rowId, Map map, ColumnVector output) { + List keys = Lists.newArrayListWithExpectedSize(map.size()); + List values = Lists.newArrayListWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + keys.add(entry.getKey()); + values.add(entry.getValue()); + } + MapColumnVector cv = (MapColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = map.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyWriter.write(pos, keys.get(e), cv.keys); + valueWriter.write(pos, values.get(e), cv.values); + } + } + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 38dc522fa5cf..124cd5f1b61f 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -59,7 +60,7 @@ public static class WriteBuilder { private final OutputFile file; private final Configuration conf; private Schema schema = null; - private Function> createWriterFunc; + private BiFunction> createWriterFunc; private Map metadata = new HashMap<>(); private WriteBuilder(OutputFile file) { @@ -81,7 +82,7 @@ public WriteBuilder config(String property, String value) { return this; } - public WriteBuilder createWriterFunc(Function> writerFunction) { + public WriteBuilder createWriterFunc(BiFunction> writerFunction) { this.createWriterFunc = writerFunction; return this; } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index acc48550688b..6b544ec6596f 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -24,7 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.Function; +import java.util.function.BiFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.Metrics; @@ -47,24 +47,22 @@ */ class OrcFileAppender implements FileAppender { private final int batchSize; - private final Schema schema; private final OutputFile file; private final Writer writer; private final VectorizedRowBatch batch; - private final OrcValueWriter valueWriter; + private final OrcRowWriter valueWriter; private boolean isClosed = false; private final Configuration conf; OrcFileAppender(Schema schema, OutputFile file, - Function> createWriterFunc, + BiFunction> createWriterFunc, Configuration conf, Map metadata, int batchSize) { this.conf = conf; this.file = file; this.batchSize = batchSize; - this.schema = schema; - TypeDescription orcSchema = ORCSchemaUtil.convert(this.schema); + TypeDescription orcSchema = ORCSchemaUtil.convert(schema); this.batch = 
orcSchema.createRowBatch(this.batchSize); OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true); @@ -73,7 +71,7 @@ class OrcFileAppender implements FileAppender { } options.setSchema(orcSchema); this.writer = newOrcWriter(file, options, metadata); - this.valueWriter = newOrcValueWriter(orcSchema, createWriterFunc); + this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc); } @Override @@ -146,8 +144,10 @@ private static Writer newOrcWriter(OutputFile file, } @SuppressWarnings("unchecked") - private static OrcValueWriter newOrcValueWriter( - TypeDescription schema, Function> createWriterFunc) { - return (OrcValueWriter) createWriterFunc.apply(schema); + private static OrcRowWriter newOrcRowWriter(Schema schema, + TypeDescription orcSchema, + BiFunction> + createWriterFunc) { + return (OrcRowWriter) createWriterFunc.apply(schema, orcSchema); } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java new file mode 100644 index 000000000000..df494b9cc3e1 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.orc; + +import java.io.IOException; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +/** + * Write data value of a schema. + */ +public interface OrcRowWriter { + + /** + * Writes or appends a row to ORC's VectorizedRowBatch. + * + * @param row the row data value to write. + * @param output the VectorizedRowBatch to which the output will be written. + * @throws IOException if there's any IO error while writing the data value. + */ + void write(T row, VectorizedRowBatch output) throws IOException; +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java index 74d2fddddcb0..9bbc1ddc6f0c 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java @@ -19,20 +19,28 @@ package org.apache.iceberg.orc; -import java.io.IOException; -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; -/** - * Write data value of a schema. - */ public interface OrcValueWriter { + Class getJavaClass(); + /** - * Writes the data. + * Take a value from the data value and add it to the ORC output. * - * @param value the data value to write. - * @param output the VectorizedRowBatch to which the output will be written. - * @throws IOException if there's any IO error while writing the data value. 
+ * @param rowId the row in the ColumnVector + * @param data the data value to write. + * @param output the ColumnVector to put the value into */ - void write(T value, VectorizedRowBatch output) throws IOException; + default void write(int rowId, T data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + nonNullWrite(rowId, data, output); + } + } + + void nonNullWrite(int rowId, T data, ColumnVector output); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index c27f95898812..0361fdc1c0c8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -20,7 +20,7 @@ package org.apache.iceberg.spark.data; import java.util.List; -import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.orc.OrcRowWriter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; @@ -42,7 +42,7 @@ * This class acts as an adaptor from an OrcFileAppender to a * FileAppender<InternalRow>. */ -public class SparkOrcWriter implements OrcValueWriter { +public class SparkOrcWriter implements OrcRowWriter { private final Converter[] converters; diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index e29c6eb319dc..d7b271e82396 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -70,7 +70,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor case ORC: return ORC.write(file) - .createWriterFunc(SparkOrcWriter::new) + .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc)) .setAll(properties) .schema(writeSchema) .overwrite() diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java index 7cf9b9c736c6..2c514521da80 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestOrcWrite.java @@ -50,7 +50,7 @@ public void splitOffsets() throws IOException { Iterable rows = RandomData.generateSpark(SCHEMA, 1, 0L); FileAppender writer = ORC.write(Files.localOutput(testFile)) - .createWriterFunc(SparkOrcWriter::new) + .createWriterFunc((schema, typeDesc) -> new SparkOrcWriter(typeDesc)) .schema(SCHEMA) .build(); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java index 03ea3c443df9..fdb378335890 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java @@ -119,7 +119,7 @@ public void writeFile() throws IOException { Assert.assertTrue("Delete should succeed", testFile.delete()); try (FileAppender writer = ORC.write(Files.localOutput(testFile)) - .createWriterFunc(SparkOrcWriter::new) + .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc)) .schema(DATA_SCHEMA) // write in such a way that 
the file contains 10 stripes each with 100 rows
         .config("iceberg.orc.vectorbatch.size", "100")
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
index 5042d1cc1338..5822c2ebe347 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
@@ -67,7 +67,7 @@ private void writeAndValidateRecords(Schema schema, Iterable expect
     Assert.assertTrue("Delete should succeed", testFile.delete());
     try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(testFile))
-        .createWriterFunc(SparkOrcWriter::new)
+        .createWriterFunc((icebergSchema, typeDesc) -> new SparkOrcWriter(typeDesc))
         .schema(schema)
         .build()) {
       writer.addAll(expected);
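
Note on usage: with this change, ORC.WriteBuilder.createWriterFunc takes a BiFunction of the Iceberg Schema and the ORC TypeDescription instead of a Function of the TypeDescription alone, and GenericOrcWriter.buildWriter(Schema, TypeDescription) matches that shape. The following is a minimal sketch of how a caller could wire up a Record appender through the new factory; it is not part of the patch, and outputFile, icebergSchema and records are placeholders.

// Sketch only: placeholders are outputFile, icebergSchema and records.
FileAppender<Record> appender = ORC.write(outputFile)
    .createWriterFunc(GenericOrcWriter::buildWriter)   // (Schema, TypeDescription) -> OrcRowWriter<?>
    .schema(icebergSchema)
    .overwrite()
    .build();
try {
  appender.addAll(records);
} finally {
  appender.close();
}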
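
Note on the reshaped OrcValueWriter: the default write(rowId, data, output) now centralizes the null bookkeeping (noNulls / isNull), so implementations only supply getJavaClass() and nonNullWrite(). The class below is a hypothetical writer, not part of this change, included only to show the contract; it assumes a column backed by a LongColumnVector and stores an Instant as epoch microseconds.

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

// Hypothetical example: nulls never reach nonNullWrite(), the default write() handles them.
class InstantAsMicrosWriter implements OrcValueWriter<Instant> {
  @Override
  public Class<Instant> getJavaClass() {
    return Instant.class;
  }

  @Override
  public void nonNullWrite(int rowId, Instant data, ColumnVector output) {
    // store the value as microseconds since the epoch in the long vector
    ((LongColumnVector) output).vector[rowId] = ChronoUnit.MICROS.between(Instant.EPOCH, data);
  }
}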
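
Note on GenericOrcWriters: extracting the per-type writers into static factories means they compose directly for nested types, which is exactly what the WriteBuilder visitor does for lists and maps. The fragment below is illustrative only and assumes the stripped generic signatures are OrcValueWriter<String>, OrcValueWriter<Long>, OrcValueWriter<List<T>> and OrcValueWriter<Map<K, V>>.

// Illustration only: the WriteBuilder visitor composes these the same way.
OrcValueWriter<List<String>> tags = GenericOrcWriters.list(GenericOrcWriters.strings());
OrcValueWriter<Map<String, Long>> counts =
    GenericOrcWriters.map(GenericOrcWriters.strings(), GenericOrcWriters.longs());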