diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index b55cb8c14f92..cf7082cd035f 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -98,7 +98,7 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio case UUID: return GenericOrcWriters.uuids(); case FIXED: - return GenericOrcWriters.fixed(); + return GenericOrcWriters.byteArrays(); case BINARY: return GenericOrcWriters.byteBuffers(); case DECIMAL: diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 12d70f5225ad..2d4a6daa0c05 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -56,6 +56,14 @@ public static OrcValueWriter booleans() { return BooleanWriter.INSTANCE; } + public static OrcValueWriter bytes() { + return ByteWriter.INSTANCE; + } + + public static OrcValueWriter shorts() { + return ShortWriter.INSTANCE; + } + public static OrcValueWriter ints() { return IntWriter.INSTANCE; } @@ -88,8 +96,8 @@ public static OrcValueWriter uuids() { return UUIDWriter.INSTANCE; } - public static OrcValueWriter fixed() { - return FixedWriter.INSTANCE; + public static OrcValueWriter byteArrays() { + return ByteArrayWriter.INSTANCE; } public static OrcValueWriter dates() { @@ -136,6 +144,34 @@ public void nonNullWrite(int rowId, Boolean data, ColumnVector output) { } } + private static class ByteWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteWriter(); + + @Override + public Class getJavaClass() { + return Byte.class; + } + + @Override + public void nonNullWrite(int rowId, Byte data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; + } + } + + private static class ShortWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ShortWriter(); + + @Override + public Class getJavaClass() { + return Short.class; + } + + @Override + public void nonNullWrite(int rowId, Short data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; + } + } + private static class IntWriter implements OrcValueWriter { private static final OrcValueWriter INSTANCE = new IntWriter(); @@ -252,8 +288,8 @@ public void nonNullWrite(int rowId, UUID data, ColumnVector output) { } } - private static class FixedWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new FixedWriter(); + private static class ByteArrayWriter implements OrcValueWriter { + private static final OrcValueWriter INSTANCE = new ByteArrayWriter(); @Override public Class getJavaClass() { diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java index f14aed220a52..e8c5301824ce 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java @@ -32,6 +32,7 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkOrcWriter; import org.apache.iceberg.io.FileAppender; import 
org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; @@ -40,6 +41,7 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.orc.ORC; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; class RowDataTaskWriterFactory implements TaskWriterFactory { @@ -137,8 +139,16 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma .schema(schema) .overwrite() .build(); - case PARQUET: + case ORC: + return ORC.write(outputFile) + .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case PARQUET: default: throw new UnsupportedOperationException("Cannot write unknown file format: " + format); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java new file mode 100644 index 000000000000..2f5db1967ef2 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.OrcRowReader; +import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; +import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class FlinkOrcReader implements OrcRowReader { + private final OrcValueReader reader; + + private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) { + this(iSchema, readSchema, ImmutableMap.of()); + } + + private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map idToConstant) { + this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant)); + } + + public static OrcRowReader buildReader(Schema schema, TypeDescription readSchema) { + return new FlinkOrcReader(schema, readSchema); + } + + @Override + public RowData read(VectorizedRowBatch batch, int row) { + return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row); + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + reader.setBatchContext(batchOffsetInFile); + } + + private static class ReadBuilder extends OrcSchemaWithTypeVisitor> { + private final Map idToConstant; + + private ReadBuilder(Map idToConstant) { + this.idToConstant = idToConstant; + } + + @Override + public OrcValueReader record(Types.StructType iStruct, TypeDescription record, List names, + List> fields) { + return FlinkOrcReaders.struct(fields, iStruct, idToConstant); + } + + @Override + public OrcValueReader list(Types.ListType iList, TypeDescription array, + OrcValueReader elementReader) { + return FlinkOrcReaders.array(elementReader); + } + + @Override + public OrcValueReader map(Types.MapType iMap, TypeDescription map, + OrcValueReader keyReader, + OrcValueReader valueReader) { + return FlinkOrcReaders.map(keyReader, valueReader); + } + + @Override + public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + switch (iPrimitive.typeId()) { + case BOOLEAN: + return OrcValueReaders.booleans(); + case INTEGER: + return OrcValueReaders.ints(); + case LONG: + return OrcValueReaders.longs(); + case FLOAT: + return OrcValueReaders.floats(); + case DOUBLE: + return OrcValueReaders.doubles(); + case DATE: + return FlinkOrcReaders.dates(); + case TIME: + return FlinkOrcReaders.times(); + case TIMESTAMP: + Types.TimestampType timestampType = (Types.TimestampType) iPrimitive; + if (timestampType.shouldAdjustToUTC()) { + return FlinkOrcReaders.timestampTzs(); + } else { + return FlinkOrcReaders.timestamps(); + } + case STRING: + return FlinkOrcReaders.strings(); + case UUID: + case FIXED: + case BINARY: + return OrcValueReaders.bytes(); + case DECIMAL: + Types.DecimalType decimalType = (Types.DecimalType) iPrimitive; + return FlinkOrcReaders.decimals(decimalType.precision(), decimalType.scale()); + default: + throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s", + iPrimitive, primitive)); + } + } + } +} 
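Reviewer note: the new reader plugs into the ORC read path through ORC.read(...).createReaderFunc(...), the same way the round-trip test at the end of this patch uses it. A minimal usage sketch follows; the schema, field names, and local file path are illustrative, not part of this patch:

    import java.io.File;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.data.FlinkOrcReader;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.orc.ORC;
    import org.apache.iceberg.types.Types;

    // Read an Iceberg ORC data file into Flink RowData using the new FlinkOrcReader.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));
    File dataFile = new File("/tmp/data.orc"); // assumed to exist, written with the same schema
    try (CloseableIterable<RowData> rows = ORC.read(Files.localInput(dataFile))
        .project(schema)
        .createReaderFunc(fileSchema -> FlinkOrcReader.buildReader(schema, fileSchema))
        .build()) {
      for (RowData row : rows) {
        System.out.println(row.getLong(0) + ": " + row.getString(1));
      }
    }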
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java new file mode 100644 index 000000000000..a434bddfe265 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.orc.OrcValueReader; +import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +class FlinkOrcReaders { + private FlinkOrcReaders() { + } + + static OrcValueReader strings() { + return StringReader.INSTANCE; + } + + static OrcValueReader dates() { + return DateReader.INSTANCE; + } + + static OrcValueReader decimals(int precision, int scale) { + if (precision <= 18) { + return new Decimal18Reader(precision, scale); + } else if (precision <= 38) { + return new Decimal38Reader(precision, scale); + } else { + throw new IllegalArgumentException("Invalid precision: " + precision); + } + } + + static OrcValueReader times() { + return TimeReader.INSTANCE; + } + + static OrcValueReader timestamps() { + return TimestampReader.INSTANCE; + } + + static OrcValueReader timestampTzs() { + return TimestampTzReader.INSTANCE; + } + + static OrcValueReader array(OrcValueReader elementReader) { + return new ArrayReader<>(elementReader); + } + + public static OrcValueReader map(OrcValueReader keyReader, OrcValueReader valueReader) { + return new 
MapReader<>(keyReader, valueReader);
+  }
+
+  public static OrcValueReader<RowData> struct(List<OrcValueReader<?>> readers,
+                                               Types.StructType struct,
+                                               Map<Integer, ?> idToConstant) {
+    return new StructReader(readers, struct, idToConstant);
+  }
+
+  private static class StringReader implements OrcValueReader<StringData> {
+    private static final StringReader INSTANCE = new StringReader();
+
+    @Override
+    public StringData nonNullRead(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      return StringData.fromBytes(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
+    }
+  }
+
+  private static class DateReader implements OrcValueReader<Integer> {
+    private static final DateReader INSTANCE = new DateReader();
+
+    @Override
+    public Integer nonNullRead(ColumnVector vector, int row) {
+      return (int) ((LongColumnVector) vector).vector[row];
+    }
+  }
+
+  private static class Decimal18Reader implements OrcValueReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    Decimal18Reader(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData nonNullRead(ColumnVector vector, int row) {
+      HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
+      return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale);
+    }
+  }
+
+  private static class Decimal38Reader implements OrcValueReader<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    Decimal38Reader(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public DecimalData nonNullRead(ColumnVector vector, int row) {
+      BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue();
+      return DecimalData.fromBigDecimal(value, precision, scale);
+    }
+  }
+
+  private static class TimeReader implements OrcValueReader<Integer> {
+    private static final TimeReader INSTANCE = new TimeReader();
+
+    @Override
+    public Integer nonNullRead(ColumnVector vector, int row) {
+      long micros = ((LongColumnVector) vector).vector[row];
+      // Flink only supports time in milliseconds, so erase the microsecond remainder.
+ return (int) (micros / 1000); + } + } + + private static class TimestampReader implements OrcValueReader { + private static final TimestampReader INSTANCE = new TimestampReader(); + + @Override + public TimestampData nonNullRead(ColumnVector vector, int row) { + TimestampColumnVector tcv = (TimestampColumnVector) vector; + LocalDateTime localDate = Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), tcv.nanos[row]) + .atOffset(ZoneOffset.UTC) + .toLocalDateTime(); + return TimestampData.fromLocalDateTime(localDate); + } + } + + private static class TimestampTzReader implements OrcValueReader { + private static final TimestampTzReader INSTANCE = new TimestampTzReader(); + + @Override + public TimestampData nonNullRead(ColumnVector vector, int row) { + TimestampColumnVector tcv = (TimestampColumnVector) vector; + Instant instant = Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), tcv.nanos[row]) + .atOffset(ZoneOffset.UTC) + .toInstant(); + return TimestampData.fromInstant(instant); + } + } + + private static class ArrayReader implements OrcValueReader { + private final OrcValueReader elementReader; + + private ArrayReader(OrcValueReader elementReader) { + this.elementReader = elementReader; + } + + @Override + public ArrayData nonNullRead(ColumnVector vector, int row) { + ListColumnVector listVector = (ListColumnVector) vector; + int offset = (int) listVector.offsets[row]; + int length = (int) listVector.lengths[row]; + List elements = Lists.newArrayListWithExpectedSize(length); + for (int c = 0; c < length; ++c) { + elements.add(elementReader.read(listVector.child, offset + c)); + } + return new GenericArrayData(elements.toArray()); + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + elementReader.setBatchContext(batchOffsetInFile); + } + } + + private static class MapReader implements OrcValueReader { + private final OrcValueReader keyReader; + private final OrcValueReader valueReader; + + private MapReader(OrcValueReader keyReader, OrcValueReader valueReader) { + this.keyReader = keyReader; + this.valueReader = valueReader; + } + + @Override + public MapData nonNullRead(ColumnVector vector, int row) { + MapColumnVector mapVector = (MapColumnVector) vector; + int offset = (int) mapVector.offsets[row]; + long length = mapVector.lengths[row]; + + Map map = Maps.newHashMap(); + for (int c = 0; c < length; c++) { + K key = keyReader.read(mapVector.keys, offset + c); + V value = valueReader.read(mapVector.values, offset + c); + map.put(key, value); + } + + return new GenericMapData(map); + } + + @Override + public void setBatchContext(long batchOffsetInFile) { + keyReader.setBatchContext(batchOffsetInFile); + valueReader.setBatchContext(batchOffsetInFile); + } + } + + private static class StructReader extends OrcValueReaders.StructReader { + private final int numFields; + + StructReader(List> readers, Types.StructType struct, Map idToConstant) { + super(readers, struct, idToConstant); + this.numFields = readers.size(); + } + + @Override + protected RowData create() { + return new GenericRowData(numFields); + } + + @Override + protected void set(RowData struct, int pos, Object value) { + ((GenericRowData) struct).setField(pos, value); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java new file mode 100644 index 000000000000..592307ded257 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -0,0 +1,133 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.orc.GenericOrcWriters; +import org.apache.iceberg.orc.OrcRowWriter; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + +public class FlinkOrcWriter implements OrcRowWriter { + private final FlinkOrcWriters.StructWriter writer; + private final List fieldGetters; + + private FlinkOrcWriter(RowType rowType, Schema iSchema) { + this.writer = (FlinkOrcWriters.StructWriter) FlinkSchemaVisitor.visit(rowType, iSchema, new WriteBuilder()); + + List fieldTypes = rowType.getChildren(); + this.fieldGetters = Lists.newArrayListWithExpectedSize(fieldTypes.size()); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters.add(RowData.createFieldGetter(fieldTypes.get(i), i)); + } + } + + public static OrcRowWriter buildWriter(RowType rowType, Schema iSchema) { + return new FlinkOrcWriter(rowType, iSchema); + } + + @Override + @SuppressWarnings("unchecked") + public void write(RowData row, VectorizedRowBatch output) { + int rowId = output.size; + output.size += 1; + + List> writers = writer.writers(); + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter child = writers.get(c); + child.write(rowId, fieldGetters.get(c).getFieldOrNull(row), output.cols[c]); + } + } + + private static class WriteBuilder extends FlinkSchemaVisitor> { + private WriteBuilder() { + } + + @Override + public OrcValueWriter record(Types.StructType iStruct, + List> results, + List fieldType) { + return FlinkOrcWriters.struct(results, fieldType); + } + + @Override + public OrcValueWriter map(Types.MapType iMap, OrcValueWriter key, OrcValueWriter value, + LogicalType keyType, LogicalType valueType) { + return FlinkOrcWriters.map(key, value, keyType, valueType); + } + + @Override + public OrcValueWriter list(Types.ListType iList, OrcValueWriter element, LogicalType elementType) { + return FlinkOrcWriters.list(element, elementType); + } + + @Override + public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, LogicalType flinkPrimitive) { + switch (iPrimitive.typeId()) { + case BOOLEAN: + return GenericOrcWriters.booleans(); + case INTEGER: + switch (flinkPrimitive.getTypeRoot()) { + case TINYINT: + return GenericOrcWriters.bytes(); + case SMALLINT: + return GenericOrcWriters.shorts(); + } + return 
GenericOrcWriters.ints(); + case LONG: + return GenericOrcWriters.longs(); + case FLOAT: + return GenericOrcWriters.floats(); + case DOUBLE: + return GenericOrcWriters.doubles(); + case DATE: + return FlinkOrcWriters.dates(); + case TIME: + return FlinkOrcWriters.times(); + case TIMESTAMP: + Types.TimestampType timestampType = (Types.TimestampType) iPrimitive; + if (timestampType.shouldAdjustToUTC()) { + return FlinkOrcWriters.timestampTzs(); + } else { + return FlinkOrcWriters.timestamps(); + } + case STRING: + return FlinkOrcWriters.strings(); + case UUID: + case FIXED: + case BINARY: + return GenericOrcWriters.byteArrays(); + case DECIMAL: + Types.DecimalType decimalType = (Types.DecimalType) iPrimitive; + return FlinkOrcWriters.decimals(decimalType.precision(), decimalType.scale()); + default: + throw new IllegalArgumentException(String.format( + "Invalid iceberg type %s corresponding to Flink logical type %s", iPrimitive, flinkPrimitive)); + } + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java new file mode 100644 index 000000000000..a3919c988c21 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.List; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; + +class FlinkOrcWriters { + + private FlinkOrcWriters() { + } + + static OrcValueWriter strings() { + return StringWriter.INSTANCE; + } + + static OrcValueWriter dates() { + return DateWriter.INSTANCE; + } + + static OrcValueWriter times() { + return TimeWriter.INSTANCE; + } + + static OrcValueWriter timestamps() { + return TimestampWriter.INSTANCE; + } + + static OrcValueWriter timestampTzs() { + return TimestampTzWriter.INSTANCE; + } + + static OrcValueWriter decimals(int precision, int scale) { + if (precision <= 18) { + return new Decimal18Writer(precision, scale); + } else if (precision <= 38) { + return new Decimal38Writer(precision, scale); + } else { + throw new IllegalArgumentException("Invalid precision: " + precision); + } + } + + static OrcValueWriter list(OrcValueWriter elementWriter, LogicalType elementType) { + return new ListWriter<>(elementWriter, elementType); + } + + static OrcValueWriter map(OrcValueWriter keyWriter, OrcValueWriter valueWriter, + LogicalType keyType, LogicalType valueType) { + return new MapWriter<>(keyWriter, valueWriter, keyType, valueType); + } + + static OrcValueWriter struct(List> writers, List types) { + return new StructWriter(writers, types); + } + + private static class StringWriter implements OrcValueWriter { + private static final StringWriter INSTANCE = new StringWriter(); + + @Override + public Class getJavaClass() { + return StringData.class; + } + + @Override + public void nonNullWrite(int rowId, StringData data, ColumnVector output) { + byte[] value = data.toBytes(); + ((BytesColumnVector) output).setRef(rowId, value, 0, value.length); + } + } + + private static class DateWriter implements OrcValueWriter { + private static final DateWriter INSTANCE = new DateWriter(); + + @Override + public Class getJavaClass() { + return Integer.class; + } + + @Override + public void nonNullWrite(int rowId, Integer data, ColumnVector output) { + ((LongColumnVector) output).vector[rowId] = data; + } + } + + private static class TimeWriter implements OrcValueWriter { + private static final TimeWriter INSTANCE = new TimeWriter(); + + @Override + public Class getJavaClass() { + return Integer.class; + } + + @Override + public void nonNullWrite(int rowId, Integer millis, ColumnVector output) { + // The time 
in Flink is in milliseconds, while the standard time in Iceberg is in microseconds.
+      // So convert it to microseconds, using long arithmetic to avoid int overflow.
+      ((LongColumnVector) output).vector[rowId] = millis * 1000L;
+    }
+  }
+
+  private static class TimestampWriter implements OrcValueWriter<TimestampData> {
+    private static final TimestampWriter INSTANCE = new TimestampWriter();
+
+    @Override
+    public Class<?> getJavaClass() {
+      return TimestampData.class;
+    }
+
+    @Override
+    public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
+      TimestampColumnVector cv = (TimestampColumnVector) output;
+      cv.setIsUTC(true);
+      // millis
+      OffsetDateTime offsetDateTime = data.toInstant().atOffset(ZoneOffset.UTC);
+      cv.time[rowId] = offsetDateTime.toEpochSecond() * 1_000 + offsetDateTime.getNano() / 1_000_000;
+      // truncate nanos to only keep microsecond precision.
+      cv.nanos[rowId] = (offsetDateTime.getNano() / 1_000) * 1_000;
+    }
+  }
+
+  private static class TimestampTzWriter implements OrcValueWriter<TimestampData> {
+    private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();
+
+    @Override
+    public Class<?> getJavaClass() {
+      return TimestampData.class;
+    }
+
+    @Override
+    public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
+      TimestampColumnVector cv = (TimestampColumnVector) output;
+      // millis
+      Instant instant = data.toInstant();
+      cv.time[rowId] = instant.toEpochMilli();
+      // truncate nanos to only keep microsecond precision.
+      cv.nanos[rowId] = (instant.getNano() / 1_000) * 1_000;
+    }
+  }
+
+  private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    Decimal18Writer(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return DecimalData.class;
+    }
+
+    @Override
+    public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) {
+      Preconditions.checkArgument(scale == data.scale(),
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
+      Preconditions.checkArgument(data.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, data);
+
+      ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(data.toUnscaledLong(), data.scale());
+    }
+  }
+
+  private static class Decimal38Writer implements OrcValueWriter<DecimalData> {
+    private final int precision;
+    private final int scale;
+
+    Decimal38Writer(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return DecimalData.class;
+    }
+
+    @Override
+    public void nonNullWrite(int rowId, DecimalData data, ColumnVector output) {
+      Preconditions.checkArgument(scale == data.scale(),
+          "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
+      Preconditions.checkArgument(data.precision() <= precision,
+          "Cannot write value as decimal(%s,%s), too large: %s", precision, scale, data);
+
+      ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data.toBigDecimal(), false));
+    }
+  }
+
+  static class ListWriter<T> implements OrcValueWriter<ArrayData> {
+    private final OrcValueWriter<T> elementWriter;
+    private final ArrayData.ElementGetter elementGetter;
+
+    ListWriter(OrcValueWriter<T> elementWriter, LogicalType elementType) {
+      this.elementWriter = elementWriter;
+      this.elementGetter = ArrayData.createElementGetter(elementType);
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return ArrayData.class;
+    }
+
+    @Override
@SuppressWarnings("unchecked") + public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) { + ListColumnVector cv = (ListColumnVector) output; + cv.lengths[rowId] = data.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough. + cv.child.ensureSize(cv.childCount, true); + + for (int e = 0; e < cv.lengths[rowId]; ++e) { + Object value = elementGetter.getElementOrNull(data, e); + elementWriter.write((int) (e + cv.offsets[rowId]), (T) value, cv.child); + } + } + } + + static class MapWriter implements OrcValueWriter { + private final OrcValueWriter keyWriter; + private final OrcValueWriter valueWriter; + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + + MapWriter(OrcValueWriter keyWriter, OrcValueWriter valueWriter, + LogicalType keyType, LogicalType valueType) { + this.keyWriter = keyWriter; + this.valueWriter = valueWriter; + this.keyGetter = ArrayData.createElementGetter(keyType); + this.valueGetter = ArrayData.createElementGetter(valueType); + } + + @Override + public Class getJavaClass() { + return MapData.class; + } + + @Override + @SuppressWarnings("unchecked") + public void nonNullWrite(int rowId, MapData data, ColumnVector output) { + MapColumnVector cv = (MapColumnVector) output; + ArrayData keyArray = data.keyArray(); + ArrayData valArray = data.valueArray(); + + // record the length and start of the list elements + cv.lengths[rowId] = data.size(); + cv.offsets[rowId] = cv.childCount; + cv.childCount += cv.lengths[rowId]; + // make sure the child is big enough + cv.keys.ensureSize(cv.childCount, true); + cv.values.ensureSize(cv.childCount, true); + // Add each element + for (int e = 0; e < cv.lengths[rowId]; ++e) { + int pos = (int) (e + cv.offsets[rowId]); + keyWriter.write(pos, (K) keyGetter.getElementOrNull(keyArray, e), cv.keys); + valueWriter.write(pos, (V) valueGetter.getElementOrNull(valArray, e), cv.values); + } + } + } + + static class StructWriter implements OrcValueWriter { + private final List> writers; + private final List fieldGetters; + + StructWriter(List> writers, List types) { + this.writers = writers; + + this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size()); + for (int i = 0; i < types.size(); i++) { + fieldGetters.add(RowData.createFieldGetter(types.get(i), i)); + } + } + + List> writers() { + return writers; + } + + @Override + public Class getJavaClass() { + return RowData.class; + } + + @Override + @SuppressWarnings("unchecked") + public void nonNullWrite(int rowId, RowData data, ColumnVector output) { + StructColumnVector cv = (StructColumnVector) output; + for (int c = 0; c < writers.size(); ++c) { + OrcValueWriter writer = writers.get(c); + writer.write(rowId, fieldGetters.get(c).getFieldOrNull(data), cv.fields[c]); + } + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java new file mode 100644 index 000000000000..363d2bde4918 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.util.List; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +abstract class FlinkSchemaVisitor { + + static T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor visitor) { + return visit(flinkType, schema.asStruct(), visitor); + } + + private static T visit(LogicalType flinkType, Type iType, FlinkSchemaVisitor visitor) { + switch (iType.typeId()) { + case STRUCT: + return visitRecord(flinkType, iType.asStructType(), visitor); + + case MAP: + MapType mapType = (MapType) flinkType; + Types.MapType iMapType = iType.asMapType(); + + T key = visit(mapType.getKeyType(), iMapType.keyType(), visitor); + T value = visit(mapType.getValueType(), iMapType.valueType(), visitor); + + return visitor.map(iMapType, key, value, mapType.getKeyType(), mapType.getValueType()); + + case LIST: + ArrayType listType = (ArrayType) flinkType; + Types.ListType iListType = iType.asListType(); + + T element = visit(listType.getElementType(), iListType.elementType(), visitor); + + return visitor.list(iListType, element, listType.getElementType()); + + default: + return visitor.primitive(iType.asPrimitiveType(), flinkType); + } + } + + private static T visitRecord(LogicalType flinkType, Types.StructType struct, + FlinkSchemaVisitor visitor) { + Preconditions.checkArgument(flinkType instanceof RowType, "%s is not a RowType.", flinkType); + RowType rowType = (RowType) flinkType; + + int fieldSize = struct.fields().size(); + List results = Lists.newArrayListWithExpectedSize(fieldSize); + List fieldTypes = Lists.newArrayListWithExpectedSize(fieldSize); + List nestedFields = struct.fields(); + + for (int i = 0; i < fieldSize; i++) { + Types.NestedField iField = nestedFields.get(i); + int fieldIndex = rowType.getFieldIndex(iField.name()); + Preconditions.checkArgument(fieldIndex >= 0, + "NestedField: %s is not found in flink RowType: %s", iField, rowType); + + LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex); + + fieldTypes.add(fieldFlinkType); + results.add(visit(fieldFlinkType, iField.type(), visitor)); + } + + return visitor.record(struct, results, fieldTypes); + } + + public T record(Types.StructType iStruct, List results, List fieldTypes) { + return null; + } + + public T list(Types.ListType iList, T element, LogicalType elementType) { + return null; + } + + public T map(Types.MapType iMap, T key, T value, LogicalType keyType, LogicalType valueType) { + return null; + } + + public T primitive(Type.PrimitiveType iPrimitive, LogicalType 
flinkPrimitive) {
+    return null;
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java
index 3dd7e75c2e63..59306d638ee2 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java
@@ -34,13 +34,14 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -124,12 +125,12 @@ private static Object convert(Type type, Object object) {
       case STRUCT:
         return convert(type.asStructType(), (Record) object);
       case LIST:
-        List convertedList = Lists.newArrayList();
         List list = (List) object;
-        for (Object element : list) {
-          convertedList.add(convert(type.asListType().elementType(), element));
+        Object[] convertedArray = new Object[list.size()];
+        for (int i = 0; i < convertedArray.length; i++) {
+          convertedArray[i] = convert(type.asListType().elementType(), list.get(i));
         }
-        return convertedList;
+        return new GenericArrayData(convertedArray);
       case MAP:
         Map convertedMap = Maps.newLinkedHashMap();
         Map map = (Map) object;
@@ -139,7 +140,7 @@ private static Object convert(Type type, Object object) {
              convert(type.asMapType().valueType(), entry.getValue())
          );
        }
-        return convertedMap;
+        return new GenericMapData(convertedMap);
       default:
         throw new UnsupportedOperationException("Not a supported type: " + type);
     }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
index bb07586d613e..c6eee4635615 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
@@ -70,12 +70,14 @@ public class TestIcebergStreamWriter {
   private final FileFormat format;
   private final boolean partitioned;
 
-  // TODO add ORC/Parquet unit test once the readers and writers are ready.
+  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
-        new Object[] {"avro", false}
+        new Object[] {"avro", false},
+        new Object[] {"orc", true},
+        new Object[] {"orc", false}
     };
   }
 
@@ -211,6 +213,10 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {
 
   @Test
   public void testTableWithTargetFileSize() throws Exception {
+    // TODO: ORC does not expose the file size until the writer is closed, so skip the target-size test.
+    if (format == FileFormat.ORC) {
+      return;
+    }
     // Adjust the target-file-size in table properties.
     table.updateProperties()
         .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
index dc39fd5baf27..bb3841efcb8f 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
@@ -55,12 +55,14 @@ public class TestTaskWriters {
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-  // TODO add ORC/Parquet unit test once the readers and writers are ready.
+  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
-        new Object[] {"avro", false}
+        new Object[] {"avro", false},
+        new Object[] {"orc", true},
+        new Object[] {"orc", false}
     };
   }
 
@@ -178,6 +180,10 @@ public void testCompleteFiles() throws IOException {
 
   @Test
   public void testRollingWithTargetFileSize() throws IOException {
+    // TODO: ORC does not expose the file size until the writer is closed, so skip the rolling test.
+    if (format == FileFormat.ORC) {
+      return;
+    }
     try (TaskWriter taskWriter = createTaskWriter(4)) {
       List rows = Lists.newArrayListWithCapacity(8000);
       List records = Lists.newArrayListWithCapacity(8000);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
new file mode 100644
index 000000000000..79f1c61f905a
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.flink.data; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +public class TestFlinkOrcReaderWriter extends DataTest { + private static final int NUM_RECORDS = 100; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); + List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); + List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); + + File recordsFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", recordsFile.delete()); + + // Write the expected records into ORC file, then read them into RowData and assert with the expected Record list. + try (FileAppender writer = ORC.write(Files.localOutput(recordsFile)) + .schema(schema) + .createWriterFunc(GenericOrcWriter::buildWriter) + .build()) { + writer.addAll(expectedRecords); + } + + try (CloseableIterable reader = ORC.read(Files.localInput(recordsFile)) + .project(schema) + .createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type)) + .build()) { + Iterator expected = expectedRecords.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < NUM_RECORDS; i++) { + Assert.assertTrue("Should have expected number of records", rows.hasNext()); + TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); + } + Assert.assertFalse("Should not have extra records", rows.hasNext()); + } + + File rowDataFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", rowDataFile.delete()); + + // Write the expected RowData into ORC file, then read them into Record and assert with the expected RowData list. + RowType rowType = FlinkSchemaUtil.convert(schema); + try (FileAppender writer = ORC.write(Files.localOutput(rowDataFile)) + .schema(schema) + .createWriterFunc((iSchema, typeDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema)) + .build()) { + writer.addAll(expectedRows); + } + + try (CloseableIterable reader = ORC.read(Files.localInput(rowDataFile)) + .project(schema) + .createReaderFunc(type -> GenericOrcReader.buildReader(schema, type)) + .build()) { + Iterator expected = expectedRows.iterator(); + Iterator records = reader.iterator(); + for (int i = 0; i < NUM_RECORDS; i += 1) { + Assert.assertTrue("Should have expected number of records", records.hasNext()); + TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); + } + Assert.assertFalse("Should not have extra records", records.hasNext()); + } + } +}
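Reviewer note: for completeness, here is the corresponding write path in sketch form, mirroring the second half of the test above; the schema, output path, and row values are illustrative:

    import java.io.File;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.flink.data.FlinkOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.orc.ORC;
    import org.apache.iceberg.types.Types;

    // Write Flink RowData into an Iceberg ORC data file using the new FlinkOrcWriter.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));
    RowType rowType = FlinkSchemaUtil.convert(schema);
    File outputFile = new File("/tmp/row-data.orc"); // illustrative path
    try (FileAppender<RowData> appender = ORC.write(Files.localOutput(outputFile))
        .schema(schema)
        .createWriterFunc((iSchema, typeDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema))
        .build()) {
      appender.add(GenericRowData.of(1L, StringData.fromString("a")));
    }

The values passed to GenericRowData.of(...) must match the Flink conversion of the Iceberg schema: a long for the LONG field and StringData for the STRING field.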