diff --git a/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java b/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java new file mode 100644 index 000000000000..38b112683ce1 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcReader.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.data.orc; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.orc.OrcRowReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; + +public abstract class BaseOrcReader implements OrcRowReader { + private final OrcValueReader reader; + + protected BaseOrcReader(org.apache.iceberg.Schema expectedSchema, + TypeDescription readOrcSchema, + Map idToConstant) { + this.reader = OrcSchemaWithTypeVisitor.visit(expectedSchema, readOrcSchema, new ReadBuilder(idToConstant)); + } + + protected OrcValueReader getReader() { + return this.reader; + } + + protected abstract OrcValueReader createStructReader(List> fields, + Types.StructType expected, Map idToConstant); + + private class ReadBuilder extends OrcSchemaWithTypeVisitor> { + private final Map idToConstant; + + private ReadBuilder(Map idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public OrcValueReader record( + Types.StructType expected, TypeDescription record, List names, List> fields) { + return createStructReader(fields, expected, idToConstant); + } + + @Override + public OrcValueReader list(Types.ListType iList, TypeDescription array, OrcValueReader elementReader) { + return GenericOrcReaders.array(elementReader); + } + + @Override + public OrcValueReader map( + Types.MapType iMap, TypeDescription map, OrcValueReader keyReader, OrcValueReader valueReader) { + return GenericOrcReaders.map(keyReader, valueReader); + } + + @Override + public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + switch (primitive.getCategory()) { + case BOOLEAN: + return OrcValueReaders.booleans(); + case BYTE: + // Iceberg does not have a byte type. Use int + case SHORT: + // Iceberg does not have a short type. 
Use int + case INT: + return OrcValueReaders.ints(); + case LONG: + switch (iPrimitive.typeId()) { + case TIME: + return GenericOrcReaders.times(); + case LONG: + return OrcValueReaders.longs(); + default: + throw new IllegalStateException( + String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); + } + + case FLOAT: + return OrcValueReaders.floats(); + case DOUBLE: + return OrcValueReaders.doubles(); + case DATE: + return GenericOrcReaders.dates(); + case TIMESTAMP: + return GenericOrcReaders.timestamps(); + case TIMESTAMP_INSTANT: + return GenericOrcReaders.timestampTzs(); + case DECIMAL: + return GenericOrcReaders.decimals(); + case CHAR: + case VARCHAR: + case STRING: + return GenericOrcReaders.strings(); + case BINARY: + switch (iPrimitive.typeId()) { + case UUID: + return GenericOrcReaders.uuids(); + case FIXED: + return OrcValueReaders.bytes(); + case BINARY: + return GenericOrcReaders.bytes(); + default: + throw new IllegalStateException( + String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); + } + default: + throw new IllegalArgumentException("Unhandled type " + primitive); + } + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java new file mode 100644 index 000000000000..d6de7b8a8e0e --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/orc/BaseOrcWriter.java @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.data.orc; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.orc.ORCSchemaUtil; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; + +public abstract class BaseOrcWriter implements OrcValueWriter { + private final Converter[] converters; + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + + protected BaseOrcWriter(TypeDescription schema) { + this.converters = buildConverters(schema); + } + + protected Converter[] getConverters() { + return this.converters; + } + + /** + * The interface for the conversion from Spark's SpecializedGetters to + * ORC's ColumnVectors. + */ + protected interface Converter { + + Class getJavaClass(); + + /** + * Take a value from the Spark data value and add it to the ORC output. + * + * @param rowId the row in the ColumnVector + * @param data either an InternalRow or ArrayData + * @param output the ColumnVector to put the value into + */ + void addValue(int rowId, T data, ColumnVector output); + } + + static class BooleanConverter implements Converter { + @Override + public Class getJavaClass() { + return Boolean.class; + } + + @Override + public void addValue(int rowId, Boolean data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data ? 
1 : 0; + } + } + } + + static class ByteConverter implements Converter { + @Override + public Class getJavaClass() { + return Byte.class; + } + + @Override + public void addValue(int rowId, Byte data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class ShortConverter implements Converter { + @Override + public Class getJavaClass() { + return Short.class; + } + + @Override + public void addValue(int rowId, Short data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class IntConverter implements Converter { + @Override + public Class getJavaClass() { + return Integer.class; + } + + @Override + public void addValue(int rowId, Integer data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class TimeConverter implements Converter { + @Override + public Class getJavaClass() { + return LocalTime.class; + } + + @Override + public void addValue(int rowId, LocalTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; + } + } + } + + static class LongConverter implements Converter { + @Override + public Class getJavaClass() { + return Long.class; + } + + @Override + public void addValue(int rowId, Long data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = data; + } + } + } + + static class FloatConverter implements Converter { + @Override + public Class getJavaClass() { + return Float.class; + } + + @Override + public void addValue(int rowId, Float data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + } + + static class DoubleConverter implements Converter { + @Override + public Class getJavaClass() { + return Double.class; + } + + @Override + public void addValue(int rowId, Double data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DoubleColumnVector) output).vector[rowId] = data; + } + } + } + + static class StringConverter implements Converter { + @Override + public Class getJavaClass() { + return String.class; + } + + @Override + public void addValue(int rowId, String data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + byte[] value = data.getBytes(StandardCharsets.UTF_8); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + } + + static class BytesConverter implements Converter { + @Override + public Class getJavaClass() { + return ByteBuffer.class; + } + + @Override + public void addValue(int rowId, ByteBuffer data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + 
output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); + } + } + } + + static class UUIDConverter implements Converter { + @Override + public Class getJavaClass() { + return UUID.class; + } + + @Override + public void addValue(int rowId, UUID data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(data.getMostSignificantBits()); + buffer.putLong(data.getLeastSignificantBits()); + ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); + } + } + } + + static class FixedConverter implements Converter { + @Override + public Class getJavaClass() { + return byte[].class; + } + + @Override + public void addValue(int rowId, byte[] data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); + } + } + } + + static class DateConverter implements Converter { + @Override + public Class getJavaClass() { + return LocalDate.class; + } + + @Override + public void addValue(int rowId, LocalDate data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); + } + } + } + + static class TimestampTzConverter implements Converter { + @Override + public Class getJavaClass() { + return OffsetDateTime.class; + } + + @Override + public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.time[rowId] = data.toInstant().toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + } + + static class TimestampConverter implements Converter { + + @Override + public Class getJavaClass() { + return LocalDateTime.class; + } + + @Override + public void addValue(int rowId, LocalDateTime data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.setIsUTC(true); + cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis + cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + } + } + } + + static class Decimal18Converter implements Converter { + private final int scale; + + Decimal18Converter(TypeDescription schema) { + this.scale = schema.getScale(); + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId] + .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); + } + } + } + + static class Decimal38Converter implements Converter { + Decimal38Converter(TypeDescription 
schema) { + } + + @Override + public Class getJavaClass() { + return BigDecimal.class; + } + + @Override + public void addValue(int rowId, BigDecimal data, ColumnVector output) { + // TODO: validate precision and scale from schema + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); + } + } + } + + protected abstract Converter createStructConverter(TypeDescription schema); + + class ListConverter implements Converter { + private final Converter children; + + ListConverter(TypeDescription schema) { + this.children = buildConverter(schema.getChildren().get(0)); + } + + @Override + public Class getJavaClass() { + return List.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, List data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + List value = (List) data; + ListColumnVector cv = (ListColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = value.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.child.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); + } + } + } + } + + class MapConverter implements Converter { + private final Converter keyConverter; + private final Converter valueConverter; + + MapConverter(TypeDescription schema) { + this.keyConverter = buildConverter(schema.getChildren().get(0)); + this.valueConverter = buildConverter(schema.getChildren().get(1)); + } + + @Override + public Class getJavaClass() { + return Map.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, Map data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + Map map = (Map) data; + List keys = Lists.newArrayListWithExpectedSize(map.size()); + List values = Lists.newArrayListWithExpectedSize(map.size()); + for (Map.Entry entry : map.entrySet()) { + keys.add(entry.getKey()); + values.add(entry.getValue()); + } + MapColumnVector cv = (MapColumnVector) output; + // record the length and start of the list elements + cv.lengths[rowId] = map.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyConverter.addValue(pos, keys.get(e), cv.keys); + valueConverter.addValue(pos, values.get(e), cv.values); + } + } + } + } + + protected Converter buildConverter(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + return new ByteConverter(); + case SHORT: + return new ShortConverter(); + case DATE: + return new DateConverter(); + case INT: + return new IntConverter(); + case LONG: + String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); + ORCSchemaUtil.LongType longType = longAttributeValue == null ? 
ORCSchemaUtil.LongType.LONG : + ORCSchemaUtil.LongType.valueOf(longAttributeValue); + switch (longType) { + case TIME: + return new TimeConverter(); + case LONG: + return new LongConverter(); + default: + throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); + } + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case BINARY: + String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); + ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : + ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); + switch (binaryType) { + case UUID: + return new UUIDConverter(); + case FIXED: + return new FixedConverter(); + case BINARY: + return new BytesConverter(); + default: + throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); + } + case STRING: + case CHAR: + case VARCHAR: + return new StringConverter(); + case DECIMAL: + return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : new Decimal38Converter(schema); + case TIMESTAMP: + return new TimestampConverter(); + case TIMESTAMP_INSTANT: + return new TimestampTzConverter(); + case STRUCT: + return createStructConverter(schema); + case LIST: + return new ListConverter(schema); + case MAP: + return new MapConverter(schema); + } + throw new IllegalArgumentException("Unhandled type " + schema); + } + + private Converter[] buildConverters(TypeDescription schema) { + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Top level must be a struct " + schema); + } + + List children = schema.getChildren(); + Converter[] result = new Converter[children.size()]; + for (int c = 0; c < children.size(); ++c) { + result[c] = buildConverter(children.get(c)); + } + return result; + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java index e3ca538495be..d52a5987a6c9 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java @@ -25,25 +25,22 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; import org.apache.iceberg.orc.OrcRowReader; -import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueReader; -import org.apache.iceberg.orc.OrcValueReaders; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -public class GenericOrcReader implements OrcRowReader { - private final OrcValueReader reader; +public class GenericOrcReader extends BaseOrcReader { - public GenericOrcReader( - org.apache.iceberg.Schema expectedSchema, TypeDescription readOrcSchema, Map idToConstant) { - this.reader = OrcSchemaWithTypeVisitor.visit(expectedSchema, readOrcSchema, new ReadBuilder(idToConstant)); + private GenericOrcReader(org.apache.iceberg.Schema expectedSchema, + TypeDescription readOrcSchema, + Map idToConstant) { + super(expectedSchema, readOrcSchema, idToConstant); } public static OrcRowReader buildReader(Schema expectedSchema, TypeDescription fileSchema) { - return new GenericOrcReader(expectedSchema, fileSchema, Collections.emptyMap()); + return 
buildReader(expectedSchema, fileSchema, Collections.emptyMap()); } public static OrcRowReader buildReader( @@ -53,86 +50,13 @@ public static OrcRowReader buildReader( @Override public Record read(VectorizedRowBatch batch, int row) { - return (Record) reader.read(new StructColumnVector(batch.size, batch.cols), row); + return (Record) getReader().read(new StructColumnVector(batch.size, batch.cols), row); } - private static class ReadBuilder extends OrcSchemaWithTypeVisitor> { - private final Map idToConstant; - - private ReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; - } - - @Override - public OrcValueReader record( - Types.StructType expected, TypeDescription record, List names, List> fields) { - return GenericOrcReaders.struct(fields, expected, idToConstant); - } - - @Override - public OrcValueReader list(Types.ListType iList, TypeDescription array, OrcValueReader elementReader) { - return GenericOrcReaders.array(elementReader); - } - - @Override - public OrcValueReader map( - Types.MapType iMap, TypeDescription map, OrcValueReader keyReader, OrcValueReader valueReader) { - return GenericOrcReaders.map(keyReader, valueReader); - } - - @Override - public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { - switch (primitive.getCategory()) { - case BOOLEAN: - return OrcValueReaders.booleans(); - case BYTE: - // Iceberg does not have a byte type. Use int - case SHORT: - // Iceberg does not have a short type. Use int - case INT: - return OrcValueReaders.ints(); - case LONG: - switch (iPrimitive.typeId()) { - case TIME: - return GenericOrcReaders.times(); - case LONG: - return OrcValueReaders.longs(); - default: - throw new IllegalStateException( - String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); - } - - case FLOAT: - return OrcValueReaders.floats(); - case DOUBLE: - return OrcValueReaders.doubles(); - case DATE: - return GenericOrcReaders.dates(); - case TIMESTAMP: - return GenericOrcReaders.timestamps(); - case TIMESTAMP_INSTANT: - return GenericOrcReaders.timestampTzs(); - case DECIMAL: - return GenericOrcReaders.decimals(); - case CHAR: - case VARCHAR: - case STRING: - return GenericOrcReaders.strings(); - case BINARY: - switch (iPrimitive.typeId()) { - case UUID: - return GenericOrcReaders.uuids(); - case FIXED: - return OrcValueReaders.bytes(); - case BINARY: - return GenericOrcReaders.bytes(); - default: - throw new IllegalStateException( - String.format("Invalid iceberg type %s corresponding to ORC type %s", iPrimitive, primitive)); - } - default: - throw new IllegalArgumentException("Unhandled type " + primitive); - } - } + @Override + protected OrcValueReader createStructReader(List> fields, + Types.StructType expected, + Map idToConstant) { + return GenericOrcReaders.struct(fields, expected, idToConstant); } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java index 1a48508a9b36..3593e2b2379d 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java @@ -49,8 +49,6 @@ public class GenericOrcReaders { - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); private GenericOrcReaders() { } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 1bf466df5cb3..593eff57e47d 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -19,44 +19,17 @@ package org.apache.iceberg.data.orc; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.iceberg.data.Record; -import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcValueWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.orc.TypeDescription; -import org.apache.orc.storage.common.type.HiveDecimal; -import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; import org.apache.orc.storage.ql.exec.vector.ColumnVector; -import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; -import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; -import org.apache.orc.storage.ql.exec.vector.ListColumnVector; -import org.apache.orc.storage.ql.exec.vector.LongColumnVector; -import org.apache.orc.storage.ql.exec.vector.MapColumnVector; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; -import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; -public class GenericOrcWriter implements OrcValueWriter { - private final Converter[] converters; - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); +public class GenericOrcWriter extends BaseOrcWriter { - private GenericOrcWriter(TypeDescription schema) { - this.converters = buildConverters(schema); + protected GenericOrcWriter(TypeDescription schema) { + super(schema); } public static OrcValueWriter buildWriter(TypeDescription fileSchema) { @@ -65,359 +38,20 @@ public static OrcValueWriter buildWriter(TypeDescription fileSchema) { @SuppressWarnings("unchecked") @Override - public void write(Record value, VectorizedRowBatch output) throws IOException { + public void write(Record value, VectorizedRowBatch output) { int row = output.size++; + Converter[] converters = getConverters(); for (int c = 0; c < converters.length; ++c) { converters[c].addValue(row, value.get(c, converters[c].getJavaClass()), output.cols[c]); } } - /** - * The interface for the conversion from Spark's SpecializedGetters to - * ORC's ColumnVectors. - */ - interface Converter { - - Class getJavaClass(); - - /** - * Take a value from the Spark data value and add it to the ORC output. 
- * @param rowId the row in the ColumnVector - * @param data either an InternalRow or ArrayData - * @param output the ColumnVector to put the value into - */ - void addValue(int rowId, T data, ColumnVector output); - } - - static class BooleanConverter implements Converter { - @Override - public Class getJavaClass() { - return Boolean.class; - } - - @Override - public void addValue(int rowId, Boolean data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data ? 1 : 0; - } - } - } - - static class ByteConverter implements Converter { - @Override - public Class getJavaClass() { - return Byte.class; - } - - @Override - public void addValue(int rowId, Byte data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class ShortConverter implements Converter { - @Override - public Class getJavaClass() { - return Short.class; - } - - @Override - public void addValue(int rowId, Short data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class IntConverter implements Converter { - @Override - public Class getJavaClass() { - return Integer.class; - } - - @Override - public void addValue(int rowId, Integer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class TimeConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalTime.class; - } - - @Override - public void addValue(int rowId, LocalTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000; - } - } - } - - static class LongConverter implements Converter { - @Override - public Class getJavaClass() { - return Long.class; - } - - @Override - public void addValue(int rowId, Long data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = data; - } - } - } - - static class FloatConverter implements Converter { - @Override - public Class getJavaClass() { - return Float.class; - } - - @Override - public void addValue(int rowId, Float data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } - } - } - - static class DoubleConverter implements Converter { - @Override - public Class getJavaClass() { - return Double.class; - } - - @Override - public void addValue(int rowId, Double data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DoubleColumnVector) output).vector[rowId] = data; - } - } - } - - static class StringConverter implements Converter { - @Override - public Class getJavaClass() { - return String.class; - } - - 
@Override - public void addValue(int rowId, String data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - byte[] value = data.getBytes(StandardCharsets.UTF_8); - ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); - } - } - } - - static class BytesConverter implements Converter { - @Override - public Class getJavaClass() { - return ByteBuffer.class; - } - - @Override - public void addValue(int rowId, ByteBuffer data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length); - } - } - } - - static class UUIDConverter implements Converter { - @Override - public Class getJavaClass() { - return UUID.class; - } - - @Override - public void addValue(int rowId, UUID data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.putLong(data.getMostSignificantBits()); - buffer.putLong(data.getLeastSignificantBits()); - ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length); - } - } - } - - static class FixedConverter implements Converter { - @Override - public Class getJavaClass() { - return byte[].class; - } - - @Override - public void addValue(int rowId, byte[] data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((BytesColumnVector) output).setRef(rowId, data, 0, data.length); - } - } - } - - static class DateConverter implements Converter { - @Override - public Class getJavaClass() { - return LocalDate.class; - } - - @Override - public void addValue(int rowId, LocalDate data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data); - } - } - } - - static class TimestampTzConverter implements Converter { - @Override - public Class getJavaClass() { - return OffsetDateTime.class; - } - - @Override - public void addValue(int rowId, OffsetDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.time[rowId] = data.toInstant().toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } - } - } - - static class TimestampConverter implements Converter { - - @Override - public Class getJavaClass() { - return LocalDateTime.class; - } - - @Override - public void addValue(int rowId, LocalDateTime data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - TimestampColumnVector cv = (TimestampColumnVector) output; - cv.setIsUTC(true); - cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision - } - } - } - - static class Decimal18Converter implements Converter { - private final int scale; - - Decimal18Converter(TypeDescription 
schema) { - this.scale = schema.getScale(); - } - - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId] - .setFromLongAndScale(data.unscaledValue().longValueExact(), scale); - } - } - } - - static class Decimal38Converter implements Converter { - Decimal38Converter(TypeDescription schema) { - } - - @Override - public Class getJavaClass() { - return BigDecimal.class; - } - - @Override - public void addValue(int rowId, BigDecimal data, ColumnVector output) { - // TODO: validate precision and scale from schema - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false)); - } - } + @Override + protected Converter createStructConverter(TypeDescription schema) { + return new StructConverter(schema); } - static class StructConverter implements Converter { + private class StructConverter implements Converter { private final Converter[] children; StructConverter(TypeDescription schema) { @@ -447,162 +81,4 @@ public void addValue(int rowId, Record data, ColumnVector output) { } } } - - static class ListConverter implements Converter { - private final Converter children; - - ListConverter(TypeDescription schema) { - this.children = buildConverter(schema.getChildren().get(0)); - } - - @Override - public Class getJavaClass() { - return List.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, List data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - List value = (List) data; - ListColumnVector cv = (ListColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = value.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - // make sure the child is big enough - cv.child.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - children.addValue((int) (e + cv.offsets[rowId]), value.get(e), cv.child); - } - } - } - } - - static class MapConverter implements Converter { - private final Converter keyConverter; - private final Converter valueConverter; - - MapConverter(TypeDescription schema) { - this.keyConverter = buildConverter(schema.getChildren().get(0)); - this.valueConverter = buildConverter(schema.getChildren().get(1)); - } - - @Override - public Class getJavaClass() { - return Map.class; - } - - @Override - @SuppressWarnings("unchecked") - public void addValue(int rowId, Map data, ColumnVector output) { - if (data == null) { - output.noNulls = false; - output.isNull[rowId] = true; - } else { - output.isNull[rowId] = false; - Map map = (Map) data; - List keys = Lists.newArrayListWithExpectedSize(map.size()); - List values = Lists.newArrayListWithExpectedSize(map.size()); - for (Map.Entry entry : map.entrySet()) { - keys.add(entry.getKey()); - values.add(entry.getValue()); - } - MapColumnVector cv = (MapColumnVector) output; - // record the length and start of the list elements - cv.lengths[rowId] = map.size(); - cv.offsets[rowId] = cv.childCount; - cv.childCount += cv.lengths[rowId]; - 
// make sure the child is big enough - cv.keys.ensureSize(cv.childCount, true); - cv.values.ensureSize(cv.childCount, true); - // Add each element - for (int e = 0; e < cv.lengths[rowId]; ++e) { - int pos = (int) (e + cv.offsets[rowId]); - keyConverter.addValue(pos, keys.get(e), cv.keys); - valueConverter.addValue(pos, values.get(e), cv.values); - } - } - } - } - - private static Converter buildConverter(TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return new BooleanConverter(); - case BYTE: - return new ByteConverter(); - case SHORT: - return new ShortConverter(); - case DATE: - return new DateConverter(); - case INT: - return new IntConverter(); - case LONG: - String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE); - ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG : - ORCSchemaUtil.LongType.valueOf(longAttributeValue); - switch (longType) { - case TIME: - return new TimeConverter(); - case LONG: - return new LongConverter(); - default: - throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType); - } - case FLOAT: - return new FloatConverter(); - case DOUBLE: - return new DoubleConverter(); - case BINARY: - String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE); - ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY : - ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue); - switch (binaryType) { - case UUID: - return new UUIDConverter(); - case FIXED: - return new FixedConverter(); - case BINARY: - return new BytesConverter(); - default: - throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType); - } - case STRING: - case CHAR: - case VARCHAR: - return new StringConverter(); - case DECIMAL: - return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : new Decimal38Converter(schema); - case TIMESTAMP: - return new TimestampConverter(); - case TIMESTAMP_INSTANT: - return new TimestampTzConverter(); - case STRUCT: - return new StructConverter(schema); - case LIST: - return new ListConverter(schema); - case MAP: - return new MapConverter(schema); - } - throw new IllegalArgumentException("Unhandled type " + schema); - } - - private static Converter[] buildConverters(TypeDescription schema) { - if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Top level must be a struct " + schema); - } - - List children = schema.getChildren(); - Converter[] result = new Converter[children.size()]; - for (int c = 0; c < children.size(); ++c) { - result[c] = buildConverter(children.get(c)); - } - return result; - } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java new file mode 100644 index 000000000000..c7ab1866bf22 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.orc.BaseOrcReader; +import org.apache.iceberg.orc.OrcRowReader; +import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class FlinkOrcReader extends BaseOrcReader { + + private FlinkOrcReader(org.apache.iceberg.Schema expectedSchema, + TypeDescription readOrcSchema, + Map idToConstant) { + super(expectedSchema, readOrcSchema, idToConstant); + } + + public static OrcRowReader buildReader(Schema expectedSchema, TypeDescription fileSchema) { + return buildReader(expectedSchema, fileSchema, Collections.emptyMap()); + } + + public static OrcRowReader buildReader(Schema expectedSchema, + TypeDescription fileSchema, + Map idToConstant) { + return new FlinkOrcReader(expectedSchema, fileSchema, idToConstant); + } + + @Override + protected OrcValueReader createStructReader(List> fields, + Types.StructType expected, + Map idToConstant) { + return new RowReader(fields, expected, idToConstant); + } + + @Override + public Row read(VectorizedRowBatch batch, int row) { + return (Row) getReader().read(new StructColumnVector(batch.size, batch.cols), row); + } + + private static class RowReader extends OrcValueReaders.StructReader { + private final Types.StructType structType; + + private RowReader(List> readers, Types.StructType structType, Map idToConstant) { + super(readers, structType, idToConstant); + this.structType = structType; + } + + @Override + protected Row create() { + return new Row(structType.fields().size()); + } + + @Override + protected void set(Row row, int pos, Object value) { + row.setField(pos, value); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java new file mode 100644 index 000000000000..68a9e610ef6d --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import org.apache.flink.types.Row; +import org.apache.iceberg.data.orc.BaseOrcWriter; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class FlinkOrcWriter extends BaseOrcWriter { + + private FlinkOrcWriter(TypeDescription schema) { + super(schema); + } + + public static OrcValueWriter buildWriter(TypeDescription fileSchema) { + return new FlinkOrcWriter(fileSchema); + } + + @Override + protected Converter createStructConverter(TypeDescription schema) { + return new RowConverter(schema); + } + + @Override + @SuppressWarnings("unchecked") + public void write(Row value, VectorizedRowBatch output) { + int row = output.size++; + Converter[] converters = getConverters(); + for (int c = 0; c < converters.length; ++c) { + Class clazz = converters[c].getJavaClass(); + converters[c].addValue(row, clazz.cast(value.getField(c)), output.cols[c]); + } + } + + private class RowConverter implements Converter { + private final Converter[] children; + + private RowConverter(TypeDescription schema) { + this.children = new Converter[schema.getChildren().size()]; + for (int c = 0; c < children.length; ++c) { + children[c] = buildConverter(schema.getChildren().get(c)); + } + } + + @Override + public Class getJavaClass() { + return Row.class; + } + + @Override + @SuppressWarnings("unchecked") + public void addValue(int rowId, Row data, ColumnVector output) { + if (data == null) { + output.noNulls = false; + output.isNull[rowId] = true; + } else { + output.isNull[rowId] = false; + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < children.length; ++c) { + Class clazz = children[c].getJavaClass(); + children[c].addValue(rowId, clazz.cast(data.getField(c)), cv.fields[c]); + } + } + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java index cdf667bdf61b..4cd1efca86ce 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java @@ -33,10 +33,40 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.RandomUtil; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + class RandomData { private RandomData() { } + static final Schema COMPLEX_SCHEMA = new Schema( + required(1, "roots", Types.LongType.get()), + optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())), + required(5, "strict", Types.StructType.of( + required(9, "tangerine", Types.StringType.get()), + optional(6, "hopeful", Types.StructType.of( + required(7, "steel", Types.FloatType.get()), + required(8, "lantern", Types.DateType.get()) + )), + optional(10, "vehement", Types.LongType.get()) + )), + optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13, + Types.StringType.get(), Types.TimestampType.withZone())), + required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of( + optional(16, "beet", Types.DoubleType.get()), + required(17, "stamp", Types.FloatType.get()), + optional(18, "wheeze", Types.StringType.get()) + 
))), + optional(19, "renovate", Types.MapType.ofRequired(20, 21, + Types.StringType.get(), Types.StructType.of( + optional(22, "jumpy", Types.DoubleType.get()), + required(23, "koala", Types.IntegerType.get()), + required(24, "couch rope", Types.IntegerType.get()) + ))), + optional(2, "slide", Types.StringType.get()) + ); + private static Iterable generateData(Schema schema, int numRecords, Supplier supplier) { return () -> new Iterator() { private final RandomRowGenerator generator = supplier.get(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java new file mode 100644 index 000000000000..fe4f0766cd74 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.types.Row; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA; + +public class TestFlinkOrcReaderWriter { + private static final int NUM_RECORDS = 20_000; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private void testCorrectness(Schema schema, int numRecords, Iterable iterable) throws IOException { + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = ORC.write(Files.localOutput(testFile)) + .schema(schema) + .createWriterFunc(FlinkOrcWriter::buildWriter) + .build()) { + writer.addAll(iterable); + } + + try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) + .project(schema) + .createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type)) + .build()) { + Iterator expected = iterable.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < numRecords; i += 1) { + Assert.assertTrue("Should have expected number of rows", rows.hasNext()); + Assert.assertEquals(expected.next(), rows.next()); + } + Assert.assertFalse("Should not have extra rows", rows.hasNext()); + } + } + + @Test + public void testNormalRowData() throws IOException { + testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19981)); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java 
b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java index f8bf6a53e514..75220a4763f4 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java @@ -28,14 +28,12 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA; public class TestFlinkParquetReaderWriter { private static final int NUM_RECORDS = 20_000; @@ -43,33 +41,6 @@ public class TestFlinkParquetReaderWriter { @Rule public TemporaryFolder temp = new TemporaryFolder(); - private static final Schema COMPLEX_SCHEMA = new Schema( - required(1, "roots", Types.LongType.get()), - optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())), - required(5, "strict", Types.StructType.of( - required(9, "tangerine", Types.StringType.get()), - optional(6, "hopeful", Types.StructType.of( - required(7, "steel", Types.FloatType.get()), - required(8, "lantern", Types.DateType.get()) - )), - optional(10, "vehement", Types.LongType.get()) - )), - optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13, - Types.StringType.get(), Types.TimestampType.withZone())), - required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of( - optional(16, "beet", Types.DoubleType.get()), - required(17, "stamp", Types.FloatType.get()), - optional(18, "wheeze", Types.StringType.get()) - ))), - optional(19, "renovate", Types.MapType.ofRequired(20, 21, - Types.StringType.get(), Types.StructType.of( - optional(22, "jumpy", Types.DoubleType.get()), - required(23, "koala", Types.IntegerType.get()), - required(24, "couch rope", Types.IntegerType.get()) - ))), - optional(2, "slide", Types.StringType.get()) - ); - private void testCorrectness(Schema schema, int numRecords, Iterable iterable) throws IOException { File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete());
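
Note on the refactor: the change hoists the shared ORC conversion logic into BaseOrcReader/BaseOrcWriter so the generic data path and the new Flink path reuse the same visitor and converters, with only createStructReader/createStructConverter differing per engine. For reference, the round trip the new Flink classes plug into is the same ORC builder pattern exercised by TestFlinkOrcReaderWriter above. A minimal sketch, assuming an Iceberg Schema named schema, an Iterable<Row> named rows, and a scratch File named testFile (all placeholders):

    try (FileAppender<Row> writer = ORC.write(Files.localOutput(testFile))
        .schema(schema)
        .createWriterFunc(FlinkOrcWriter::buildWriter)      // writer added in this change
        .build()) {
      writer.addAll(rows);
    }

    try (CloseableIterable<Row> reader = ORC.read(Files.localInput(testFile))
        .project(schema)
        .createReaderFunc(fileSchema -> FlinkOrcReader.buildReader(schema, fileSchema))  // reader added in this change
        .build()) {
      for (Row row : reader) {
        // consume rows; read order matches write order, as asserted in the test above
      }
    }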