diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java index e5b9509d8798a..62ea526c275d8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java @@ -506,18 +506,27 @@ public void write(ArrayData array, int ordinal) { private void doWrite(ArrayData arrayData) { recordConsumer.startGroup(); if (arrayData.size() > 0) { - final String repeatedGroup = "list"; - final String elementField = "element"; + final String repeatedGroup = "array"; recordConsumer.startField(repeatedGroup, 0); - for (int i = 0; i < arrayData.size(); i++) { - recordConsumer.startGroup(); - if (!arrayData.isNullAt(i)) { - // Only creates the element field if the current array element is not null. - recordConsumer.startField(elementField, 0); - elementWriter.write(arrayData, i); - recordConsumer.endField(elementField, 0); + if (elementWriter instanceof RowWriter) { + for (int i = 0; i < arrayData.size(); i++) { + if (!arrayData.isNullAt(i)) { + // Only creates the element field if the current array element is not null. + elementWriter.write(arrayData, i); + } + } + } else { + final String elementField = "element"; + for (int i = 0; i < arrayData.size(); i++) { + recordConsumer.startGroup(); + if (!arrayData.isNullAt(i)) { + // Only creates the element field if the current array element is not null. + recordConsumer.startField(elementField, 0); + elementWriter.write(arrayData, i); + recordConsumer.endField(elementField, 0); + } + recordConsumer.endGroup(); } - recordConsumer.endGroup(); } recordConsumer.endField(repeatedGroup, 0); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java index 8bd1a8488553d..b4b425f383ccc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -622,18 +623,25 @@ private static Type convertToParquetType( } case ARRAY: // group (LIST) { - // repeated group list { + // repeated group array { // element; // } // } ArrayType arrayType = (ArrayType) type; LogicalType elementType = arrayType.getElementType(); + + Types.GroupBuilder arrayGroupBuilder = Types.repeatedGroup(); + if (elementType.getTypeRoot() == LogicalTypeRoot.ROW) { + RowType rowType = (RowType) elementType; + rowType.getFields().forEach(field -> + arrayGroupBuilder.addField(convertToParquetType(field.getName(), field.getType(), repetition))); + } else { + arrayGroupBuilder.addField(convertToParquetType("element", elementType, repetition)); + } + return Types 
.buildGroup(repetition).as(OriginalType.LIST) - .addField( - Types.repeatedGroup() - .addField(convertToParquetType("element", elementType, repetition)) - .named("list")) + .addField(arrayGroupBuilder.named("array")) .named(name); case MAP: // group (MAP) { diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java index 9e07edbd4ca05..864abeed86ab4 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java @@ -50,7 +50,7 @@ void testConvertComplexTypes() { assertThat(messageType.getColumns().size(), is(7)); final String expected = "message converted {\n" + " optional group f_array (LIST) {\n" - + " repeated group list {\n" + + " repeated group array {\n" + " optional binary element (STRING);\n" + " }\n" + " }\n" diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 7cd1f4b3e7b14..52f4fd440699c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -371,9 +371,9 @@ void testAppendWriteReadSkippingClustering() throws Exception { .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.READ_AS_STREAMING, true) - .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true) + .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true) .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true) - .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,1) + .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1) .option(FlinkOptions.CLUSTERING_TASKS, 1) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -383,7 +383,7 @@ void testAppendWriteReadSkippingClustering() throws Exception { String instant = TestUtils.getNthCompleteInstant(new StoragePath(tempFile.toURI()), 2, HoodieTimeline.COMMIT_ACTION); streamTableEnv.getConfig().getConfiguration() - .setBoolean("table.dynamic-table-options.enabled", true); + .setBoolean("table.dynamic-table-options.enabled", true); final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant); List rows = execSelectSql(streamTableEnv, query, 10); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); @@ -398,9 +398,9 @@ void testAppendWriteWithClusteringBatchRead() throws Exception { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.OPERATION, "insert") - .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true) + .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true) .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true) - .option(FlinkOptions.CLUSTERING_DELTA_COMMITS,2) + .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2) .option(FlinkOptions.CLUSTERING_TASKS, 1) .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1) .end(); @@ -409,9 +409,9 @@ void testAppendWriteWithClusteringBatchRead() throws Exception { execInsertSql(streamTableEnv, insertInto); streamTableEnv.getConfig().getConfiguration() - .setBoolean("table.dynamic-table-options.enabled", 
true); + .setBoolean("table.dynamic-table-options.enabled", true); final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", - FlinkOptions.START_COMMIT_EARLIEST); + FlinkOptions.START_COMMIT_EARLIEST); List rows = execSelectSql(streamTableEnv, query, 10); // batch read will not lose data when cleaned clustered files. @@ -450,12 +450,12 @@ void testStreamWriteWithCleaning() { @Test void testBatchWriteWithCleaning() { String hoodieTableDDL = sql("t1") - .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1) - .end(); + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1) + .end(); batchTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 values\n" - + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')"; + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')"; execInsertSql(batchTableEnv, insertInto); execInsertSql(batchTableEnv, insertInto); execInsertSql(batchTableEnv, insertInto); @@ -466,7 +466,7 @@ void testBatchWriteWithCleaning() { HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline(); assertTrue(timeline.filterCompletedInstants() .getInstants().stream().anyMatch(instant -> instant.getAction().equals("clean")), - "some commits should be cleaned"); + "some commits should be cleaned"); } @Test @@ -1621,12 +1621,44 @@ void testParquetComplexNestedRowTypes(String operation) { List result = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); List expected = Arrays.asList( - row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))), - row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))), - row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3")))); + row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))), + row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))), + row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3")))); assertRowsEqualsUnordered(result, expected); } + @ParameterizedTest + @ValueSource(strings = {"insert", "upsert", "bulk_insert"}) + void testParquetArrayMapOfRowTypes(String operation) { + TableEnvironment tableEnv = batchTableEnv; + + String hoodieTableDDL = sql("t1") + .field("f_int int") + .field("f_array array") + .field("f_map map") + .pkField("f_int") + .noPartition() + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.OPERATION, operation) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.ARRAY_MAP_OF_ROW_TYPE_INSERT_T1); + + tableEnv.executeSql("ALTER TABLE t1 MODIFY (\n" + + " f_array array,\n" + + " f_map map\n" + + ");"); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + List expected = Arrays.asList( + row(1, array(row("abc11", 11, null), row("abc12", 12, null), row("abc13", 13, null)), map("abc11", row(11, "def11", null), "abc12", row(12, "def12", null), "abc13", row(13, "def13", null))), + row(2, array(row("abc21", 21, null), row("abc22", 22, null), row("abc23", 23, null)), map("abc21", row(21, "def21", null), "abc22", row(22, "def22", null), 
"abc23", row(23, "def23", null))), + row(3, array(row("abc31", 31, null), row("abc32", 32, null), row("abc33", 33, null)), map("abc31", row(31, "def31", null), "abc32", row(32, "def32", null), "abc33", row(33, "def33", null)))); + assertRowsEqualsUnordered(expected, result); + } + @ParameterizedTest @ValueSource(strings = {"insert", "upsert", "bulk_insert"}) void testParquetNullChildColumnsRowTypes(String operation) { @@ -2026,18 +2058,18 @@ void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType, boolean read void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception { String path = tempFile.getAbsolutePath(); String hoodieTableDDL = sql("t1") - .field("id int") - .field("name varchar(10)") - .field("ts timestamp(6)") - .field("`partition` varchar(10)") - .pkField("id") - .partitionField("partition") - .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.QUERY_TYPE, queryType) - .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) - .option(FlinkOptions.PATH, path) - .end(); + .field("id int") + .field("name varchar(10)") + .field("ts timestamp(6)") + .field("`partition` varchar(10)") + .pkField("id") + .partitionField("partition") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.QUERY_TYPE, queryType) + .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) + .option(FlinkOptions.PATH, path) + .end(); batchTableEnv.executeSql(hoodieTableDDL); final String[] insertInto = new String[] { @@ -2076,7 +2108,7 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse for (int i = 0; i < numInsertBatches; i++) { execInsertSql(batchTableEnv, insertInto[i]); String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ) - ? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path); + ? 
TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path); expected1.append(template1[i]); expected2.append(String.format(template2[i], commitTime)); expected3.append(String.format(template3[i], commitTime)); @@ -2087,18 +2119,18 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse String readHoodieTableDDL; batchTableEnv.executeSql("drop table t1"); readHoodieTableDDL = sql("t1") - .field("id int") - .field("name varchar(10)") - .field("ts timestamp(6)") - .field("`partition` varchar(10)") - .pkField("id") - .partitionField("partition") - .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.QUERY_TYPE, queryType) - .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) - .option(FlinkOptions.PATH, path) - .end(); + .field("id int") + .field("name varchar(10)") + .field("ts timestamp(6)") + .field("`partition` varchar(10)") + .pkField("id") + .partitionField("partition") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.QUERY_TYPE, queryType) + .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) + .option(FlinkOptions.PATH, path) + .end(); batchTableEnv.executeSql(readHoodieTableDDL); List result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); @@ -2106,21 +2138,21 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse batchTableEnv.executeSql("drop table t1"); readHoodieTableDDL = sql("t1") - .field("_hoodie_commit_time string") - .field("_hoodie_record_key string") - .field("_hoodie_partition_path string") - .field("id int") - .field("name varchar(10)") - .field("ts timestamp(6)") - .field("`partition` varchar(10)") - .pkField("id") - .partitionField("partition") - .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.QUERY_TYPE, queryType) - .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) - .option(FlinkOptions.PATH, path) - .end(); + .field("_hoodie_commit_time string") + .field("_hoodie_record_key string") + .field("_hoodie_partition_path string") + .field("id int") + .field("name varchar(10)") + .field("ts timestamp(6)") + .field("`partition` varchar(10)") + .pkField("id") + .partitionField("partition") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.QUERY_TYPE, queryType) + .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) + .option(FlinkOptions.PATH, path) + .end(); batchTableEnv.executeSql(readHoodieTableDDL); result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); @@ -2128,21 +2160,21 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse batchTableEnv.executeSql("drop table t1"); readHoodieTableDDL = sql("t1") - .field("id int") - .field("_hoodie_commit_time string") - .field("name varchar(10)") - .field("_hoodie_record_key string") - .field("ts timestamp(6)") - .field("_hoodie_partition_path string") - .field("`partition` varchar(10)") - .pkField("id") - .partitionField("partition") - .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.QUERY_TYPE, queryType) - .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) - .option(FlinkOptions.PATH, path) - .end(); + .field("id int") + 
.field("_hoodie_commit_time string") + .field("name varchar(10)") + .field("_hoodie_record_key string") + .field("ts timestamp(6)") + .field("_hoodie_partition_path string") + .field("`partition` varchar(10)") + .pkField("id") + .partitionField("partition") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.QUERY_TYPE, queryType) + .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits) + .option(FlinkOptions.PATH, path) + .end(); batchTableEnv.executeSql(readHoodieTableDDL); result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); @@ -2300,11 +2332,11 @@ private static Stream tableTypeAndBooleanTrueFalseParams() { */ private static Stream tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() { return Arrays.stream(new Object[][] { - {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1}, - {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1}, - {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1}, - {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3}, - {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2} + {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1}, + {HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1}, + {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1}, + {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3}, + {HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2} }).map(Arguments::of); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java index 46f51df741f12..ddb29cdbea728 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java @@ -180,6 +180,8 @@ private void writeTableWithSchema1(TableOptions tableOptions) throws ExecutionEx + " f_struct row," + " f_map map," + " f_array array," + + " f_row_map map>," + + " f_row_array array>," + " `partition` string" + ") partitioned by (`partition`) with (" + tableOptions + ")" ); @@ -195,18 +197,29 @@ private void writeTableWithSchema1(TableOptions tableOptions) throws ExecutionEx + " cast(f_struct as row)," + " cast(f_map as map)," + " cast(f_array as array)," + + " cast(f_row_map as map>)," + + " cast(f_row_array as array>)," + " cast(`partition` as string) " + "from (values " - + " ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as row), map['Indica', 1212], array[12], 'par0')," - + " ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 1), cast(map['Danny', 2323] as map), array[23, 23], 'par1')," - + " ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', '', 2), cast(map['Stephen', 3333] as map), array[33], 'par1')," - + " ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', '', 3), cast(map['Julian', 5353] as map), array[53, 53], 'par2')," - + " ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', '', 4), cast(map['Fabian', 3131] as map), array[31], 'par2')," - + " ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', '', 5), cast(map['Sophia', 1818] as map), array[18, 18], 'par3')," - + " ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 
's6', '', 6), cast(map['Emma', 2020] as map), array[20], 'par3')," - + " ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 7), cast(map['Bob', 4444] as map), array[44, 44], 'par4')," - + " ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 8), cast(map['Han', 5656] as map), array[56, 56, 56], 'par4')" - + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, `partition`)" + + " ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as row), map['Indica', 1212], array[12], " + + " cast(null as map>), array[row(0, 's0', '', 0)], 'par0')," + + " ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 1), cast(map['Danny', 2323] as map), array[23, 23], " + + " cast(map['Danny', row(1, 's1', '', 1)] as map>), array[row(1, 's1', '', 1)], 'par1')," + + " ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', '', 2), cast(map['Stephen', 3333] as map), array[33], " + + " cast(map['Stephen', row(2, 's2', '', 2)] as map>), array[row(2, 's2', '', 2)], 'par1')," + + " ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', '', 3), cast(map['Julian', 5353] as map), array[53, 53], " + + " cast(map['Julian', row(3, 's3', '', 3)] as map>), array[row(3, 's3', '', 3)], 'par2')," + + " ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', '', 4), cast(map['Fabian', 3131] as map), array[31], " + + " cast(map['Fabian', row(4, 's4', '', 4)] as map>), array[row(4, 's4', '', 4)], 'par2')," + + " ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', '', 5), cast(map['Sophia', 1818] as map), array[18, 18], " + + " cast(map['Sophia', row(5, 's5', '', 5)] as map>), array[row(5, 's5', '', 5)], 'par3')," + + " ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 6), cast(map['Emma', 2020] as map), array[20], " + + " cast(map['Emma', row(6, 's6', '', 6)] as map>), array[row(6, 's6', '', 6)], 'par3')," + + " ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 7), cast(map['Bob', 4444] as map), array[44, 44], " + + " cast(map['Bob', row(7, 's7', '', 7)] as map>), array[row(7, 's7', '', 7)], 'par4')," + + " ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 8), cast(map['Han', 5656] as map), array[56, 56, 56], " + + " cast(map['Han', row(8, 's8', '', 8)] as map>), array[row(8, 's8', '', 8)], 'par4')" + + ") as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, f_row_map, f_row_array, `partition`)" ).await(); } @@ -232,19 +245,25 @@ private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactB writeClient.updateColumnType("age", Types.StringType.get()); writeClient.addColumn("last_name", stringType, "empty allowed", "salary", BEFORE); writeClient.reOrderColPosition("age", "first_name", BEFORE); - // add a field in the middle of the `f_struct` column + // add a field in the middle of the `f_struct` and `f_row_map` columns writeClient.addColumn("f_struct.f2", intType, "add field in middle of struct", "f_struct.f0", AFTER); - // add a field at the end of `f_struct` column + writeClient.addColumn("f_row_map.value.f2", intType, "add field in middle of struct", "f_row_map.value.f0", AFTER); + // add a field at the end of `f_struct` and `f_row_map` column writeClient.addColumn("f_struct.f3", stringType); + writeClient.addColumn("f_row_map.value.f3", stringType); // delete and add a field with the same name // reads should not return previously inserted datum of dropped field of the same name writeClient.deleteColumns("f_struct.drop_add"); writeClient.addColumn("f_struct.drop_add", 
doubleType); + writeClient.deleteColumns("f_row_map.value.drop_add"); + writeClient.addColumn("f_row_map.value.drop_add", doubleType); // perform comprehensive evolution on complex types (struct, array, map) by promoting its primitive types writeClient.updateColumnType("f_struct.change_type", Types.LongType.get()); writeClient.renameColumn("f_struct.change_type", "renamed_change_type"); + writeClient.updateColumnType("f_row_map.value.change_type", Types.LongType.get()); + writeClient.renameColumn("f_row_map.value.change_type", "renamed_change_type"); writeClient.updateColumnType("f_array.element", Types.DoubleType.get()); writeClient.updateColumnType("f_map.value", Types.DoubleType.get()); @@ -256,6 +275,8 @@ private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactB // perform comprehensive evolution on a struct column by reordering field positions writeClient.updateColumnType("f_struct.f0", Types.DecimalType.get(20, 0)); writeClient.reOrderColPosition("f_struct.f0", "f_struct.drop_add", AFTER); + writeClient.updateColumnType("f_row_map.value.f0", Types.DecimalType.get(20, 0)); + writeClient.reOrderColPosition("f_row_map.value.f0", "f_row_map.value.drop_add", AFTER); } } @@ -278,6 +299,8 @@ private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionEx + " f_struct row," + " f_map map," + " f_array array," + + " f_row_map map>," + + " f_row_array array>," + " new_row_col row," + " new_array_col array," + " new_map_col map," @@ -296,18 +319,26 @@ private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionEx + " cast(f_struct as row)," + " cast(f_map as map)," + " cast(f_array as array)," + + " cast(f_row_map as map>)," + + " cast(f_row_array as array>)," + " cast(new_row_col as row)," + " cast(new_array_col as array)," + " cast(new_map_col as map)," + " cast(`partition` as string) " + "from (values " + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 's1', 11, 't1', 'drop_add1', 1), cast(map['Danny', 2323.23] as map), array[23, 23, 23], " + + " cast(map['Danny', row(1, 's1', 11, 't1', 'drop_add1', 1)] as map>), " + + " array[row(1, 's1', '', 1)], " + " row(1, '1'), array['1'], Map['k1','v1'], 'par1')," + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', row(9, 's9', 99, 't9', 'drop_add9', 9), cast(map['Alice', 9999.99] as map), array[9999, 9999], " + + " cast(map['Alice', row(9, 's9', 99, 't9', 'drop_add9', 9)] as map>), " + + " array[row(9, 's9', '', 9)], " + " row(9, '9'), array['9'], Map['k9','v9'], 'par1')," + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', row(3, 's3', 33, 't3', 'drop_add3', 3), cast(map['Julian', 5353.53] as map), array[53], " + + " cast(map['Julian', row(3, 's3', 33, 't3', 'drop_add3', 3)] as map>), " + + " array[row(3, 's3', '', 3)], " + " row(3, '3'), array['3'], Map['k3','v3'], 'par2')" - + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)" + + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, f_map, f_array, f_row_map, f_row_array, new_row_col, new_array_col, new_map_col, `partition`)" ).await(); } @@ -345,6 +376,8 @@ private void checkAnswerEvolved(String... expectedResult) throws Exception { + " f_struct, " + " f_map, " + " f_array, " + + " f_row_map, " + + " f_row_array, " + " new_row_col, " + " new_array_col, " + " new_map_col " @@ -376,6 +409,8 @@ private void checkAnswerWithMeta(TableOptions tableOptions, String... 
expectedRe + " f_struct row," + " f_map map," + " f_array array," + + " f_row_map map>," + + " f_row_array array>," + " new_row_col row," + " new_array_col array," + " new_map_col map," @@ -392,6 +427,8 @@ private void checkAnswerWithMeta(TableOptions tableOptions, String... expectedRe + " f_struct, " + " f_map, " + " f_array, " + + " f_row_map, " + + " f_row_array, " + " new_row_col, " + " new_array_col, " + " new_map_col " @@ -427,6 +464,16 @@ private void checkAnswer(String query, String... expectedResult) { executor.shutdownNow(); } + for (String expectedItem : expected) { + if (!actual.contains(expectedItem)) { + System.out.println("Not in actual: " + expectedItem); + } + } + for (String actualItem : actual) { + if (!expected.contains(actualItem)) { + System.out.println("Not in expected: " + actualItem); + } + } assertEquals(expected, actual); } @@ -472,30 +519,31 @@ private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] row } } + //TODO: null arrays have a single null row; array with null vs array with row will all null values private static final ExpectedResult EXPECTED_MERGED_RESULT = new ExpectedResult( new String[] { - "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", - "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", - "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]", - "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", - "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]", - "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", - "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]", - "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]", - "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", - "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]", + "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]", + "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]", + "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]", + "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]", + "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]", + "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]", + "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]", + "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]", + "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 
56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]", + "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]", }, new String[] { - "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", - "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", - "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]", - "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", - "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]", - "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", - "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]", - "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]", - "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", - "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]", + "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]", + "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]", + "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]", + "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]", + "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]", + "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]", + "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]", + "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]", + "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]", + "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]", }, new String[] { "+I[1]", @@ -522,32 +570,32 @@ private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] row private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new ExpectedResult( new String[] { - "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", - "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], null, null, null]", - "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]", - 
"+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], null, null, null]", - "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]", - "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", - "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]", - "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]", - "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", - "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]", - "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", - "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", + "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]", + "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, s1, , 1]], null, null, null]", + "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]", + "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, s3, , 3]], null, null, null]", + "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]", + "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]", + "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]", + "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]", + "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]", + "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]", + "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]", + "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]", }, new String[] { - "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", - "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], null, null, null]", - "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], null, null, null]", - "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], null, null, null]", - "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], null, null, null]", - "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", - 
"+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], null, null, null]", - "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], null, null, null]", - "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", - "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]", - "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", - "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", + "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, [+I[0, s0, , 0]], null, null, null]", + "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, [23.0, 23.0], {Danny=+I[null, s1, 1, null, null, 1]}, [+I[1, s1, , 1]], null, null, null]", + "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], {Stephen=3333.0}, [33.0], {Stephen=+I[null, s2, 2, null, null, 2]}, [+I[2, s2, , 2]], null, null, null]", + "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], {Julian=5353.0}, [53.0, 53.0], {Julian=+I[null, s3, 3, null, null, 3]}, [+I[3, s3, , 3]], null, null, null]", + "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], {Fabian=3131.0}, [31.0], {Fabian=+I[null, s4, 4, null, null, 4]}, [+I[4, s4, , 4]], null, null, null]", + "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], {Sophia=1818.0}, [18.0, 18.0], {Sophia=+I[null, s5, 5, null, null, 5]}, [+I[5, s5, , 5]], null, null, null]", + "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, [20.0], {Emma=+I[null, s6, 6, null, null, 6]}, [+I[6, s6, , 6]], null, null, null]", + "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, [44.0, 44.0], {Bob=+I[null, s7, 7, null, null, 7]}, [+I[7, s7, , 7]], null, null, null]", + "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, [56.0, 56.0, 56.0], {Han=+I[null, s8, 8, null, null, 8]}, [+I[8, s8, , 8]], null, null, null]", + "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], {Alice=9999.99}, [9999.0, 9999.0], {Alice=+I[9, s9, 99, t9, drop_add9, 9]}, [+I[9, s9, , 9]], +I[9, 9], [9], {k9=v9}]", + "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], {Danny=2323.23}, [23.0, 23.0, 23.0], {Danny=+I[1, s1, 11, t1, drop_add1, 1]}, [+I[1, s1, , 1]], +I[1, 1], [1], {k1=v1}]", + "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], {Julian=5353.53}, [53.0], {Julian=+I[3, s3, 33, t3, drop_add3, 3]}, [+I[3, s3, , 3]], +I[3, 3], [3], {k3=v3}]", }, new String[] { "+I[1]", diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 71295d93b1099..92d44a8e7ef10 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -97,6 +97,16 @@ private TestConfigurations() { DataTypes.FIELD("change_type", DataTypes.INT()))), DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())), DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.INT())), + DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f1", 
DataTypes.STRING()), + DataTypes.FIELD("drop_add", DataTypes.STRING()), + DataTypes.FIELD("change_type", DataTypes.INT())))), + DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("drop_add", DataTypes.STRING()), + DataTypes.FIELD("change_type", DataTypes.INT())))), DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) .notNull(); @@ -118,6 +128,18 @@ private TestConfigurations() { DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0)))), DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE())), DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.DOUBLE())), + DataTypes.FIELD("f_row_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.ROW( + DataTypes.FIELD("f2", DataTypes.INT()), // new field added in the middle of struct + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("renamed_change_type", DataTypes.BIGINT()), + DataTypes.FIELD("f3", DataTypes.STRING()), + DataTypes.FIELD("drop_add", DataTypes.STRING()), + DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0))))), + DataTypes.FIELD("f_row_array", DataTypes.ARRAY(DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("drop_add", DataTypes.STRING()), + DataTypes.FIELD("change_type", DataTypes.INT())))), DataTypes.FIELD("new_row_col", DataTypes.ROW( DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.STRING()))), diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java index 70455d9446617..683051a492632 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -62,6 +62,11 @@ private TestSQL() { + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n" + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))"; + public static final String ARRAY_MAP_OF_ROW_TYPE_INSERT_T1 = "insert into t1 values\n" + + "(1, array[row('abc11', 11), row('abc12', 12), row('abc13', 13)], map['abc11', row(11, 'def11'), 'abc12', row(12, 'def12'), 'abc13', row(13, 'def13')]),\n" + + "(2, array[row('abc21', 21), row('abc22', 22), row('abc23', 23)], map['abc21', row(21, 'def21'), 'abc22', row(22, 'def22'), 'abc23', row(23, 'def23')]),\n" + + "(3, array[row('abc31', 31), row('abc32', 32), row('abc33', 33)], map['abc31', row(31, 'def31'), 'abc32', row(32, 'def32'), 'abc33', row(33, 'def33')])"; + public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert into t1 values\n" + "(1, row(cast(null as int), 'abc1')),\n" + "(2, row(2, cast(null as varchar))),\n" diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index 8bbbb1288e53a..4532fd0672ecc 100644 --- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -19,11 +19,13 @@ package org.apache.hudi.table.format.cow; import 
org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector; import org.apache.hudi.table.format.cow.vector.HeapArrayVector; import org.apache.hudi.table.format.cow.vector.HeapDecimalVector; import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector; import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector; import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader; +import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader; import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader; import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader; import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader; @@ -62,6 +64,8 @@ import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -121,7 +125,7 @@ public static ParquetColumnarRowSplitReader genPartColumnarRowReader( UnboundRecordFilter recordFilter) throws IOException { ValidationUtils.checkState(Arrays.stream(selectedFields).noneMatch(x -> x == -1), - "One or more specified columns does not exist in the hudi table."); + "One or more specified columns does not exist in the hudi table."); List selNonPartNames = Arrays.stream(selectedFields) .mapToObj(i -> fullFieldNames[i]) @@ -282,12 +286,23 @@ private static ColumnVector createVectorFromConstant( } return tv; case ARRAY: - HeapArrayVector arrayVector = new HeapArrayVector(batchSize); - if (value == null) { - arrayVector.fillWithNulls(); - return arrayVector; + ArrayType arrayType = (ArrayType) type; + if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) { + HeapArrayGroupColumnVector arrayGroup = new HeapArrayGroupColumnVector(batchSize); + if (value == null) { + arrayGroup.fillWithNulls(); + return arrayGroup; + } else { + throw new UnsupportedOperationException("Unsupported create array with default value."); + } } else { - throw new UnsupportedOperationException("Unsupported create array with default value."); + HeapArrayVector arrayVector = new HeapArrayVector(batchSize); + if (value == null) { + arrayVector.fillWithNulls(); + return arrayVector; + } else { + throw new UnsupportedOperationException("Unsupported create array with default value."); + } } case MAP: HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null); @@ -394,12 +409,23 @@ private static ColumnReader createColumnReader( throw new AssertionError(); } case ARRAY: - return new ArrayColumnReader( - descriptor, - pageReader, - utcTimestamp, - descriptor.getPrimitiveType(), - fieldType); + ArrayType arrayType = (ArrayType) fieldType; + if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) { + return new ArrayGroupReader(createColumnReader( + utcTimestamp, + arrayType.getElementType(), + physicalType.asGroupType().getType(0), + descriptors, + pages, + depth + 1)); + } else { + return new ArrayColumnReader( + descriptor, + pageReader, + utcTimestamp, + descriptor.getPrimitiveType(), + fieldType); + } case MAP: MapType mapType = (MapType) fieldType; ArrayColumnReader keyReader = @@ -409,14 +435,24 @@ private 
static ColumnReader createColumnReader( utcTimestamp, descriptor.getPrimitiveType(), new ArrayType(mapType.getKeyType())); - ArrayColumnReader valueReader = - new ArrayColumnReader( - descriptors.get(1), - pages.getPageReader(descriptors.get(1)), - utcTimestamp, - descriptors.get(1).getPrimitiveType(), - new ArrayType(mapType.getValueType())); - return new MapColumnReader(keyReader, valueReader, fieldType); + ColumnReader valueReader; + if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) { + valueReader = new ArrayGroupReader(createColumnReader( + utcTimestamp, + mapType.getValueType(), + physicalType.asGroupType().getType(0).asGroupType().getType(1), // Get the value physical type + descriptors.subList(1, descriptors.size()), // remove the key descriptor + pages, + depth + 2)); // increase the depth by 2, because there's a key_value entry in the path + } else { + valueReader = new ArrayColumnReader( + descriptors.get(1), + pages.getPageReader(descriptors.get(1)), + utcTimestamp, + descriptors.get(1).getPrimitiveType(), + new ArrayType(mapType.getValueType())); + } + return new MapColumnReader(keyReader, valueReader); case ROW: RowType rowType = (RowType) fieldType; GroupType groupType = physicalType.asGroupType(); @@ -427,14 +463,32 @@ private static ColumnReader createColumnReader( if (fieldIndex < 0) { fieldReaders.add(new EmptyColumnReader()); } else { - fieldReaders.add( - createColumnReader( - utcTimestamp, - rowType.getTypeAt(i), - groupType.getType(fieldIndex), - descriptors, - pages, - depth + 1)); + // Check for nested row in array with atomic field type. + + // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields. + // In Parquet, an array of rows is stored as separate arrays for each field. + + // Limitations: It won't work for multiple nested arrays and maps. + // The main problem is that the Flink classes and interface don't follow that pattern. 
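+            // Illustrative example (not part of the original comment): a Flink column declared as
+            //   f_row_array array<row<f0 int, f1 string>>
+            // is laid out by ParquetSchemaConverter/ParquetRowDataWriter roughly as
+            //   optional group f_row_array (LIST) {
+            //     repeated group array {
+            //       optional int32 f0;
+            //       optional binary f1 (STRING);
+            //     }
+            //   }
+            // so each leaf column (array.f0, array.f1) has maxRepetitionLevel > 0 and comes back from
+            // the column readers as one array of values per row. Wrapping the field type in ArrayType
+            // below lets the existing array readers materialize those per-field arrays, which
+            // ColumnarGroupArrayData / ColumnarGroupRowData later stitch back into row elements.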
+ if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) { + fieldReaders.add( + createColumnReader( + utcTimestamp, + new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), + groupType.getType(fieldIndex), + descriptors, + pages, + depth + 1)); + } else { + fieldReaders.add( + createColumnReader( + utcTimestamp, + rowType.getTypeAt(i), + groupType.getType(fieldIndex), + descriptors, + pages, + depth + 1)); + } } } return new RowColumnReader(fieldReaders); @@ -501,43 +555,65 @@ private static WritableColumnVector createWritableColumnVector( case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS, - getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType)); + getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType)); return new HeapTimestampVector(batchSize); case DECIMAL: checkArgument( (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || typeName == PrimitiveType.PrimitiveTypeName.BINARY) && primitiveType.getOriginalType() == OriginalType.DECIMAL, - getPrimitiveTypeCheckFailureMessage(typeName, fieldType)); + getPrimitiveTypeCheckFailureMessage(typeName, fieldType)); return new HeapDecimalVector(batchSize); case ARRAY: ArrayType arrayType = (ArrayType) fieldType; - return new HeapArrayVector( - batchSize, - createWritableColumnVector( - batchSize, - arrayType.getElementType(), - physicalType, - descriptors, - depth)); + if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) { + return new HeapArrayGroupColumnVector( + batchSize, + createWritableColumnVector( + batchSize, + arrayType.getElementType(), + physicalType.asGroupType().getType(0), + descriptors, + depth + 1)); + } else { + return new HeapArrayVector( + batchSize, + createWritableColumnVector( + batchSize, + arrayType.getElementType(), + physicalType, + descriptors, + depth)); + } case MAP: MapType mapType = (MapType) fieldType; GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType(); // the map column has three level paths. - return new HeapMapColumnVector( + WritableColumnVector keyColumnVector = createWritableColumnVector( batchSize, - createWritableColumnVector( - batchSize, - mapType.getKeyType(), - repeatedType.getType(0), - descriptors, - depth + 2), - createWritableColumnVector( - batchSize, - mapType.getValueType(), - repeatedType.getType(1), - descriptors, - depth + 2)); + new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()), + repeatedType.getType(0), + descriptors, + depth + 2); + WritableColumnVector valueColumnVector; + if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) { + valueColumnVector = new HeapArrayGroupColumnVector( + batchSize, + createWritableColumnVector( + batchSize, + mapType.getValueType(), + repeatedType.getType(1).asGroupType(), + descriptors, + depth + 2)); + } else { + valueColumnVector = createWritableColumnVector( + batchSize, + new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()), + repeatedType.getType(1), + descriptors, + depth + 2); + } + return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector); case ROW: RowType rowType = (RowType) fieldType; GroupType groupType = physicalType.asGroupType(); @@ -546,15 +622,44 @@ private static WritableColumnVector createWritableColumnVector( // schema evolution: read the file with a new extended field name. 
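+          // Illustrative resulting vector layout for an array-of-rows column such as
+          // f_row_array array<row<f0 int, f1 string>>:
+          //   HeapArrayGroupColumnVector
+          //     -> HeapRowColumnVector
+          //          vectors[0]: HeapArrayVector holding the f0 values of one array per row
+          //          vectors[1]: HeapArrayVector holding the f1 values of one array per row
+          // ColumnarGroupArrayData.getRow(i, numFields) then exposes the i-th element as a
+          // ColumnarGroupRowData that reads its field values out of these per-field arrays.
+          // Fields missing from the file (schema evolution) are backed by a null-filled vector
+          // created through createVectorFromConstant.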
int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType); if (fieldIndex < 0) { - columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize); + // Check for nested row in array with atomic field type. + + // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields. + // In Parquet, an array of rows is stored as separate arrays for each field. + + // Limitations: It won't work for multiple nested arrays and maps. + // The main problem is that the Flink classes and interface don't follow that pattern. + if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) { + columnVectors[i] = (WritableColumnVector) createVectorFromConstant( + new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize); + } else { + columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize); + } } else { - columnVectors[i] = - createWritableColumnVector( - batchSize, - rowType.getTypeAt(i), - groupType.getType(fieldIndex), - descriptors, - depth + 1); + // Check for nested row in array with atomic field type. + + // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields. + // In Parquet, an array of rows is stored as separate arrays for each field. + + // Limitations: It won't work for multiple nested arrays and maps. + // The main problem is that the Flink classes and interface don't follow that pattern. + if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) { + columnVectors[i] = + createWritableColumnVector( + batchSize, + new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), + groupType.getType(fieldIndex), + descriptors, + depth + 1); + } else { + columnVectors[i] = + createWritableColumnVector( + batchSize, + rowType.getTypeAt(i), + groupType.getType(fieldIndex), + descriptors, + depth + 1); + } } } return new HeapRowColumnVector(batchSize, columnVectors); @@ -575,8 +680,9 @@ private static int getFieldIndexInPhysicalType(String fieldName, GroupType group /** * Construct the error message when primitive type mismatches. + * * @param primitiveType Primitive type - * @param fieldType Logical field type + * @param fieldType Logical field type * @return The error message */ private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) { @@ -585,8 +691,9 @@ private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.Primitiv /** * Construct the error message when original type mismatches. 
+ * * @param originalType Original type - * @param fieldType Logical field type + * @param fieldType Logical field type * @return The error message */ private static String getOriginalTypeCheckFailureMessage(OriginalType originalType, LogicalType fieldType) { diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java new file mode 100644 index 0000000000000..4c9275f3b0932 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; + +public class ColumnarGroupArrayData implements ArrayData { + + WritableColumnVector vector; + int rowId; + + public ColumnarGroupArrayData(WritableColumnVector vector, int rowId) { + this.vector = vector; + this.rowId = rowId; + } + + @Override + public int size() { + if (vector == null) { + return 0; + } + + if (vector instanceof HeapRowColumnVector) { + // assume all fields have the same size + if (((HeapRowColumnVector) vector).vectors == null || ((HeapRowColumnVector) vector).vectors.length == 0) { + return 0; + } + return ((HeapArrayVector) ((HeapRowColumnVector) vector).vectors[0]).getArray(rowId).size(); + } + throw new UnsupportedOperationException(vector.getClass().getName() + " is not supported. Supported vector types: HeapRowColumnVector"); + } + + @Override + public boolean isNullAt(int index) { + if (vector == null) { + return true; + } + + if (vector instanceof HeapRowColumnVector) { + return ((HeapRowColumnVector) vector).vectors == null; + } + + throw new UnsupportedOperationException(vector.getClass().getName() + " is not supported. 
Supported vector types: HeapRowColumnVector"); + } + + @Override + public boolean getBoolean(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public byte getByte(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public short getShort(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public int getInt(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public long getLong(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public float getFloat(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public double getDouble(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public StringData getString(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public DecimalData getDecimal(int index, int precision, int scale) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public TimestampData getTimestamp(int index, int precision) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public RawValueData getRawValue(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public byte[] getBinary(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public ArrayData getArray(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public MapData getMap(int index) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public RowData getRow(int index, int numFields) { + return new ColumnarGroupRowData((HeapRowColumnVector) vector, rowId, index); + } + + @Override + public boolean[] toBooleanArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public byte[] toByteArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public short[] toShortArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public int[] toIntArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public long[] toLongArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public float[] toFloatArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public double[] toDoubleArray() { + throw new UnsupportedOperationException("Not support the operation!"); + } + +} diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java new file mode 100644 index 0000000000000..69cb6feca13e4 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; + +public class ColumnarGroupMapData implements MapData { + + WritableColumnVector keyVector; + WritableColumnVector valueVector; + int rowId; + + public ColumnarGroupMapData(WritableColumnVector keyVector, WritableColumnVector valueVector, int rowId) { + this.keyVector = keyVector; + this.valueVector = valueVector; + this.rowId = rowId; + } + + @Override + public int size() { + if (keyVector == null) { + return 0; + } + + if (keyVector instanceof HeapArrayVector) { + return ((HeapArrayVector) keyVector).getArray(rowId).size(); + } + throw new UnsupportedOperationException(keyVector.getClass().getName() + " is not supported. Supported vector types: HeapArrayVector"); + } + + @Override + public ArrayData keyArray() { + return ((HeapArrayVector) keyVector).getArray(rowId); + } + + @Override + public ArrayData valueArray() { + if (valueVector instanceof HeapArrayVector) { + return ((HeapArrayVector) valueVector).getArray(rowId); + } else if (valueVector instanceof HeapArrayGroupColumnVector) { + return ((HeapArrayGroupColumnVector) valueVector).getArray(rowId); + } + throw new UnsupportedOperationException(valueVector.getClass().getName() + " is not supported. Supported vector types: HeapArrayVector, HeapArrayGroupColumnVector"); + } +} diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java new file mode 100644 index 0000000000000..439c1880823f1 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.format.cow.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +public class ColumnarGroupRowData implements RowData { + + HeapRowColumnVector vector; + int rowId; + int index; + + public ColumnarGroupRowData(HeapRowColumnVector vector, int rowId, int index) { + this.vector = vector; + this.rowId = rowId; + this.index = index; + } + + @Override + public int getArity() { + return vector.vectors.length; + } + + @Override + public RowKind getRowKind() { + return RowKind.INSERT; + } + + @Override + public void setRowKind(RowKind rowKind) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean isNullAt(int pos) { + return + vector.vectors[pos].isNullAt(rowId) + || ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).isNullAt(index); + } + + @Override + public boolean getBoolean(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getBoolean(index); + } + + @Override + public byte getByte(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getByte(index); + } + + @Override + public short getShort(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getShort(index); + } + + @Override + public int getInt(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getInt(index); + } + + @Override + public long getLong(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getLong(index); + } + + @Override + public float getFloat(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getFloat(index); + } + + @Override + public double getDouble(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getDouble(index); + } + + @Override + public StringData getString(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getString(index); + } + + @Override + public DecimalData getDecimal(int pos, int i1, int i2) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getDecimal(index, i1, i2); + } + + @Override + public TimestampData getTimestamp(int pos, int i1) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getTimestamp(index, i1); + } + + @Override + public RawValueData getRawValue(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getRawValue(index); + } + + @Override + public byte[] getBinary(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getBinary(index); + } + + @Override + public ArrayData getArray(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getArray(index); + } + + @Override + public MapData getMap(int pos) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getMap(index); + } + + @Override + public RowData getRow(int pos, int numFields) { + return ((HeapArrayVector) (vector.vectors[pos])).getArray(rowId).getRow(index, numFields); + } +} diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java new file mode 100644 index 0000000000000..3d7d8b1f0de0f --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow.vector; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.columnar.vector.ArrayColumnVector; +import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector; +import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; + +/** + * This class represents a nullable heap array column vector whose elements are group types (e.g. rows), backed by a single child vector. + */ +public class HeapArrayGroupColumnVector extends AbstractHeapVector + implements WritableColumnVector, ArrayColumnVector { + + public WritableColumnVector vector; + + public HeapArrayGroupColumnVector(int len) { + super(len); + } + + public HeapArrayGroupColumnVector(int len, WritableColumnVector vector) { + super(len); + this.vector = vector; + } + + @Override + public ArrayData getArray(int rowId) { + return new ColumnarGroupArrayData(vector, rowId); + } + + @Override + public void reset() { + super.reset(); + vector.reset(); + } +} diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java index a379737169502..95d8fd720d300 --- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java @@ -19,8 +19,6 @@ package org.apache.hudi.table.format.cow.vector; import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.columnar.ColumnarMapData; -import org.apache.flink.table.data.columnar.vector.ColumnVector; import org.apache.flink.table.data.columnar.vector.MapColumnVector; import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector; import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; @@ -31,49 +29,25 @@ public class HeapMapColumnVector extends AbstractHeapVector implements WritableColumnVector, MapColumnVector { - private long[] offsets; - private long[] lengths; - private int size; - private ColumnVector keys; - private ColumnVector values; + private WritableColumnVector keys; + private WritableColumnVector values; - public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) { + public 
HeapMapColumnVector(int len, WritableColumnVector keys, WritableColumnVector values) { super(len); - size = 0; - offsets = new long[len]; - lengths = new long[len]; this.keys = keys; this.values = values; } - public void setOffsets(long[] offsets) { - this.offsets = offsets; + public WritableColumnVector getKeys() { + return keys; } - public void setLengths(long[] lengths) { - this.lengths = lengths; - } - - public void setKeys(ColumnVector keys) { - this.keys = keys; - } - - public void setValues(ColumnVector values) { - this.values = values; - } - - public int getSize() { - return size; - } - - public void setSize(int size) { - this.size = size; + public WritableColumnVector getValues() { + return values; } @Override - public MapData getMap(int i) { - long offset = offsets[i]; - long length = lengths[i]; - return new ColumnarMapData(keys, values, (int) offset, (int) length); + public MapData getMap(int rowId) { + return new ColumnarGroupMapData(keys, values, rowId); } } diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java new file mode 100644 index 0000000000000..437c186a93661 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow.vector.reader; + +import org.apache.flink.formats.parquet.vector.reader.ColumnReader; +import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; +import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector; + +import java.io.IOException; + +/** + * Array of a Group type (Array, Map, Row, etc.) {@link ColumnReader}. 
+ */ +public class ArrayGroupReader implements ColumnReader { + + private final ColumnReader fieldReader; + + public ArrayGroupReader(ColumnReader fieldReader) { + this.fieldReader = fieldReader; + } + + @Override + public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { + HeapArrayGroupColumnVector rowColumnVector = (HeapArrayGroupColumnVector) vector; + + fieldReader.readToVector(readNumber, rowColumnVector.vector); + } +} diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java index a6762d2e175c1..ee65dd22c4369 100644 --- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java @@ -18,14 +18,11 @@ package org.apache.hudi.table.format.cow.vector.reader; -import org.apache.hudi.table.format.cow.vector.HeapArrayVector; import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; -import org.apache.flink.table.data.columnar.vector.ColumnVector; +import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector; import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; import java.io.IOException; @@ -34,43 +31,26 @@ */ public class MapColumnReader implements ColumnReader { - private final LogicalType logicalType; private final ArrayColumnReader keyReader; - private final ArrayColumnReader valueReader; + private final ColumnReader valueReader; public MapColumnReader( - ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType logicalType) { + ArrayColumnReader keyReader, ColumnReader valueReader) { this.keyReader = keyReader; this.valueReader = valueReader; - this.logicalType = logicalType; } - public void readBatch(int total, ColumnVector column) throws IOException { - HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) column; - MapType mapType = (MapType) logicalType; - // initialize 2 ListColumnVector for keys and values - HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total); - HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total); - // read the keys and values - keyReader.readToVector(total, keyArrayColumnVector); - valueReader.readToVector(total, valueArrayColumnVector); - - // set the related attributes according to the keys and values - mapColumnVector.setKeys(keyArrayColumnVector.child); - mapColumnVector.setValues(valueArrayColumnVector.child); - mapColumnVector.setOffsets(keyArrayColumnVector.offsets); - mapColumnVector.setLengths(keyArrayColumnVector.lengths); - mapColumnVector.setSize(keyArrayColumnVector.getSize()); + @Override + public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { + HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) vector; + AbstractHeapVector keyArrayColumnVector = (AbstractHeapVector) (mapColumnVector.getKeys()); + keyReader.readToVector(readNumber, mapColumnVector.getKeys()); + valueReader.readToVector(readNumber, mapColumnVector.getValues()); for (int i = 0; i < keyArrayColumnVector.getLen(); i++) { if 
(keyArrayColumnVector.isNullAt(i)) { mapColumnVector.setNullAt(i); } } } - - @Override - public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { - readBatch(readNumber, vector); - } }
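Note (illustrative, not part of the patch): the reader-side comments above rely on Parquet pushing repetition down to the leaf columns, so an array of rows has no single "element" column; each row field becomes its own repeated leaf. A minimal sketch of the resulting schema, built with the parquet-mr Types API for a hypothetical column f_array of type ARRAY<ROW<id INT, name STRING>> (the column and field names are assumptions for illustration only):

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.OriginalType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class ArrayOfRowsSchemaSketch {
      public static void main(String[] args) {
        // ARRAY<ROW<id INT, name STRING>> under the "repeated group array" layout:
        // each row field is stored as its own optional leaf inside the repeated group.
        MessageType schema = Types.buildMessage()
            .addField(
                Types.buildGroup(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
                    .addField(
                        Types.repeatedGroup()
                            .addField(Types.optional(PrimitiveTypeName.INT32).named("id"))
                            .addField(Types.optional(PrimitiveTypeName.BINARY)
                                .as(OriginalType.UTF8).named("name"))
                            .named("array"))
                    .named("f_array"))
            .named("sketch");
        System.out.println(schema);
      }
    }

Printing the schema shows one repeated group "array" containing one column per row field, which is the layout the vector-construction branches above have to translate back into Flink's ArrayType.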
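Note (illustrative, not part of the patch): a sketch of how the new group-array view is meant to be consumed on the read path, assuming a HeapArrayGroupColumnVector whose child is a HeapRowColumnVector with one HeapArrayVector per row field, as ColumnarGroupArrayData and ColumnarGroupRowData expect; the column type and field names are hypothetical.

    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;

    import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;

    public class GroupArrayReadSketch {
      // Reads one value of a hypothetical ARRAY<ROW<id INT, name STRING>> column at batch row `rowId`.
      static void readOne(HeapArrayGroupColumnVector groupVector, int rowId) {
        // ColumnarGroupArrayData: size() is taken from the first field's array at rowId.
        ArrayData elements = groupVector.getArray(rowId);
        for (int i = 0; i < elements.size(); i++) {
          if (elements.isNullAt(i)) {
            continue;
          }
          // ColumnarGroupRowData: each accessor reads element i of that field's HeapArrayVector.
          RowData row = elements.getRow(i, 2);
          int id = row.isNullAt(0) ? 0 : row.getInt(0);
          StringData name = row.isNullAt(1) ? null : row.getString(1);
          System.out.println(id + " -> " + name);
        }
      }
    }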
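Note (illustrative, not part of the patch): with the reworked HeapMapColumnVector, getMap(rowId) returns a ColumnarGroupMapData backed directly by the key and value vectors, so the per-row offsets/lengths bookkeeping is gone. A sketch of iterating one map value, assuming a hypothetical MAP<STRING, BIGINT> column whose keys and values were read into HeapArrayVectors by MapColumnReader:

    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.MapData;

    import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;

    public class MapReadSketch {
      // Iterates the map stored at batch row `rowId` of a hypothetical MAP<STRING, BIGINT> column.
      static void readOne(HeapMapColumnVector mapVector, int rowId) {
        if (mapVector.isNullAt(rowId)) {
          // MapColumnReader marks a row null when its key array is null.
          return;
        }
        MapData map = mapVector.getMap(rowId);   // ColumnarGroupMapData over the key/value vectors
        ArrayData keys = map.keyArray();         // key HeapArrayVector, positioned at rowId
        ArrayData values = map.valueArray();
        for (int i = 0; i < map.size(); i++) {
          String key = keys.getString(i).toString();
          long value = values.isNullAt(i) ? 0L : values.getLong(i);
          System.out.println(key + " = " + value);
        }
      }
    }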