Merged
Changes from 14 commits
@@ -506,18 +506,27 @@ public void write(ArrayData array, int ordinal) {
private void doWrite(ArrayData arrayData) {
recordConsumer.startGroup();
if (arrayData.size() > 0) {
final String repeatedGroup = "list";
final String elementField = "element";
final String repeatedGroup = "array";
recordConsumer.startField(repeatedGroup, 0);
for (int i = 0; i < arrayData.size(); i++) {
recordConsumer.startGroup();
if (!arrayData.isNullAt(i)) {
// Only creates the element field if the current array element is not null.
recordConsumer.startField(elementField, 0);
elementWriter.write(arrayData, i);
recordConsumer.endField(elementField, 0);
if (elementWriter instanceof RowWriter) {
for (int i = 0; i < arrayData.size(); i++) {
if (!arrayData.isNullAt(i)) {
// Only writes the element when the current array element is not null.
elementWriter.write(arrayData, i);
}
}
} else {
final String elementField = "element";
for (int i = 0; i < arrayData.size(); i++) {
recordConsumer.startGroup();
if (!arrayData.isNullAt(i)) {
// Only creates the element field if the current array element is not null.
recordConsumer.startField(elementField, 0);
elementWriter.write(arrayData, i);
recordConsumer.endField(elementField, 0);
}
recordConsumer.endGroup();
}
recordConsumer.endGroup();
}
recordConsumer.endField(repeatedGroup, 0);
}
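Note on the writer change above: when the element writer is a RowWriter, each non-null row element writes its own group (carrying the row's fields) directly under the repeated "array" group, while every other element type is still wrapped in a group with a single "element" field. Below is a minimal standalone sketch, not part of this PR, of the approximate RecordConsumer event sequence for one ARRAY<ROW<varchar, int>> value; the field names are borrowed from the new test DDL further down and are otherwise hypothetical.

import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;

/** Illustration only: rough event stream for one array-of-row value under the new layout. */
public final class ArrayOfRowEventsSketch {

  private ArrayOfRowEventsSketch() {
  }

  // Assumes the caller has already started the field that holds this array value.
  public static void writeOneRowElement(RecordConsumer rc) {
    rc.startGroup();                          // outer LIST group
    rc.startField("array", 0);                // repeated group "array"

    // One non-null row element: its fields go straight into the repeated group instance.
    rc.startGroup();
    rc.startField("f_array_row_f0", 0);
    rc.addBinary(Binary.fromString("abc11"));
    rc.endField("f_array_row_f0", 0);
    rc.startField("f_array_row_f1", 1);
    rc.addInteger(11);
    rc.endField("f_array_row_f1", 1);
    rc.endGroup();
    // A null element is simply skipped; further elements repeat the same pattern.

    rc.endField("array", 0);
    rc.endGroup();
  }
}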
@@ -29,6 +29,7 @@
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -622,18 +623,25 @@ private static Type convertToParquetType(
}
case ARRAY:
// <list-repetition> group <name> (LIST) {
// repeated group list {
// repeated group array {
// <element-repetition> <element-type> element;
// }
// }
ArrayType arrayType = (ArrayType) type;
LogicalType elementType = arrayType.getElementType();

Types.GroupBuilder<GroupType> arrayGroupBuilder = Types.repeatedGroup();
if (elementType.getTypeRoot() == LogicalTypeRoot.ROW) {
RowType rowType = (RowType) elementType;
rowType.getFields().forEach(field ->
arrayGroupBuilder.addField(convertToParquetType(field.getName(), field.getType(), repetition)));
} else {
arrayGroupBuilder.addField(convertToParquetType("element", elementType, repetition));
}

return Types
.buildGroup(repetition).as(OriginalType.LIST)
.addField(
Types.repeatedGroup()
.addField(convertToParquetType("element", elementType, repetition))
.named("list"))
.addField(arrayGroupBuilder.named("array"))
.named(name);
case MAP:
// <map-repetition> group <name> (MAP) {
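To make the new ARRAY<ROW<...>> mapping concrete, here is a small standalone sketch, not part of this PR, that builds the same shape with Parquet's Types builder for a column like f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 int)>; the row fields end up inlined under the repeated "array" group, mirroring the new ROW branch in convertToParquetType.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;

/** Sketch of the schema shape produced for an array-of-row column under the new layout. */
public final class ArrayOfRowSchemaSketch {

  public static void main(String[] args) {
    // Row fields are added directly to the repeated "array" group instead of a wrapped "element" field.
    GroupType fArray = Types.buildGroup(Repetition.OPTIONAL).as(OriginalType.LIST)
        .addField(Types.repeatedGroup()
            .addField(Types.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("f_array_row_f0"))
            .addField(Types.optional(PrimitiveTypeName.INT32).named("f_array_row_f1"))
            .named("array"))
        .named("f_array");

    MessageType message = Types.buildMessage().addField(fArray).named("converted");
    System.out.println(message);
  }
}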
@@ -50,7 +50,7 @@ void testConvertComplexTypes() {
assertThat(messageType.getColumns().size(), is(7));
final String expected = "message converted {\n"
+ " optional group f_array (LIST) {\n"
+ " repeated group list {\n"
+ " repeated group array {\n"
Contributor
Can we add a test case for a nested row inside an array type?

+ " optional binary element (STRING);\n"
+ " }\n"
+ " }\n"
@@ -371,9 +371,9 @@ void testAppendWriteReadSkippingClustering() throws Exception {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
.option(FlinkOptions.CLUSTERING_DELTA_COMMITS,1)
.option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
.option(FlinkOptions.CLUSTERING_TASKS, 1)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -383,7 +383,7 @@ void testAppendWriteReadSkippingClustering() throws Exception {
String instant = TestUtils.getNthCompleteInstant(new StoragePath(tempFile.toURI()), 2, HoodieTimeline.COMMIT_ACTION);

streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
.setBoolean("table.dynamic-table-options.enabled", true);
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
List<Row> rows = execSelectSql(streamTableEnv, query, 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
@@ -398,9 +398,9 @@ void testAppendWriteWithClusteringBatchRead() throws Exception {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "insert")
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED,true)
.option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
.option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
.option(FlinkOptions.CLUSTERING_DELTA_COMMITS,2)
.option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2)
.option(FlinkOptions.CLUSTERING_TASKS, 1)
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
@@ -409,9 +409,9 @@ void testAppendWriteWithClusteringBatchRead() throws Exception {
execInsertSql(streamTableEnv, insertInto);

streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
.setBoolean("table.dynamic-table-options.enabled", true);
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/",
FlinkOptions.START_COMMIT_EARLIEST);
FlinkOptions.START_COMMIT_EARLIEST);

List<Row> rows = execSelectSql(streamTableEnv, query, 10);
// batch read will not lose data when cleaned clustered files.
@@ -450,12 +450,12 @@ void testStreamWriteWithCleaning() {
@Test
void testBatchWriteWithCleaning() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1)
.end();
batchTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
execInsertSql(batchTableEnv, insertInto);
execInsertSql(batchTableEnv, insertInto);
execInsertSql(batchTableEnv, insertInto);
@@ -466,7 +466,7 @@ void testBatchWriteWithCleaning() {
HoodieTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
assertTrue(timeline.filterCompletedInstants()
.getInstants().stream().anyMatch(instant -> instant.getAction().equals("clean")),
"some commits should be cleaned");
"some commits should be cleaned");
}

@Test
@@ -1621,12 +1621,44 @@ void testParquetComplexNestedRowTypes(String operation) {
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
List<Row> expected = Arrays.asList(
row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))),
row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))),
row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3"))));
row(1, array("abc1", "def1"), array(1, 1), map("abc1", 1, "def1", 3), row(array("abc1", "def1"), row(1, "abc1"))),
row(2, array("abc2", "def2"), array(2, 2), map("abc2", 1, "def2", 3), row(array("abc2", "def2"), row(2, "abc2"))),
row(3, array("abc3", "def3"), array(3, 3), map("abc3", 1, "def3", 3), row(array("abc3", "def3"), row(3, "abc3"))));
assertRowsEqualsUnordered(result, expected);
}

@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testParquetArrayMapOfRowTypes(String operation) {
TableEnvironment tableEnv = batchTableEnv;

String hoodieTableDDL = sql("t1")
.field("f_int int")
.field("f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 int)>")
.field("f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 varchar(10))>")
.pkField("f_int")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, operation)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.ARRAY_MAP_OF_ROW_TYPE_INSERT_T1);

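// Evolve the nested row types: add a trailing double field to both the array element row and the map value row.
// Rows written before this ALTER are expected to read NULL for the new fields, as asserted below.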
tableEnv.executeSql("ALTER TABLE t1 MODIFY (\n"
+ " f_array array<row(f_array_row_f0 varchar(10), f_array_row_f1 int, f_array_row_f2 double)>,\n"
+ " f_map map<varchar(20), row(f_map_row_f0 int, f_map_row_f1 varchar(10), f_map_row_f2 double)>\n"
+ ");");

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
List<Row> expected = Arrays.asList(
row(1, array(row("abc11", 11, null), row("abc12", 12, null), row("abc13", 13, null)), map("abc11", row(11, "def11", null), "abc12", row(12, "def12", null), "abc13", row(13, "def13", null))),
row(2, array(row("abc21", 21, null), row("abc22", 22, null), row("abc23", 23, null)), map("abc21", row(21, "def21", null), "abc22", row(22, "def22", null), "abc23", row(23, "def23", null))),
row(3, array(row("abc31", 31, null), row("abc32", 32, null), row("abc33", 33, null)), map("abc31", row(31, "def31", null), "abc32", row(32, "def32", null), "abc33", row(33, "def33", null))));
assertRowsEqualsUnordered(expected, result);
}

@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testParquetNullChildColumnsRowTypes(String operation) {
@@ -2026,18 +2058,18 @@ void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType, boolean read
void testReadMetaFields(HoodieTableType tableType, String queryType, int numInsertBatches, int compactionDeltaCommits) throws Exception {
String path = tempFile.getAbsolutePath();
String hoodieTableDDL = sql("t1")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
batchTableEnv.executeSql(hoodieTableDDL);

final String[] insertInto = new String[] {
@@ -2076,7 +2108,7 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse
for (int i = 0; i < numInsertBatches; i++) {
execInsertSql(batchTableEnv, insertInto[i]);
String commitTime = tableType.equals(HoodieTableType.MERGE_ON_READ)
? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
? TestUtils.getLastDeltaCompleteInstant(path) : TestUtils.getLastCompleteInstant(path);
expected1.append(template1[i]);
expected2.append(String.format(template2[i], commitTime));
expected3.append(String.format(template3[i], commitTime));
@@ -2087,62 +2119,62 @@ void testReadMetaFields(HoodieTableType tableType, String queryType, int numInse
String readHoodieTableDDL;
batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
batchTableEnv.executeSql(readHoodieTableDDL);

List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
assertRowsEquals(result, expected1.toString());

batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
.field("_hoodie_commit_time string")
.field("_hoodie_record_key string")
.field("_hoodie_partition_path string")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
.field("_hoodie_commit_time string")
.field("_hoodie_record_key string")
.field("_hoodie_partition_path string")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
batchTableEnv.executeSql(readHoodieTableDDL);

result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
assertRowsEquals(result, expected2.toString());

batchTableEnv.executeSql("drop table t1");
readHoodieTableDDL = sql("t1")
.field("id int")
.field("_hoodie_commit_time string")
.field("name varchar(10)")
.field("_hoodie_record_key string")
.field("ts timestamp(6)")
.field("_hoodie_partition_path string")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
.field("id int")
.field("_hoodie_commit_time string")
.field("name varchar(10)")
.field("_hoodie_record_key string")
.field("ts timestamp(6)")
.field("_hoodie_partition_path string")
.field("`partition` varchar(10)")
.pkField("id")
.partitionField("partition")
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(FlinkOptions.QUERY_TYPE, queryType)
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, true)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits)
.option(FlinkOptions.PATH, path)
.end();
batchTableEnv.executeSql(readHoodieTableDDL);

result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
@@ -2300,11 +2332,11 @@ private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() {
*/
private static Stream<Arguments> tableTypeQueryTypeNumInsertAndCompactionDeltaCommitsParams() {
return Arrays.stream(new Object[][] {
{HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
{HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
{HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_INCREMENTAL, 1, 1},
{HoodieTableType.COPY_ON_WRITE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, 1, 1},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 1},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 1, 3},
{HoodieTableType.MERGE_ON_READ, FlinkOptions.QUERY_TYPE_SNAPSHOT, 3, 2}
}).map(Arguments::of);
}

@@ -62,6 +62,11 @@ private TestSQL() {
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";

public static final String ARRAY_MAP_OF_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, array[row('abc11', 11), row('abc12', 12), row('abc13', 13)], map['abc11', row(11, 'def11'), 'abc12', row(12, 'def12'), 'abc13', row(13, 'def13')]),\n"
+ "(2, array[row('abc21', 21), row('abc22', 22), row('abc23', 23)], map['abc21', row(21, 'def21'), 'abc22', row(22, 'def22'), 'abc23', row(23, 'def23')]),\n"
+ "(3, array[row('abc31', 31), row('abc32', 32), row('abc33', 33)], map['abc31', row(31, 'def31'), 'abc32', row(32, 'def32'), 'abc33', row(33, 'def33')])";

public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, row(cast(null as int), 'abc1')),\n"
+ "(2, row(2, cast(null as varchar))),\n"