diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index c18f69f729f7..c8be176060ab 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -155,8 +155,11 @@ public int size() { @Override public T get(int pos, Class javaClass) { - int structPos = positionMap[pos]; + if (struct == null) { + return null; + } + int structPos = positionMap[pos]; if (nestedProjections[pos] != null) { return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class))); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java new file mode 100644 index 000000000000..2546027c9f35 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; + +public class RowDataProjection { + private final RowData.FieldGetter[] getters; + + public static RowDataProjection create(Schema schema, Schema projectSchema) { + return new RowDataProjection(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectSchema.asStruct()); + } + + private RowDataProjection(RowType rowType, Types.StructType rowStruct, Types.StructType projectType) { + this.getters = new RowData.FieldGetter[projectType.fields().size()]; + for (int i = 0; i < getters.length; i++) { + getters[i] = createFieldGetter(rowType, rowStruct, projectType.fields().get(i)); + } + } + + private static RowData.FieldGetter createFieldGetter(RowType rowType, + Types.StructType rowStruct, + Types.NestedField projectField) { + for (int i = 0; i < rowStruct.fields().size(); i++) { + Types.NestedField rowField = rowStruct.fields().get(i); + if (rowField.fieldId() == projectField.fieldId()) { + Preconditions.checkArgument(rowField.type().typeId() == projectField.type().typeId(), + String.format("Different iceberg type between row field <%s> and project field <%s>", + rowField, projectField)); + + switch (projectField.type().typeId()) { + case STRUCT: + RowType nestedRowType = (RowType) rowType.getTypeAt(i); + int rowPos = i; + return row -> { + RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount()); + return new RowDataProjection(nestedRowType, rowField.type().asStructType(), + projectField.type().asStructType()).project(nestedRow); + }; + + case MAP: + case LIST: + throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectField)); + default: + return RowData.createFieldGetter(rowType.getTypeAt(i), i); + } + } + } + throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectField, rowStruct)); + } + + public RowData project(RowData row) { + GenericRowData projectedRow = new GenericRowData(getters.length); + if (row != null) { + projectedRow.setRowKind(row.getRowKind()); + for (int i = 0; i < getters.length; i++) { + projectedRow.setField(i, getters[i].getFieldOrNull(row)); + } + } + return projectedRow; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index fbdb7bf3cc02..f1ac905215be 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -34,6 +34,7 @@ import org.apache.iceberg.flink.data.FlinkAvroReader; import org.apache.iceberg.flink.data.FlinkOrcReader; import org.apache.iceberg.flink.data.FlinkParquetReaders; +import org.apache.iceberg.flink.data.RowDataProjection; import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -70,9 +71,16 @@ public CloseableIterator open(FileScanTask task, InputFilesDecryptor in PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor); - return deletes - .filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)) - .iterator(); + CloseableIterable iterable = + deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor)); + + // Project the RowData to remove the extra meta columns. + if (!projectedSchema.sameSchema(deletes.requiredSchema())) { + RowDataProjection rowDataProjection = RowDataProjection.create(deletes.requiredSchema(), projectedSchema); + iterable = CloseableIterable.transform(iterable, rowDataProjection::project); + } + + return iterable.iterator(); } private CloseableIterable newIterable( diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java index d44f45ab52fd..fca4e03ed863 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java @@ -38,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.StructLikeSet; import org.junit.After; import org.junit.Assert; @@ -125,10 +126,10 @@ public void testSqlChangeLogOnIdKey() throws Exception { ) ); - List> expectedRecordsPerCheckpoint = ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "bbb")), - ImmutableList.of(record(1, "bbb"), record(2, "ddd")), - ImmutableList.of(record(1, "ddd"), record(2, "ddd")) + List> expectedRecordsPerCheckpoint = ImmutableList.of( + ImmutableList.of(insertRow(1, "bbb"), insertRow(2, "bbb")), + ImmutableList.of(insertRow(1, "bbb"), insertRow(2, "ddd")), + ImmutableList.of(insertRow(1, "ddd"), insertRow(2, "ddd")) ); testSqlChangeLog(TABLE_NAME, ImmutableList.of("id"), inputRowsPerCheckpoint, @@ -157,10 +158,10 @@ public void testChangeLogOnDataKey() throws Exception { ) ); - List> expectedRecords = ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "aaa")), - ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")), - ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")) + List> expectedRecords = ImmutableList.of( + ImmutableList.of(insertRow(1, "bbb"), insertRow(2, "aaa")), + ImmutableList.of(insertRow(1, "aaa"), insertRow(1, "bbb"), insertRow(1, "ccc")), + ImmutableList.of(insertRow(1, "aaa"), insertRow(1, "ccc"), insertRow(2, "aaa"), insertRow(2, "ccc")) ); testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), elementsPerCheckpoint, expectedRecords); @@ -187,10 +188,10 @@ public void testChangeLogOnIdDataKey() throws Exception { ) ); - List> expectedRecords = ImmutableList.of( - ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")), - ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")), - ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")) + List> expectedRecords = ImmutableList.of( + ImmutableList.of(insertRow(1, "bbb"), insertRow(2, "aaa"), insertRow(2, "bbb")), + ImmutableList.of(insertRow(1, "aaa"), insertRow(1, "bbb"), insertRow(1, "ccc"), insertRow(2, "bbb")), + ImmutableList.of(insertRow(1, "aaa"), insertRow(1, "ccc"), insertRow(2, "aaa"), insertRow(2, "bbb")) ); testSqlChangeLog(TABLE_NAME, ImmutableList.of("data", "id"), elementsPerCheckpoint, expectedRecords); @@ -213,31 +214,31 @@ public void testPureInsertOnIdKey() throws Exception { ) ); - List> expectedRecords = ImmutableList.of( + List> expectedRecords = ImmutableList.of( ImmutableList.of( - record(1, "aaa"), - record(2, "bbb") + insertRow(1, "aaa"), + insertRow(2, "bbb") ), ImmutableList.of( - record(1, "aaa"), - record(2, "bbb"), - record(3, "ccc"), - record(4, "ddd") + insertRow(1, "aaa"), + insertRow(2, "bbb"), + insertRow(3, "ccc"), + insertRow(4, "ddd") ), ImmutableList.of( - record(1, "aaa"), - record(2, "bbb"), - record(3, "ccc"), - record(4, "ddd"), - record(5, "eee"), - record(6, "fff") + insertRow(1, "aaa"), + insertRow(2, "bbb"), + insertRow(3, "ccc"), + insertRow(4, "ddd"), + insertRow(5, "eee"), + insertRow(6, "fff") ) ); testSqlChangeLog(TABLE_NAME, ImmutableList.of("data"), elementsPerCheckpoint, expectedRecords); } - private Record record(int id, String data) { + private static Record record(int id, String data) { return SimpleDataUtil.createRecord(id, data); } @@ -261,7 +262,7 @@ private Table createTable(String tableName, List key, boolean isPartitio private void testSqlChangeLog(String tableName, List key, List> inputRowsPerCheckpoint, - List> expectedRecordsPerCheckpoint) throws Exception { + List> expectedRecordsPerCheckpoint) throws Exception { String dataId = BoundedTableFactory.registerDataSet(inputRowsPerCheckpoint); sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + " WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId); @@ -273,6 +274,8 @@ private void testSqlChangeLog(String tableName, Table table = createTable(tableName, key, partitioned); sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE); + sql("SELECT * FROM %s", tableName); + table.refresh(); List snapshots = findValidSnapshots(table); int expectedSnapshotNum = expectedRecordsPerCheckpoint.size(); @@ -280,9 +283,15 @@ private void testSqlChangeLog(String tableName, for (int i = 0; i < expectedSnapshotNum; i++) { long snapshotId = snapshots.get(i).snapshotId(); - List expectedRecords = expectedRecordsPerCheckpoint.get(i); + List expectedRowss = expectedRecordsPerCheckpoint.get(i); Assert.assertEquals("Should have the expected records for the checkpoint#" + i, - expectedRowSet(table, expectedRecords), actualRowSet(table, snapshotId)); + expectedRowSet(table, expectedRowss), actualRowSet(table, snapshotId)); + } + + if (expectedSnapshotNum > 0) { + Assert.assertEquals("Should have the expected rows in the final table", + Sets.newHashSet(expectedRecordsPerCheckpoint.get(expectedSnapshotNum - 1)), + Sets.newHashSet(sql("SELECT * FROM %s", tableName))); } } @@ -296,8 +305,12 @@ private List findValidSnapshots(Table table) { return validSnapshots; } - private static StructLikeSet expectedRowSet(Table table, List records) { - return SimpleDataUtil.expectedRowSet(table, records.toArray(new Record[0])); + private static StructLikeSet expectedRowSet(Table table, List rows) { + Record[] records = new Record[rows.size()]; + for (int i = 0; i < records.length; i++) { + records[i] = record((int) rows.get(i).getField(0), (String) rows.get(i).getField(1)); + } + return SimpleDataUtil.expectedRowSet(table, records); } private static StructLikeSet actualRowSet(Table table, long snapshotId) throws IOException { diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 7099c864cb34..bc5d225d5567 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -50,6 +50,7 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.data.RowDataUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; @@ -116,7 +117,11 @@ public static void assertRows(List results, List expected) { Assert.assertEquals(expected, results); } - public static void assertRowData(Types.StructType structType, LogicalType rowType, Record expectedRecord, + public static void assertRowData(Schema schema, StructLike expected, RowData actual) { + assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual); + } + + public static void assertRowData(Types.StructType structType, LogicalType rowType, StructLike expectedRecord, RowData actualRowData) { if (expectedRecord == null && actualRowData == null) { return; @@ -131,10 +136,15 @@ public static void assertRowData(Types.StructType structType, LogicalType rowTyp } for (int i = 0; i < types.size(); i += 1) { - Object expected = expectedRecord.get(i); LogicalType logicalType = ((RowType) rowType).getTypeAt(i); - assertEquals(types.get(i), logicalType, expected, - RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData)); + Object expected = expectedRecord.get(i, Object.class); + // The RowData.createFieldGetter won't return null for the required field. But in the projection case, if we are + // projecting a nested required field from a optional struct, then we should give a null for the projected field + // if the outer struct value is null. So we need to check the nullable for actualRowData here. For more details + // please see issue #2738. + Object actual = actualRowData.isNullAt(i) ? null : + RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); + assertEquals(types.get(i), logicalType, expected, actual); } } @@ -213,8 +223,9 @@ private static void assertEquals(Type type, LogicalType logicalType, Object expe assertMapValues(type.asMapType(), logicalType, (Map) expected, (MapData) actual); break; case STRUCT: - Assertions.assertThat(expected).as("Should expect a Record").isInstanceOf(Record.class); - assertRowData(type.asStructType(), logicalType, (Record) expected, (RowData) actual); + Assertions.assertThat(expected).as("Should expect a StructLike").isInstanceOf(StructLike.class); + Assert.assertTrue("Should expect a Record", expected instanceof StructLike); + assertRowData(type.asStructType(), logicalType, (StructLike) expected, (RowData) actual); break; case UUID: Assertions.assertThat(expected).as("Should expect a UUID").isInstanceOf(UUID.class); diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java new file mode 100644 index 000000000000..e9a21e9d8f4b --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.util.Iterator; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.junit.Assert; +import org.junit.Test; + +public class TestRowDataProjection { + + @Test + public void testFullProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + + generateAndValidate(schema, schema); + } + + @Test + public void testReorderedFullProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + + Schema reordered = new Schema( + Types.NestedField.optional(1, "data", Types.StringType.get()), + Types.NestedField.required(0, "id", Types.LongType.get()) + ); + + generateAndValidate(schema, reordered); + } + + @Test + public void testBasicProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + Schema id = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()) + ); + Schema data = new Schema( + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + generateAndValidate(schema, id); + generateAndValidate(schema, data); + } + + @Test + public void testEmptyProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + generateAndValidate(schema, schema.select()); + } + + @Test + public void testRename() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + + Schema renamed = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "renamed", Types.StringType.get()) + ); + generateAndValidate(schema, renamed); + } + + @Test + public void testNestedProjection() { + Schema schema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(3, "location", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()), + Types.NestedField.required(2, "long", Types.FloatType.get()) + )) + ); + + // Project id only. + Schema idOnly = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()) + ); + generateAndValidate(schema, idOnly); + + // Project lat only. + Schema latOnly = new Schema( + Types.NestedField.optional(3, "location", Types.StructType.of( + Types.NestedField.required(1, "lat", Types.FloatType.get()) + )) + ); + generateAndValidate(schema, latOnly); + + // Project long only. + Schema longOnly = new Schema( + Types.NestedField.optional(3, "location", Types.StructType.of( + Types.NestedField.required(2, "long", Types.FloatType.get()) + )) + ); + generateAndValidate(schema, longOnly); + + // Project location. + Schema locationOnly = schema.select("location"); + generateAndValidate(schema, locationOnly); + } + + private void generateAndValidate(Schema schema, Schema projectSchema) { + int numRecords = 100; + Iterable recordList = RandomGenericData.generate(schema, numRecords, 102L); + Iterable rowDataList = RandomRowData.generate(schema, numRecords, 102L); + + StructProjection structProjection = StructProjection.create(schema, projectSchema); + RowDataProjection rowDataProjection = RowDataProjection.create(schema, projectSchema); + + Iterator recordIter = recordList.iterator(); + Iterator rowDataIter = rowDataList.iterator(); + + for (int i = 0; i < numRecords; i++) { + Assert.assertTrue("Should have more records", recordIter.hasNext()); + Assert.assertTrue("Should have more RowData", rowDataIter.hasNext()); + + StructLike expected = structProjection.wrap(recordIter.next()); + RowData actual = rowDataProjection.project(rowDataIter.next()); + + TestHelpers.assertRowData(projectSchema, expected, actual); + } + + Assert.assertFalse("Shouldn't have more record", recordIter.hasNext()); + Assert.assertFalse("Shouldn't have more RowData", rowDataIter.hasNext()); + } +}