Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ public int size() {

@Override
public <T> T get(int pos, Class<T> javaClass) {
if (struct == null) {
// Return a null struct when projecting a nested required field from an optional struct.
// See more details in issue #2738.
return null;
}

int structPos = positionMap[pos];

if (nestedProjections[pos] != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.data;

import java.util.Map;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

public class RowDataProjection implements RowData {
  /**
   * Creates a projecting wrapper for {@link RowData} rows.
   * <p>
   * This projection will not project the nested children types of repeated types like lists and maps.
   *
   * @param schema schema of rows wrapped by this projection
   * @param projectedSchema result schema of the projected rows
   * @return a wrapper to project rows
   */
  public static RowDataProjection create(Schema schema, Schema projectedSchema) {
    return RowDataProjection.create(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
  }

  /**
   * Creates a projecting wrapper for {@link RowData} rows.
   * <p>
   * This projection will not project the nested children types of repeated types like lists and maps.
   *
   * @param rowType flink row type of rows wrapped by this projection
   * @param schema schema of rows wrapped by this projection
   * @param projectedSchema result schema of the projected rows
   * @return a wrapper to project rows
   */
  public static RowDataProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema) {
    return new RowDataProjection(rowType, schema, projectedSchema);
  }

  // One getter per projected field, each reading from the wrapped row's original position.
  private final RowData.FieldGetter[] getters;
  // The currently wrapped row. May be null: the STRUCT getter deliberately wraps a null nested
  // row so that isNullAt(pos) reports true for every nested field.
  private RowData rowData;

  private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.StructType projectType) {
    // Map Iceberg field ids to positions in the unprojected row, so each projected field's
    // getter reads the correct source column.
    Map<Integer, Integer> fieldIdToPosition = Maps.newHashMap();
    for (int i = 0; i < rowStruct.fields().size(); i++) {
      fieldIdToPosition.put(rowStruct.fields().get(i).fieldId(), i);
    }

    this.getters = new RowData.FieldGetter[projectType.fields().size()];
    for (int i = 0; i < getters.length; i++) {
      Types.NestedField projectField = projectType.fields().get(i);
      Types.NestedField rowField = rowStruct.field(projectField.fieldId());

      Preconditions.checkNotNull(rowField,
          "Cannot locate the project field <%s> in the iceberg struct <%s>", projectField, rowStruct);

      getters[i] = createFieldGetter(rowType, fieldIdToPosition.get(projectField.fieldId()), rowField, projectField);
    }
  }

  /**
   * Builds the getter that extracts the projected value of one field from a wrapped row.
   * <p>
   * Structs are projected recursively. Map keys/values and list elements are only projectable
   * when they are primitives or when the full nested type is kept, since the children of
   * repeated types are not re-projected (see {@link #create(Schema, Schema)}).
   *
   * @param rowType flink row type of the unprojected row
   * @param position position of the field in the unprojected row
   * @param rowField the field in the unprojected iceberg struct
   * @param projectField the requested (projected) field
   * @return a getter producing the projected value for this field
   * @throws IllegalArgumentException if the projection is not supported for this field
   */
  private static RowData.FieldGetter createFieldGetter(RowType rowType,
                                                       int position,
                                                       Types.NestedField rowField,
                                                       Types.NestedField projectField) {
    Preconditions.checkArgument(rowField.type().typeId() == projectField.type().typeId(),
        "Different iceberg type between row field <%s> and project field <%s>", rowField, projectField);

    switch (projectField.type().typeId()) {
      case STRUCT:
        RowType nestedRowType = (RowType) rowType.getTypeAt(position);
        // Build the nested projection once, instead of re-deriving it (and all of its field
        // getters) from the schemas on every row the getter is applied to.
        RowDataProjection nestedProjection = RowDataProjection
            .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType());
        return row -> {
          // A null nested row is still wrapped (rather than returning null) so the resulting
          // projection reports every nested field as null via isNullAt.
          RowData nestedRow = row.isNullAt(position) ? null : row.getRow(position, nestedRowType.getFieldCount());
          return nestedProjection.wrap(nestedRow);
        };

      case MAP:
        Types.MapType projectedMap = projectField.type().asMapType();
        Types.MapType originalMap = rowField.type().asMapType();

        // Nested (non-primitive) keys/values must be kept whole; partial projection of map
        // children is not supported.
        boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
            projectedMap.keyType().equals(originalMap.keyType());
        boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
            projectedMap.valueType().equals(originalMap.valueType());
        Preconditions.checkArgument(keyProjectable && valueProjectable,
            "Cannot project a partial map key or value with non-primitive type. Trying to project <%s> out of <%s>",
            projectField, rowField);

        return RowData.createFieldGetter(rowType.getTypeAt(position), position);

      case LIST:
        Types.ListType projectedList = projectField.type().asListType();
        Types.ListType originalList = rowField.type().asListType();

        // Same restriction as maps: nested element types must be projected in full.
        boolean elementProjectable = !projectedList.elementType().isNestedType() ||
            projectedList.elementType().equals(originalList.elementType());
        Preconditions.checkArgument(elementProjectable,
            "Cannot project a partial list element with non-primitive type. Trying to project <%s> out of <%s>",
            projectField, rowField);

        return RowData.createFieldGetter(rowType.getTypeAt(position), position);

      default:
        // Primitive types are read directly from the source position.
        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
    }
  }

  /**
   * Wraps a row so that reads through this projection see only the projected fields.
   * <p>
   * This mutates and returns the same projection instance, so it is not thread-safe and a
   * previously returned wrapper is invalidated by the next call.
   *
   * @param row the row to project; may be null (all fields then read as null)
   * @return this projection, wrapping {@code row}
   */
  public RowData wrap(RowData row) {
    this.rowData = row;
    return this;
  }

  private Object getValue(int pos) {
    return getters[pos].getFieldOrNull(rowData);
  }

  @Override
  public int getArity() {
    return getters.length;
  }

  @Override
  public RowKind getRowKind() {
    // NOTE(review): throws NPE when wrapping a null row (possible for null nested structs);
    // callers are expected to check isNullAt first -- confirm this is intended.
    return rowData.getRowKind();
  }

  @Override
  public void setRowKind(RowKind kind) {
    throw new UnsupportedOperationException("Cannot set row kind in the RowDataProjection");
  }

  @Override
  public boolean isNullAt(int pos) {
    // A projection wrapping a null row (e.g. a null optional struct) reports all fields as null.
    return rowData == null || getValue(pos) == null;
  }

  // Typed accessors below simply cast the projected value; the caller is responsible for
  // requesting the type matching the projected schema, per the RowData contract.

  @Override
  public boolean getBoolean(int pos) {
    return (boolean) getValue(pos);
  }

  @Override
  public byte getByte(int pos) {
    return (byte) getValue(pos);
  }

  @Override
  public short getShort(int pos) {
    return (short) getValue(pos);
  }

  @Override
  public int getInt(int pos) {
    return (int) getValue(pos);
  }

  @Override
  public long getLong(int pos) {
    return (long) getValue(pos);
  }

  @Override
  public float getFloat(int pos) {
    return (float) getValue(pos);
  }

  @Override
  public double getDouble(int pos) {
    return (double) getValue(pos);
  }

  @Override
  public StringData getString(int pos) {
    return (StringData) getValue(pos);
  }

  @Override
  public DecimalData getDecimal(int pos, int precision, int scale) {
    return (DecimalData) getValue(pos);
  }

  @Override
  public TimestampData getTimestamp(int pos, int precision) {
    return (TimestampData) getValue(pos);
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> RawValueData<T> getRawValue(int pos) {
    return (RawValueData<T>) getValue(pos);
  }

  @Override
  public byte[] getBinary(int pos) {
    return (byte[]) getValue(pos);
  }

  @Override
  public ArrayData getArray(int pos) {
    return (ArrayData) getValue(pos);
  }

  @Override
  public MapData getMap(int pos) {
    return (MapData) getValue(pos);
  }

  @Override
  public RowData getRow(int pos, int numFields) {
    return (RowData) getValue(pos);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand All @@ -34,6 +35,7 @@
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
import org.apache.iceberg.flink.data.FlinkParquetReaders;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.flink.data.RowDataUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
Expand Down Expand Up @@ -70,9 +72,18 @@ public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor in
PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);

FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
return deletes
.filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor))
.iterator();
CloseableIterable<RowData> iterable = deletes.filter(
newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)
);

// Project the RowData to remove the extra meta columns.
if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
RowDataProjection rowDataProjection = RowDataProjection.create(
deletes.requiredRowType(), deletes.requiredSchema().asStruct(), projectedSchema.asStruct());
iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
}

return iterable.iterator();
}

private CloseableIterable<RowData> newIterable(
Expand Down Expand Up @@ -156,16 +167,22 @@ private CloseableIterable<RowData> newOrcIterable(
}

private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
private final RowType requiredRowType;
private final RowDataWrapper asStructLike;
private final InputFilesDecryptor inputFilesDecryptor;

FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
InputFilesDecryptor inputFilesDecryptor) {
super(task, tableSchema, requestedSchema);
this.asStructLike = new RowDataWrapper(FlinkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
this.requiredRowType = FlinkSchemaUtil.convert(requiredSchema());
this.asStructLike = new RowDataWrapper(requiredRowType, requiredSchema().asStruct());
this.inputFilesDecryptor = inputFilesDecryptor;
}

public RowType requiredRowType() {
return requiredRowType;
}

@Override
protected StructLike asStructLike(RowData row) {
return asStructLike.wrap(row);
Expand Down
Loading