
Conversation

@stayrascal (Contributor) commented on Feb 17, 2022


What is the purpose of the pull request

  1. Fix the ClassCastException involving ColumnarArrayData, ColumnarMapData, ColumnarRowData, MapColumnVector, RowColumnVector, and VectorizedColumnBatch after upgrading the Flink version from 1.13 to 1.14, because Flink 1.14 ships these classes natively.

  2. The issue can be reproduced by running the SQL below or the Hudi HoodieDataSourceITCase.testParquetComplexNestedRowTypes() integration test:

  • Running the example below hits a ClassCastException when querying a COW table that contains an array field.
CREATE TABLE cow_example_new_demo (
    id            BIGINT,
    name          STRING,
    `date`        DATE,
    int_arr       ARRAY<INT>,
    `timestamp`   TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
 ) PARTITIONED BY (name) WITH (
    'connector' = 'hudi',
    'path' = '/xxxxx/cow_example_new_demo/',
    'table.type' = 'COPY_ON_WRITE',
    'write.precombine.field' = 'timestamp',
    'write.precombine' = 'true',
    'index.bootstrap.enabled' = 'true'
);

insert into cow_example_new_demo 
select 
    111                                 as id,
    'name'                              as name,
    TO_DATE('2022-02-02')               as `date`,
    Array[1,2]                          as int_arr,
    TIMESTAMP '2022-02-16 13:43:32.593' as `timestamp`;

select * from cow_example_new_demo;

The exception trace is as follows:

java.lang.ClassCastException: class org.apache.hudi.table.format.cow.data.ColumnarArrayData cannot be cast to class org.apache.flink.table.data.ColumnarArrayData (org.apache.hudi.table.format.cow.data.ColumnarArrayData is in unnamed module of loader org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader @4b489672; org.apache.flink.table.data.ColumnarArrayData is in unnamed module of loader 'app')
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
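
For context, a minimal, self-contained Java sketch of the failure mode (the class names below are hypothetical stand-ins, not the actual Hudi/Flink sources): two classes that share a simple name but live in different packages are unrelated JVM types, so Flink 1.14's ArrayDataSerializer#copy, which casts to Flink's own ColumnarArrayData, fails when it is handed the Hudi-local copy.

// Hypothetical stand-ins for org.apache.hudi.table.format.cow.data.ColumnarArrayData
// and org.apache.flink.table.data.ColumnarArrayData (names are illustrative only).
public class ClassCastDemo {
    interface ArrayData {}                                       // shared supertype
    static class HudiColumnarArrayData implements ArrayData {}   // the Hudi-local copy (deleted by this PR)
    static class FlinkColumnarArrayData implements ArrayData {}  // the class shipped with Flink 1.14

    public static void main(String[] args) {
        ArrayData data = new HudiColumnarArrayData();
        // Flink's serializer only knows its own class, so a cast like this is what
        // fails with the ClassCastException shown in the stack trace above.
        FlinkColumnarArrayData copy = (FlinkColumnarArrayData) data;
    }
}

Once the Hudi-local copies are removed and the reader produces Flink's own columnar classes, the cast inside the serializer succeeds.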

Brief change log

  • Delete these classes
    • ColumnarArrayData.java
    • ColumnarMapData.java
    • ColumnarRowData.java
    • MapColumnVector.java
    • RowColumnVector.java
  • Re-import the deleted classes from flink-table-runtime (see the import sketch after this list)
    • replace the import of ColumnarArrayData in HeapArrayVector.java
    • replace the import of ColumnarMapData and import MapColumnVector from Flink instead of the local copy in HeapMapColumnVector.java
    • replace the import of ColumnarRowData and import RowColumnVector & VectorizedColumnBatch from Flink instead of the local copies in HeapRowColumnVector.java
    • replace the import of ColumnarRowData and import VectorizedColumnBatch from Flink instead of the local copy in ParquetColumnarRowSplitReader.java
    • replace the import of VectorizedColumnBatch in ParquetSplitReaderUtil.java
  • Enhance the test case
    • add an array<int> case to testParquetComplexNestedRowTypes in HoodieDataSourceITCase.java
    • update the SQL string COMPLEX_NESTED_ROW_TYPE_INSERT_T1 in TestSQL.java by adding an array<int> value
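
A minimal sketch of what the re-import looks like after this change, using HeapArrayVector.java as an example (illustrative only, not the actual PR diff; the Flink package name is the one shown in the stack trace above):

// Before: the Hudi-local copy, which is deleted by this PR
// import org.apache.hudi.table.format.cow.data.ColumnarArrayData;

// After: the class provided by flink-table-runtime in Flink 1.14
import org.apache.flink.table.data.ColumnarArrayData;

The same pattern applies to the other files listed above, each switching from a Hudi-local class to the corresponding Flink 1.14 class.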

Verify this pull request

This change added tests and can be verified as follows:

  • Verified via the integration test case.
  • Manually verified the change by running a job locally.

[screenshot attached]

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@danny0405 (Contributor) commented:

Thanks for the contribution @stayrascal, the PR overall looks good; we need to fix the Azure CI tests :)

@stayrascal (Contributor, Author) commented:

Thanks @danny0405 for reviewing this. It seems to be a build issue; I will try to rerun the CI:
[ERROR] Failed to execute goal com.dkanejs.maven.plugins:docker-compose-maven-plugin:2.0.1:up (up) on project hudi-integ-test: Execution up of goal com.dkanejs.maven.plugins:docker-compose-maven-plugin:2.0.1:up failed:

@stayrascal (Contributor, Author) commented:

@hudi-bot run azure

@hudi-bot (Collaborator) commented:

CI report:

Bot commands — @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 (Contributor) left a comment:

+1, thanks for the fix @stayrascal ~

@danny0405 merged commit f15125c into apache:master on Feb 19, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request on Apr 3, 2022:
* [HUDI-3389] fix ColumnarArrayData ClassCastException issue

* [HUDI-3389] remove MapColumnVector.java, RowColumnVector.java, and add test case for array<int> field
