diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae429b..ab7b1174c9f3 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public ParquetValueReader<RowData> struct(
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public ParquetValueReader<RowData> struct(
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
             typesById.put(id, fieldType);
+            if (idToConstant.containsKey(id)) {
+              maxDefinitionLevelsById.put(id, fieldD);
+            }
           }
         }
       }
@@ -110,11 +114,16 @@ public ParquetValueReader<RowData> struct(
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd3846b..73d03710d32c 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Test;
 
 /** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    for (Record record : records) {
+      org.apache.iceberg.TestHelpers.Row partition =
+          org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+      appender.appendToTable(partition, Collections.singletonList(record));
+    }
+
+    TableSchema projectedSchema =
+        TableSchema.builder()
+            .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+            .build();
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .project(projectedSchema)
+                .buildFormat());
+
+    List<Row> expected = Lists.newArrayList();
+    for (Record record : records) {
+      Row nested = Row.of(((Record) record.get(1)).get(1));
+      expected.add(Row.of(nested));
+    }
+
+    TestHelpers.assertRows(result, expected);
+  }
+
   private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
     RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
     return TestHelpers.readRows(inputFormat, rowType);
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4189d0ae429b..ab7b1174c9f3 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -92,6 +92,7 @@ public ParquetValueReader<RowData> struct(
       // match the expected struct's order
       Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
       Map<Integer, Type> typesById = Maps.newHashMap();
+      Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
       List<Type> fields = struct.getFields();
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
@@ -101,6 +102,9 @@ public ParquetValueReader<RowData> struct(
             int id = fieldType.getId().intValue();
             readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
             typesById.put(id, fieldType);
+            if (idToConstant.containsKey(id)) {
+              maxDefinitionLevelsById.put(id, fieldD);
+            }
           }
         }
       }
@@ -110,11 +114,16 @@ public ParquetValueReader<RowData> struct(
       List<ParquetValueReader<?>> reorderedFields =
           Lists.newArrayListWithExpectedSize(expectedFields.size());
       List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
+      // Defaulting to parent max definition level
+      int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
       for (Types.NestedField field : expectedFields) {
         int id = field.fieldId();
         if (idToConstant.containsKey(id)) {
           // containsKey is used because the constant may be null
-          reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
+          int fieldMaxDefinitionLevel =
+              maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
+          reorderedFields.add(
+              ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
           types.add(null);
         } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
           reorderedFields.add(ParquetValueReaders.position());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
index d21b8fd3846b..73d03710d32c 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java
@@ -21,12 +21,15 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,9 +37,11 @@
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Test;
 
 /** Test {@link FlinkInputFormat}. */
@@ -135,6 +140,52 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testReadPartitionColumn() throws Exception {
+    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
+
+    Schema nestedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(3, "innerId", Types.LongType.get()),
+                    Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
+    PartitionSpec spec =
+        PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();
+
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, nestedSchema, spec);
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 0L);
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    for (Record record : records) {
+      org.apache.iceberg.TestHelpers.Row partition =
+          org.apache.iceberg.TestHelpers.Row.of(record.get(1, Record.class).get(1));
+      appender.appendToTable(partition, Collections.singletonList(record));
+    }
+
+    TableSchema projectedSchema =
+        TableSchema.builder()
+            .field("struct", DataTypes.ROW(DataTypes.FIELD("innerName", DataTypes.STRING())))
+            .build();
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .project(projectedSchema)
+                .buildFormat());
+
+    List<Row> expected = Lists.newArrayList();
+    for (Record record : records) {
+      Row nested = Row.of(((Record) record.get(1)).get(1));
+      expected.add(Row.of(nested));
+    }
+
+    TestHelpers.assertRows(result, expected);
+  }
+
   private List<Row> runFormat(FlinkInputFormat inputFormat) throws IOException {
     RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema());
     return TestHelpers.readRows(inputFormat, rowType);
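Editor's note (a sketch, not part of the patch): all four files change for the same scenario, an identity partition on a field nested inside a struct, where the partition value is not stored in the Parquet data file and is instead injected as a constant from partition metadata (idToConstant). The snippet below only restates, standalone, the schema and partition spec that the added testReadPartitionColumn builds; the class name NestedPartitionSketch is hypothetical.

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class NestedPartitionSketch {
      public static void main(String[] args) {
        // Same shape as the new test: "innerName" (field id 4) lives inside an optional struct.
        Schema schema =
            new Schema(
                Types.NestedField.optional(1, "id", Types.LongType.get()),
                Types.NestedField.optional(
                    2,
                    "struct",
                    Types.StructType.of(
                        Types.NestedField.optional(3, "innerId", Types.LongType.get()),
                        Types.NestedField.optional(4, "innerName", Types.StringType.get()))));

        // Identity partition on the nested field: the reader must substitute a constant for
        // field id 4, which is the code path the FlinkParquetReaders change touches.
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("struct.innerName").build();
        System.out.println(spec);
      }
    }

For such constant fields, the patch records the field's max definition level while visiting the Parquet struct (maxDefinitionLevelsById), falls back to the parent's level (defaultMaxDefinitionLevel) when no per-field level was recorded, and passes that level to ParquetValueReaders.constant(...) instead of creating the constant reader without one.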