diff --git a/build.gradle b/build.gradle
index 855a9f563304..290b101039b8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -318,6 +318,7 @@ project(':iceberg-flink') {
     compileOnly "org.apache.flink:flink-table-api-java-bridge_2.12"
     compileOnly "org.apache.flink:flink-table-planner-blink_2.12"
     compileOnly "org.apache.flink:flink-table-planner_2.12"
+    compileOnly "org.apache.flink:flink-connector-hive_2.12"
     compileOnly "org.apache.hadoop:hadoop-hdfs"
     compileOnly "org.apache.hadoop:hadoop-common"
     compileOnly("org.apache.hadoop:hadoop-minicluster") {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
index 145190c9dedb..e5865242cd94 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
@@ -40,4 +40,10 @@ private FlinkTableOptions() {
           .intType()
           .defaultValue(100)
           .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Boolean> ENABLE_VECTORIZED_READ =
+      ConfigOptions.key("enable-vectorized-read")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Enables vectorized reads of Iceberg tables.");
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java b/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java
new file mode 100644
index 000000000000..525498e550ce
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/ConstantColumnVectors.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data.vectorized;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+class ConstantColumnVectors {
+  private ConstantColumnVectors() {
+  }
+
+  static ColumnVector ints(Object constant) {
+    return new ConstantIntColumnVector(constant);
+  }
+
+  static ColumnVector longs(Object constant) {
+    return new ConstantLongColumnVector(constant);
+  }
+
+  static ColumnVector booleans(Object constant) {
+    return new ConstantBooleanColumnVector(constant);
+  }
+
+  static ColumnVector doubles(Object constant) {
+    return new ConstantDoubleColumnVector(constant);
+  }
+
+  static ColumnVector floats(Object constant) {
+    return new ConstantFloatColumnVector(constant);
+  }
+
+  static ColumnVector decimals(Object constant) {
+    return new ConstantDecimalColumnVector(constant);
+  }
+
+  static ColumnVector timestamps(Object constant) {
+    return new ConstantTimestampColumnVector(constant);
+  }
+
+  static ColumnVector bytes(Object constant) {
+    return new ConstantBytesColumnVector(constant);
+  }
+
+  private static class ConstantIntColumnVector implements IntColumnVector {
+
+    private final Object constant;
+
+    private ConstantIntColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public int getInt(int i) {
+      return (int) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantLongColumnVector implements LongColumnVector {
+
+    private final Object constant;
+
+    private ConstantLongColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public long getLong(int i) {
+      return (long) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantBooleanColumnVector implements BooleanColumnVector {
+    private final Object constant;
+
+    private ConstantBooleanColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+      return (boolean) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantDoubleColumnVector implements DoubleColumnVector {
+    private final Object constant;
+
+    private ConstantDoubleColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public double getDouble(int i) {
+      return (double) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantFloatColumnVector implements FloatColumnVector {
+    private final Object constant;
+
+    private ConstantFloatColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public float getFloat(int i) {
+      return (float) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantDecimalColumnVector implements DecimalColumnVector {
+    private final Object constant;
+
+    private ConstantDecimalColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public DecimalData getDecimal(int i, int precision, int scale) {
+      return (DecimalData) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantTimestampColumnVector implements TimestampColumnVector {
+    private final Object constant;
+
+    private ConstantTimestampColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+      return (TimestampData) constant;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+
+  private static class ConstantBytesColumnVector implements BytesColumnVector {
+    private final Object constant;
+
+    private ConstantBytesColumnVector(Object constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public Bytes getBytes(int i) {
+      byte[] bytes;
+      if (constant instanceof byte[]) {
+        bytes = (byte[]) constant;
+      } else {
+        bytes = ((BinaryStringData) constant).toBytes();
+      }
+      return new Bytes(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return constant == null;
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java
new file mode 100644
index 000000000000..83021a891eb2
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/vectorized/VectorizedFlinkOrcReaders.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector;
+import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector;
+import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector;
+import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector;
+import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.ColumnarArrayData;
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class VectorizedFlinkOrcReaders {
+  private VectorizedFlinkOrcReaders() {
+  }
+
+  public static OrcBatchReader<VectorizedColumnBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+                                                                  Map<Integer, ?> idToConstant) {
+    Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant));
+
+    return new OrcBatchReader<VectorizedColumnBatch>() {
+      // We use batchOffsetInFile to compute the correct file-absolute row offset for the current row.
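+      // setBatchContext(...) is invoked with the batch's starting row position before each read(...),
+      // so RowPositionColumnVector below can serve the _pos metadata column as batchOffsetInFile + rowId.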
+      private long batchOffsetInFile;
+
+      @Override
+      public VectorizedColumnBatch read(VectorizedRowBatch batch) {
+        FlinkRowColumnVector cv = (FlinkRowColumnVector) converter.convert(
+            new StructColumnVector(batch.size, batch.cols), batch.size, batchOffsetInFile);
+
+        VectorizedColumnBatch columnarBatch = new VectorizedColumnBatch(cv.getFieldVectors());
+        columnarBatch.setNumRows(batch.size);
+        return columnarBatch;
+      }
+
+      @Override
+      public void setBatchContext(long batchOffsetInFile) {
+        this.batchOffsetInFile = batchOffsetInFile;
+      }
+    };
+  }
+
+  private interface Converter {
+    ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize,
+                         long batchOffsetInFile);
+  }
+
+  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names,
+                            List<Converter> fields) {
+      return new StructConverter(iStruct, fields, idToConstant);
+    }
+
+    @Override
+    public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
+      return new StructConverter.ArrayConverter(element);
+    }
+
+    @Override
+    public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
+      throw new UnsupportedOperationException("Unsupported vectorized read for map type.");
+    }
+
+    @Override
+    public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      return (vector, batchSize, batchOffsetInFile) -> {
+        if (vector instanceof LongColumnVector) {
+          return new OrcNoHiveLongVector((LongColumnVector) vector);
+        } else if (vector instanceof DoubleColumnVector) {
+          return new OrcNoHiveDoubleVector((DoubleColumnVector) vector);
+        } else if (vector instanceof BytesColumnVector) {
+          return new OrcNoHiveBytesVector((BytesColumnVector) vector);
+        } else if (vector instanceof DecimalColumnVector) {
+          return new OrcNoHiveDecimalVector((DecimalColumnVector) vector);
+        } else if (vector instanceof TimestampColumnVector) {
+          return new OrcNoHiveTimestampVector((TimestampColumnVector) vector);
+        } else {
+          throw new UnsupportedOperationException(
+              String.format("Unsupported vector: %s, the iceberg type is %s, the orc type is %s",
+                  vector.getClass().getName(), iPrimitive, primitive));
+        }
+      };
+    }
+  }
+
+  private static class RowPositionColumnVector implements org.apache.flink.table.data.vector.LongColumnVector {
+    private final long batchOffsetInFile;
+
+    RowPositionColumnVector(long batchOffsetInFile) {
+      this.batchOffsetInFile = batchOffsetInFile;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return false;
+    }
+
+    @Override
+    public long getLong(int i) {
+      return batchOffsetInFile + i;
+    }
+  }
+
+  private static class StructConverter implements Converter {
+    private final Types.StructType structType;
+    private final List<Converter> fieldConverters;
+    private final Map<Integer, ?> idToConstant;
+
+    private StructConverter(Types.StructType structType, List<Converter> fieldConverters,
+                            Map<Integer, ?> idToConstant) {
+      this.structType = structType;
+      this.fieldConverters = fieldConverters;
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
+                                long batchOffsetInFile) {
+      StructColumnVector structVector = (StructColumnVector) vector;
+      List<Types.NestedField> fields = structType.fields();
+      ColumnVector[] fieldVectors = new ColumnVector[fields.size()];
+      for (int pos = 0; pos < fields.size(); pos++) {
+        Types.NestedField field = fields.get(pos);
+        if (idToConstant.containsKey(field.fieldId())) {
+          fieldVectors[pos] = toConstantColumnVector(field.type(), idToConstant.get(field.fieldId()));
+        } else if (field.equals(MetadataColumns.ROW_POSITION)) {
+          fieldVectors[pos] = new RowPositionColumnVector(batchOffsetInFile);
+        } else {
+          fieldVectors[pos] = fieldConverters.get(pos)
+              .convert(structVector.fields[pos], batchSize, batchOffsetInFile);
+        }
+      }
+
+      return new FlinkRowColumnVector(fieldVectors, structVector);
+    }
+
+    private ColumnVector toConstantColumnVector(Type type, Object constant) {
+      Type.TypeID typeID = type.typeId();
+      switch (typeID) {
+        case INTEGER:
+        case DATE:
+        case TIME:
+          return ConstantColumnVectors.ints(constant);
+
+        case LONG:
+          return ConstantColumnVectors.longs(constant);
+
+        case BOOLEAN:
+          return ConstantColumnVectors.booleans(constant);
+
+        case DOUBLE:
+          return ConstantColumnVectors.doubles(constant);
+
+        case FLOAT:
+          return ConstantColumnVectors.floats(constant);
+
+        case DECIMAL:
+          return ConstantColumnVectors.decimals(constant);
+
+        case TIMESTAMP:
+          return ConstantColumnVectors.timestamps(constant);
+
+        case FIXED:
+        case UUID:
+        case BINARY:
+        case STRING:
+          return ConstantColumnVectors.bytes(constant);
+
+        default:
+          throw new UnsupportedOperationException("Unsupported data type for constant: " + type);
+      }
+    }
+
+    private static class ArrayConverter implements Converter {
+      private final Converter elementConverter;
+
+      private ArrayConverter(Converter elementConverter) {
+        this.elementConverter = elementConverter;
+      }
+
+      @Override
+      public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
+                                  long batchOffsetInFile) {
+        ListColumnVector listVector = (ListColumnVector) vector;
+        ColumnVector elementVector = elementConverter.convert(listVector.child, batchSize, batchOffsetInFile);
+
+        return new ArrayColumnVector() {
+          @Override
+          public ArrayData getArray(int rowId) {
+            int index = getRowIndex(rowId);
+            return new ColumnarArrayData(elementVector, (int) listVector.offsets[index],
+                (int) listVector.lengths[index]);
+          }
+
+          @Override
+          public boolean isNullAt(int rowId) {
+            return vector.isNull[getRowIndex(rowId)];
+          }
+
+          private int getRowIndex(int rowId) {
+            return vector.isRepeating ? 0 : rowId;
+          }
+        };
+      }
+    }
+  }
+
+  private static class FlinkRowColumnVector implements RowColumnVector {
+
+    private final ColumnVector[] fieldVectors;
+    private final StructColumnVector structVector;
+    private final VectorizedColumnBatch vectorizedColumnBatch;
+
+    FlinkRowColumnVector(ColumnVector[] fieldVectors,
+                         StructColumnVector structVector) {
+      this.fieldVectors = fieldVectors;
+      this.structVector = structVector;
+      this.vectorizedColumnBatch = new VectorizedColumnBatch(fieldVectors);
+    }
+
+    @Override
+    public ColumnarRowData getRow(int i) {
+      return new ColumnarRowData(vectorizedColumnBatch, i);
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+      return structVector.isNull[i];
+    }
+
+    ColumnVector[] getFieldVectors() {
+      return fieldVectors;
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/BatchRowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/BatchRowDataIterator.java
new file mode 100644
index 000000000000..d1aa8be19438
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/BatchRowDataIterator.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.flink.data.vectorized.VectorizedFlinkOrcReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class BatchRowDataIterator extends DataIterator<RowData> {
+
+  private final Schema tableSchema;
+  private final Schema projectedSchema;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+  private final DataType[] dataTypes;
+
+  BatchRowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
+                       Schema projectedSchema, String nameMapping, boolean caseSensitive, DataType[] dataTypes) {
+    super(task, io, encryption);
+    this.tableSchema = tableSchema;
+    this.projectedSchema = projectedSchema;
+    this.nameMapping = nameMapping;
+    this.caseSensitive = caseSensitive;
+    this.dataTypes = dataTypes;
+    if (!useOrcVectorizedRead(task)) {
+      throw new UnsupportedOperationException("Unsupported vectorized read.");
+    }
+  }
+
+  @Override
+  CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
+    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
+
+    Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
+        PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
+
+    CloseableIterable<RowData> iter;
+    switch (task.file().format()) {
+      case ORC:
+        iter = newOrcIterable(task, tableSchema, idToConstant);
+        break;
+
+      case PARQUET:
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported format for vectorized read: " + task.file().format());
+    }
+
+    return iter.iterator();
+  }
+
+  private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema schema,
+                                                    Map<Integer, ?> idToConstant) {
+    Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(schema,
+        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
+
+    ORC.ReadBuilder builder = ORC
+        .read(getInputFile(task))
+        .split(task.start(), task.length())
+        .project(readSchemaWithoutConstantAndMetadataFields)
+        .createBatchedReaderFunc(readOrcSchema ->
+            VectorizedFlinkOrcReaders.buildReader(schema, readOrcSchema, idToConstant))
+        .filter(task.residual())
+        .caseSensitive(caseSensitive);
+
+    if (nameMapping != null) {
+      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+    }
+
+    CloseableIterable<VectorizedColumnBatch> iterable = builder.build();
+
+    return new BatchRowIterator(iterable);
+  }
+
+  private static class BatchRowIterator implements CloseableIterable<RowData> {
+    private final CloseableIterator<VectorizedColumnBatch> iterator;
+    private int rowNums = 0;
+    private int rowId = 0;
+    private ColumnarRowData row;
+
+    BatchRowIterator(CloseableIterable<VectorizedColumnBatch> iterable) {
+      this.iterator = iterable.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator.close();
+    }
+
+    @Override
+    public CloseableIterator<RowData> iterator() {
+      return new CloseableIterator<RowData>() {
+
+        @Override
+        public boolean hasNext() {
+          // Advance to the next non-empty batch once the current one is fully consumed.
+          while (rowId >= rowNums) {
+            if (!iterator.hasNext()) {
+              return false;
+            }
+            VectorizedColumnBatch vectorizedColumnBatch = iterator.next();
+            row = new ColumnarRowData(vectorizedColumnBatch);
+            rowNums = vectorizedColumnBatch.getNumRows();
+            rowId = 0;
+          }
+          return true;
+        }
+
+        @Override
+        public RowData next() {
+          row.setRowId(rowId++);
+          return row;
+        }
+
+        @Override
+        public void close() throws IOException {
+          iterator.close();
+        }
+      };
+    }
+  }
+
+  private boolean useOrcVectorizedRead(CombinedScanTask task) {
+    Collection<FileScanTask> fileScanTasks = task.files();
+    for (FileScanTask fileScanTask : fileScanTasks) {
+      DataFile dataFile = fileScanTask.file();
+      if (!FileContent.DATA.equals(dataFile.content())) {
+        return false;
+      }
+
+      if (!FileFormat.ORC.equals(dataFile.format())) {
+        return false;
+      }
+    }
+
+    for (DataType dataType : dataTypes) {
+      if (!isVectorizationSupported(dataType.getLogicalType())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static boolean isVectorizationSupported(LogicalType logicalType) {
+    switch (logicalType.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BOOLEAN:
+      case BINARY:
+      case VARBINARY:
+      case DECIMAL:
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case BIGINT:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      case ROW:
+      case ARRAY:
+        return true;
+      case TIMESTAMP_WITH_TIME_ZONE:
+      case INTERVAL_YEAR_MONTH:
+      case INTERVAL_DAY_TIME:
+      case MULTISET:
+      case MAP:
+      case DISTINCT_TYPE:
+      case STRUCTURED_TYPE:
+      case NULL:
+      case RAW:
+      case SYMBOL:
+      default:
+        return false;
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 1bad1c25952e..75990d1a13d9 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -25,11 +25,14 @@
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkTableOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -46,17 +49,21 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
   private final FileIO io;
   private final EncryptionManager encryption;
   private final ScanContext context;
+  private final DataType[] dataTypes;
+  private final ReadableConfig readableConfig;
 
-  private transient RowDataIterator iterator;
+  private transient DataIterator<RowData> iterator;
   private transient long currentReadCount = 0L;
 
   FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
-                   ScanContext context) {
+                   ScanContext context, DataType[] dataTypes, ReadableConfig readableConfig) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
     this.io = io;
     this.encryption = encryption;
     this.context = context;
+    this.dataTypes = dataTypes;
+    this.readableConfig = readableConfig;
   }
 
   @VisibleForTesting
@@ -91,9 +98,17 @@ public void configure(Configuration parameters) {
 
   @Override
   public void open(FlinkInputSplit split) {
-    this.iterator = new RowDataIterator(
-        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
-        context.caseSensitive());
+    boolean enableVectorizedRead = readableConfig.get(FlinkTableOptions.ENABLE_VECTORIZED_READ);
+
+    if (enableVectorizedRead) {
+      this.iterator = new BatchRowDataIterator(
+          split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
+          context.caseSensitive(), dataTypes);
+    } else {
+      this.iterator = new RowDataIterator(
+          split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
+          context.caseSensitive());
+    }
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 84507c411bcc..a575eace13fc 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -31,6 +31,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
@@ -196,7 +197,10 @@ public FlinkInputFormat buildFormat() {
       contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
     }
 
-    return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+    DataType[] dataTypes = projectedSchema != null ? projectedSchema.getFieldDataTypes() :
+        FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema())).getFieldDataTypes();
+    return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build(), dataTypes,
+        readableConfig);
   }
 
   public DataStream<RowData> build() {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/vectorized/TestVectorizedReads.java b/flink/src/test/java/org/apache/iceberg/flink/data/vectorized/TestVectorizedReads.java
new file mode 100644
index 000000000000..4a01391f61cb
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/vectorized/TestVectorizedReads.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data.vectorized;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.FlinkTableOptions;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestVectorizedReads extends FlinkCatalogTestBase {
+  private static final String TABLE_NAME = "test_table";
+  private final FileFormat format = FileFormat.ORC;
+
+  public TestVectorizedReads(String catalogName, Namespace baseNamespace) {
+    super(catalogName, baseNamespace);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    super.getTableEnv()
+        .getConfig()
+        .getConfiguration()
+        .set(FlinkTableOptions.ENABLE_VECTORIZED_READ, true);
+    return super.getTableEnv();
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testVectorizedReadsIntType() {
+    sql("CREATE TABLE %s (id INT) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1),(2)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1));
+    expected.add(Row.of(2));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsLongType() {
+    sql("CREATE TABLE %s (id BIGINT) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1),(2)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1L));
+    expected.add(Row.of(2L));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsFloatType() {
+    sql("CREATE TABLE %s (id FLOAT) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1.1),(2.2)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1.1F));
+    expected.add(Row.of(2.2F));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsDoubleType() {
+    sql("CREATE TABLE %s (id DOUBLE) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1.1),(2.2)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1.1));
+    expected.add(Row.of(2.2));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsBooleanType() {
+    sql("CREATE TABLE %s (id BOOLEAN) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(true),(false)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(true));
+    expected.add(Row.of(false));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsDecimalType() {
+    sql("CREATE TABLE %s (id DECIMAL(10,2)) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(12.34),(34.56)", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(BigDecimal.valueOf(12.34)));
+    expected.add(Row.of(BigDecimal.valueOf(34.56)));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsDateType() {
+    sql("CREATE TABLE %s (data DATE) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(DATE '2021-01-01'),(DATE '2021-01-02')", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(LocalDate.of(2021, 1, 1)));
+    expected.add(Row.of(LocalDate.of(2021, 1, 2)));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsTimestampType() {
+    sql("CREATE TABLE %s (data TIMESTAMP) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(TO_TIMESTAMP('2021-01-01 12:13:14')),(TO_TIMESTAMP('2021-01-02 15:16:17'))", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(LocalDateTime.parse("2021-01-01T12:13:14")));
+    expected.add(Row.of(LocalDateTime.parse("2021-01-02T15:16:17")));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsStringType() {
+    sql("CREATE TABLE %s (id STRING) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES('a'),('b')", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of("a"));
+    expected.add(Row.of("b"));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsRowType() {
+    sql("CREATE TABLE %s (id INT, data ROW<`a` DOUBLE, `b` STRING>) WITH ('write.format.default'='%s')", TABLE_NAME,
+        format.name());
+    sql("INSERT INTO %s VALUES(1,ROW(1.1,'aaa')),(2,ROW(2.2,'bbb'))", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1, Row.of(1.1, "aaa")));
+    expected.add(Row.of(2, Row.of(2.2, "bbb")));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsArrayType() {
+    sql("CREATE TABLE %s (id ARRAY<INT>) WITH ('write.format.default'='%s')", TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(ARRAY[1,1]),(ARRAY[2,2])", TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    Object[] objects = new Object[1];
+    objects[0] = new Integer[] {1, 1};
+    Object[] objects1 = new Object[1];
+    objects1[0] = new Integer[] {2, 2};
+
+    expected.add(Row.of(objects));
+    expected.add(Row.of(objects1));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsMapType() {
+    sql("CREATE TABLE %s (id INT, data MAP<STRING, STRING>) WITH ('write.format.default'='%s')", TABLE_NAME,
+        format.name());
+    sql("INSERT INTO %s VALUES(1, STR_TO_MAP('k1=v1,k2=v2'))", TABLE_NAME);
+
+    AssertHelpers.assertThrows("Should fail to read MAP type with vectorized read enabled.",
+        RuntimeException.class, () -> sql("SELECT * FROM %s", TABLE_NAME));
+  }
+
+  @Test
+  public void testVectorizedReadsSupportedNestedType() {
+    sql("CREATE TABLE %s (id INT, data ROW<`a` INT, `b` ARRAY<INT>>) WITH ('write.format.default'='%s')",
+        TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1, ROW(2,ARRAY[10,20]))", TABLE_NAME);
+
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+    List<Row> expected = Lists.newArrayList();
+    Integer[] ints = new Integer[] {10, 20};
+    expected.add(Row.of(1, Row.of(2, ints)));
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testVectorizedReadsUnsupportedNestedType() {
+    sql("CREATE TABLE %s (id INT, data ARRAY<ROW<`b` INT>>) WITH ('write.format.default'='%s')",
+        TABLE_NAME, format.name());
+    sql("INSERT INTO %s VALUES(1, ARRAY[ROW(2),ROW(3)])", TABLE_NAME);
+
+    AssertHelpers.assertThrows("Should fail to read an unsupported nested type with vectorized read enabled.",
+        RuntimeException.class, () -> sql("SELECT * FROM %s", TABLE_NAME));
+  }
+
+  @Test
+  public void testVectorizedReadsConstant() {
+    sql("CREATE TABLE %s (id INT, spec1 VARCHAR, spec2 INT, spec3 DOUBLE, spec4 FLOAT," +
+        " spec5 BIGINT, spec6 BOOLEAN, spec7 DECIMAL(10,2), spec8 DATE, spec9 TIMESTAMP, spec10 TIME, spec11 BYTES)" +
+        " PARTITIONED BY (spec1,spec2,spec3,spec4,spec5,spec6,spec7,spec8,spec9,spec10,spec11)" +
+        " WITH ('write.format.default'='%s')",
+        TABLE_NAME, format.name());
+    sql("INSERT INTO %s SELECT 1, 'hello', 2, 1.1, 2.2, 3, true," +
+        " 12.34, DATE '2021-01-01', TO_TIMESTAMP('2021-01-01 12:13:14'), TIME '10:11:12', ENCODE('ab', 'UTF-8')",
+        TABLE_NAME);
+    List<Row> result = sql("SELECT * FROM %s", TABLE_NAME);
+
+    List<Row> expected = Lists.newArrayList();
+    expected.add(Row.of(1, "hello", 2, 1.1D, 2.2F, 3L, true, BigDecimal.valueOf(12.34), LocalDate.of(2021, 1, 1),
+        LocalDateTime.parse("2021-01-01T12:13:14"), LocalTime.parse("10:11:12"), new byte[] {'a', 'b'}));
+    TestHelpers.assertRows(result, expected);
+  }
+}
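
Reviewer note: a minimal usage sketch of the new option (not part of the patch). It assumes the iceberg-flink module is on the classpath and an Iceberg catalog `my_catalog` with a table `db.tbl` already registered; those names are hypothetical, while `FlinkTableOptions.ENABLE_VECTORIZED_READ` is the option added above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkTableOptions;

public class VectorizedReadExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

    // Opt in to the vectorized path; the option defaults to false, so existing jobs
    // keep the row-by-row RowDataIterator unless this is set explicitly.
    tEnv.getConfig().getConfiguration().set(FlinkTableOptions.ENABLE_VECTORIZED_READ, true);

    // With the flag on, FlinkInputFormat#open chooses BatchRowDataIterator, which requires
    // ORC data files and throws UnsupportedOperationException for unsupported column types
    // such as MAP (see isVectorizationSupported in BatchRowDataIterator).
    tEnv.executeSql("SELECT * FROM my_catalog.db.tbl").print();
  }
}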