
Conversation


@David-N-Perkins commented Aug 5, 2024

Change Logs

Added support for Arrays of Rows and Maps with Row values in Flink. Nesting is supported only one level deep: the nested Row cannot itself contain another Array or Map.
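For illustration, a hypothetical Flink SQL table using the newly supported shapes (the table name, field names, and connector options are made up for this sketch):

```sql
-- Array of Rows and Map with Row values, each nested one level deep.
CREATE TABLE t1 (
  id INT,
  f_array ARRAY<ROW<f0 INT, f1 STRING>>,       -- Row of basic types inside an Array
  f_map   MAP<STRING, ROW<f0 INT, f1 STRING>>  -- Row of basic types as a Map value
) WITH (
  'connector' = 'hudi'
);
```

Per the change log above, the nested ROW may not itself contain another ARRAY or MAP.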

I've only applied the changes to Flink 1.18.x, but will copy to other versions if no changes are requested.

Impact

No change to the public API. As far as I could tell, the limitation of only allowing non-container types was not stated in the documentation.

Risk level (write none, low, medium, or high below)

Low, assuming adequate unit tests. I added unit tests for my two use cases, but I did not see many existing unit tests. I'm not sure whether there are unit tests covering Arrays and Maps of the basic types, which my changes could affect.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

As far as I could tell, this limitation was not in the documentation. But it would be nice to note somewhere that, as of this change, Array and Map values support basic types and Rows, but not additional Arrays or Maps nested within the Row.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Aug 5, 2024
@danny0405 danny0405 changed the title HUDI-7930 Flink Support for Array of Row and Map of Row value [HUDI-7930] Flink Support for Array of Row and Map of Row value Aug 6, 2024
@yihua
Contributor

yihua commented Sep 11, 2024

@David-N-Perkins any updates on addressing the comments on the PR?

@David-N-Perkins
Author

I added unit tests to ITTestHoodieDataSource and uncovered an issue with inconsistent Parquet schemas between insert and upsert. I spoke with @danny0405 about this; the schemas need to be fixed so they are both the same. I've investigated the issue but haven't fully implemented a fix. I've been pretty busy with work lately and haven't had much time.

@danny0405
Contributor

Hmm, there is a test failure:

TestParquetSchemaConverter.testConvertComplexTypes:72 
Expected: is "message converted {\n  optional group f_array (LIST) {\n    repeated group list {\n      optional binary element (STRING);\n    }\n  }\n  optional group f_map (MAP) {\n    repeated group key_value {\n      required int32 key;\n      optional binary value (STRING);\n    }\n  }\n  optional group f_row {\n    optional int32 f_row_f0;\n    optional binary f_row_f1 (STRING);\n    optional group f_row_f2 {\n      optional int32 f_row_f2_f0;\n      optional binary f_row_f2_f1 (STRING);\n    }\n  }\n}\n"
     but: was "message converted {\n  optional group f_array (LIST) {\n    repeated group array {\n 
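The expected schema in this failure flattens nested row fields into underscore-joined leaf names such as f_row_f2_f0. A minimal sketch of that naming scheme, with a plain map standing in for Flink's RowType (a hypothetical helper for illustration, not the converter's actual code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeafNames {
  // Joins nested field names with '_' to form flat leaf names, mirroring the
  // f_row -> f_row_f0, f_row_f2_f0 pattern visible in the expected schema.
  @SuppressWarnings("unchecked")
  static void collect(String prefix, Object field, List<String> out) {
    if (field instanceof Map) {
      // A map value models a nested row: recurse, extending the prefix.
      for (Map.Entry<String, Object> e : ((Map<String, Object>) field).entrySet()) {
        collect(prefix + "_" + e.getKey(), e.getValue(), out);
      }
    } else {
      out.add(prefix); // primitive leaf: record the accumulated name
    }
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("f0", "INT");
    row.put("f1", "STRING");
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("f0", "INT");
    nested.put("f1", "STRING");
    row.put("f2", nested);

    List<String> names = new ArrayList<>();
    collect("f_row", row, names);
    System.out.println(names);
    // prints: [f_row_f0, f_row_f1, f_row_f2_f0, f_row_f2_f1]
  }
}
```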

@danny0405 danny0405 self-assigned this Sep 22, 2024
int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
if (fieldIndex < 0) {
  columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
  if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
Author


I'm not sure this will work in all cases. Since there isn't a descriptor here, it can't be used to check the repetition level. I did add a unit test for adding a new field to an Array of Rows and a Map of Rows.

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:XL PR with lines of changes > 1000 labels Sep 22, 2024
@David-N-Perkins
Author

Do I need to copy these changes to the other Flink versions?

@danny0405
Contributor

Do I need to copy these changes to the other Flink versions?

Yeah, we should, but in a separate PR; let's make this one solid first.

@danny0405
Contributor

@David-N-Perkins I have no access to your forked repo, here is a patch to fix the checkstyle error:
Fix_checkstyle_errors.patch.zip

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Sep 23, 2024
@danny0405
Contributor

There is still a compile error:

Error:  src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java:[636,38] (whitespace) ParenPad: '(' is followed by whitespace.

final String expected = "message converted {\n"
    + "  optional group f_array (LIST) {\n"
    + "    repeated group list {\n"
    + "      repeated group array {\n"
Contributor


Can we add a test case for a nested row in an array type?

columnVectors[i] =
    createWritableColumnVector(
        batchSize,
        new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
Contributor


Can you explain why we use the array logical type here if the row field's type is not an array?

Author


This is done here, at line 470, and in some other files in order to match Parquet's record-shredding algorithm, which pushes repetition and structure down to the individual leaf fields. In Parquet, an array of rows is effectively stored as a separate array column for each field.
This approach does have some limitations: it won't work for multiply nested arrays and maps. The main problem is that the Flink classes and interfaces don't follow that pattern.
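To make that shape concrete, an array of rows lands in the Parquet schema roughly like this (standard three-level LIST encoding; field names are illustrative). Each leaf becomes its own column in the file, which is the per-field storage described above:

```
optional group f_array (LIST) {
  repeated group list {
    optional group element {
      optional int32 f0;
      optional binary f1 (STRING);
    }
  }
}
```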

@hudi-bot
Collaborator

hudi-bot commented Oct 5, 2024

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-runs the last Azure build

@danny0405
Contributor

@David-N-Perkins Can you also cherry-pick the changes to Flink 1.19.x? I think we can just drop this support for releases before Flink 1.18.

@danny0405 danny0405 merged commit 78dcff7 into apache:master Oct 7, 2024
@David-N-Perkins
Author

Sure, I can do that.

I was also chasing down a potential issue where a null array of rows was being read back as an array containing a single row full of nulls.

@danny0405
Contributor

Yeah, be careful with the null-value handling.

@empcl
Contributor

empcl commented Mar 18, 2025

@David-N-Perkins Hello, why do we need to change the written structure? Why do we change the list and element groups into array structures? Thank you for your answer.
[attached screenshot of the written Parquet schema]


@David-N-Perkins
Copy link
Author

@empcl If I remember correctly, it was needed to get consistent names and structure in the Parquet files. I was seeing differences depending on whether the operation was "insert", "upsert", or "bulk_insert".
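For background, Parquet has two common list encodings in the wild, and the difference is exactly what the earlier test failure surfaced (expected repeated group list, got repeated group array). The standard three-level form looks like this (field names illustrative):

```
optional group f_array (LIST) {
  repeated group list {
    optional group element {
      optional int32 f0;
      optional binary f1 (STRING);
    }
  }
}
```

while the legacy two-level form emitted by some older writers names the repeated group array and puts the row fields directly inside it:

```
optional group f_array (LIST) {
  repeated group array {
    optional int32 f0;
    optional binary f1 (STRING);
  }
}
```

Readers distinguish these via backward-compatibility rules, so a writer that emits different forms for the same table on different code paths produces the kind of schema inconsistency described above.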
