diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index d7088b4700e5..baccc84b3ee8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -24,16 +24,19 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RawValueData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReader; @@ -483,7 +478,7 @@ protected void addElement(ReusableArrayData reused, E element) { @Override protected ArrayData buildList(ReusableArrayData list) { list.setNumElements(writePos); - return list; + return list.buildGenericArrayData(); } } @@ -541,7 +536,7 @@ protected void addPair(ReusableMapData map, K key, V value) { @Override protected MapData buildMap(ReusableMapData map) { map.setNumElements(writePos); - return map; + return map.buildGenericMapData(); } } @@ -634,6 +629,14 @@ public void setNumElements(int numElements) { values.setNumElements(numElements); } + public GenericMapData buildGenericMapData() { + Map<Object, Object> map = Maps.newHashMapWithExpectedSize(numElements); + for (int i = 0; i < numElements; i++) { + map.put(keys.values[i], values.values[i]); + } + return new GenericMapData(map); + } + @Override public int size() { return numElements; } @@ -675,6 +678,10 @@ public void
setNumElements(int numElements) { this.numElements = numElements; } + public GenericArrayData buildGenericArrayData() { + return new GenericArrayData(Arrays.copyOf(values, numElements)); + } + @Override public int size() { return numElements; diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index eae3233a6546..6104ff9d796a 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -33,6 +33,7 @@ import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; @@ -61,6 +62,37 @@ protected List run( return runFormat(formatBuilder.tableLoader(tableLoader()).buildFormat()); } + @Test + public void testReadMap() throws Exception { + Schema mapSchema = new Schema( + required(0, "id", Types.LongType.get()), + required(1, "data", Types.MapType.ofOptional(2, 3, + Types.StringType.get(), Types.StringType.get()))); + + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, mapSchema); + + List expectedRecords = RandomGenericData.generate(mapSchema, 1, 0L); + + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + + TestHelpers.assertRecords(run(), expectedRecords, mapSchema); + } + + @Test + public void testReadArray() throws Exception { + Schema arraySchema = new Schema( + required(0, "id", Types.LongType.get()), + required(1, "data", Types.ListType.ofRequired(2, Types.StringType.get()))); + + Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, arraySchema); + + List expectedRecords = 
RandomGenericData.generate(arraySchema, 1, 0L); + + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + + TestHelpers.assertRecords(run(), expectedRecords, arraySchema); + } + @Test public void testNestedProjection() throws Exception { Schema schema = new Schema(