
Conversation

cuibo01 (Contributor) commented Jun 30, 2022


What is the purpose of the pull request

Add a hoodie Hive catalog for Flink:

  1. Support creating hoodie tables in HMS with the Flink HoodieHiveCatalog.
  2. Support reading hoodie tables created by Spark SQL.
  3. Spark SQL can read hoodie tables created by Flink.



@cuibo01 force-pushed the hudiCatalog branch 3 times, most recently from 802cd0d to 967d463, on Jul 1, 2022 at 03:56
@xiarixiaoyao changed the title from "[WIP]Support hoodie catalog" to "[WIP][HUDI-4089] Support HMS for flink HoodieCatalog" on Jul 1, 2022
@cuibo01 force-pushed the hudiCatalog branch 4 times, most recently from f69db42 to 8453cb1, on Jul 1, 2022 at 11:18
@danny0405 self-assigned this on Jul 2, 2022
@danny0405 added the engine:flink (Flink integration) and metadata labels on Jul 2, 2022
xiaozhch5 (Contributor):

Hello, I tested the PR, but the following problem occurred.

compile command:

mvn clean install -DskipTests -Dhive.version=3.1.2 -Dflink.version=1.13.6 -Pflink1.13 -Dhadoop.version=3.2.2 -Pspark3 -Pflink-bundle-shade-hive3 -Dscala.binary.version=2.12 -Dscala.version=2.12.12

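For context, a sketch of how the HMS-backed catalog under test is typically registered in the Flink SQL client (inferred from the log below; the 'type' and 'mode' values are assumptions, and the hive conf dir is taken from the log):

CREATE CATALOG hudi WITH (
  'type' = 'hudi',
  'mode' = 'hms',
  'hive.conf.dir' = '/data/hive/hive/conf'
);

USE CATALOG hudi;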

The full stack trace is listed below:

2022-07-02 19:37:04,770 INFO  org.apache.hudi.table.catalog.HoodieCatalogUtil              [] - Setting hive conf dir as /data/hive/hive/conf
2022-07-02 19:37:04,776 INFO  org.apache.hudi.table.catalog.HoodieHiveCatalog              [] - Created HiveCatalog 'hudi'
2022-07-02 19:37:04,777 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Trying to connect to metastore with URI thrift://bigdata:9083
2022-07-02 19:37:04,778 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Opened a connection to metastore, current connections: 1
2022-07-02 19:37:04,802 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Connected to metastore.
2022-07-02 19:37:04,802 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient     [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=root (auth:SIMPLE) retries=1 delay=1 lifetime=0
2022-07-02 19:37:04,933 INFO  org.apache.hudi.table.catalog.HoodieHiveCatalog              [] - Connected to Hive metastore
2022-07-02 19:37:19,532 INFO  org.apache.flink.table.catalog.CatalogManager                [] - Set the current default catalog as [hudi] and the current default database as [default].
2022-07-02 19:37:47,587 INFO  org.apache.flink.table.catalog.CatalogManager                [] - Set the current default database as [hudidb] in the current default catalog [hudi].
2022-07-02 19:38:29,533 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:564) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:424) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_241]
        at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
Caused by: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `hudi`.`hudidb`.`orders_hudi`
        at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:847) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:659) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:865) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        ... 11 more
Caused by: org.apache.hudi.exception.HoodieCatalogException: Failed to create table hudidb.orders_hudi
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.createTable(HoodieHiveCatalog.java:446) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$10(CatalogManager.java:661) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:841) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:659) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:865) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        ... 11 more
Caused by: org.apache.hudi.exception.HoodieCatalogException: id and uuid are the different
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.instantiateHiveTable(HoodieHiveCatalog.java:579) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.createTable(HoodieHiveCatalog.java:435) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$10(CatalogManager.java:661) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:841) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:659) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:865) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        ... 11 more

But it works if I use the hudi catalog.


cuibo01 (Contributor, Author) commented Jul 2, 2022

Hello, I tested the PR, but the following problem occurred.

Thanks for your review. If a primary key is defined, it must be the same as the RECORD_KEY_FIELD (which defaults to uuid, hence the "id and uuid are the different" error above).
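A minimal sketch of a table definition that passes this check (the table and column names are made up; the record key option defaults to uuid, so the primary key must be uuid unless the option is changed):

CREATE TABLE orders_hudi (
  uuid STRING,
  order_id STRING,
  price DOUBLE,
  ts TIMESTAMP(3),
  PRIMARY KEY (uuid) NOT ENFORCED
);

-- Alternatively, keep an id primary key and align the record key option:
-- WITH ('hoodie.datasource.write.recordkey.field' = 'id')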

xiaozhch5 (Contributor) commented Jul 3, 2022

Hello, I tested the PR, but the following problem occurred.

Thanks for your review. If a primary key is defined, it must be the same as the RECORD_KEY_FIELD.

It works when I change the primary key to uuid or set 'hoodie.datasource.write.recordkey.field'.

But another problem arose when I query the table through the Spark SQL catalog.

The table type I defined is MERGE_ON_READ, but when I run "desc formatted xxx", it shows the table type as COPY_ON_WRITE.

And when I insert data into the table, it is loaded as COPY_ON_WRITE.
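For reference, a rough sketch of the kind of MOR definition being tested (hypothetical names; the relevant part is the 'table.type' option):

CREATE TABLE orders_hudi (
  uuid STRING,
  name STRING,
  ts TIMESTAMP(3),
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'table.type' = 'MERGE_ON_READ'
);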

cuibo01 (Contributor, Author) commented Jul 3, 2022

It works when I change the primary key to uuid or set 'hoodie.datasource.write.recordkey.field'.

But another problem arose when I query the table through the Spark SQL catalog.

👍 Yes, a Flink config was incorrectly used as a string; you can retry it now.

xiaozhch5 (Contributor):

It works when I change the primary key to uuid or set 'hoodie.datasource.write.recordkey.field'.
But another problem arose when I query the table through the Spark SQL catalog.

👍 Yes, a Flink config was incorrectly used as a string; you can retry it now.

Yeah, the problem is resolved.

xiaozhch5 (Contributor) commented Jul 3, 2022

When I create the table in Flink SQL, I can query the data in Spark SQL directly. But when I create the table in Spark SQL, I can't query the data in Flink SQL. Is it possible for Flink to query the data after Spark SQL creates the table?
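For reference, a sketch of the kind of Spark SQL DDL involved (the columns are hypothetical; the table name comes from the stack trace below):

CREATE TABLE hudi_mor_tbl (
  id INT,
  name STRING,
  price DOUBLE,
  ts BIGINT
) USING hudi
TBLPROPERTIES (
  'type' = 'mor',
  'primaryKey' = 'id',
  'preCombineField' = 'ts'
);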

[screenshots omitted: Spark SQL session, Flink SQL session, HDFS directory listing]

Full stack trace:

org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeOperation(CliClient.java:564) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:424) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_241]
        at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client_2.12-1.13.6.jar:1.13.6]
Caused by: org.apache.hudi.exception.HoodieCatalogException: the hudi_mor_tbl is not hoodie table
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.checkHoodieTable(HoodieHiveCatalog.java:331) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.getHiveTable(HoodieHiveCatalog.java:358) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.hudi.table.catalog.HoodieHiveCatalog.getTable(HoodieHiveCatalog.java:392) ~[hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar:0.12.0-SNAPSHOT]
        at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:425) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1215) ~[flink-table_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213) ~[flink-sql-client_2.12-1.13.6.jar:1.13.6]
        ... 11 more

@xiarixiaoyao changed the title from "[WIP][HUDI-4089] Support HMS for flink HoodieCatalog" to "[HUDI-4089] Support HMS for flink HoodieCatalog" on Jul 7, 2022
xiarixiaoyao (Contributor):

@danny0405 could you please help review this PR? Thanks. We will address the comments as soon as possible.

cuibo01 (Contributor, Author) commented Jul 7, 2022

@hudi-bot run azure

danny0405 (Contributor):

@danny0405 could you please help review this PR? Thanks. We will address the comments as soon as possible.

You should not merge the branch; the commit history is a mess now. Please rebase the patch onto the latest master and then force-push to the branch.

danny0405 (Contributor) commented Jul 8, 2022

Instead of using the merge command, you should use rebase; merge generates an extra merge commit in the commit history.

try {
  // Merge the Spark-written table properties (translated to their Flink
  // equivalents) into the existing HMS table parameters.
  Map<String, String> parameters = hiveTable.getParameters();
  parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
  String path = hiveTable.getSd().getLocation();
Contributor:

What if the table is not a hudi table at all, do we still need the translation?

Contributor (Author):

The table is a Spark table but not a Flink table, so we need to refresh the table properties into the HMS.


try {
  // Check the declared table type, falling back to the default when 'table.type' is absent.
  boolean isMorTable = table.getOptions().getOrDefault(FlinkOptions.TABLE_TYPE.key(),
      FlinkOptions.TABLE_TYPE.defaultValue()).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
Contributor:

getOrDefault(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue()) -> using get directly is OK if the option already has a default value.

Contributor (Author):

But table.getOptions() does not contain TABLE_TYPE (it is a plain map of the DDL options, so the default is not applied).

// Create the Hive table in the metastore.
client.createTable(hiveTable);
// Initialize the hoodie metaClient on DFS if the table does not exist yet.
initTableIfNotExists(tablePath, (CatalogTable) table);
Contributor:

Let's make this operation optional; some Flink jobs on k8s may have no permission to write to DFS. @XuQianJin-Stars

Contributor (Author):

Updated

}
try {
  // Same table-type check as above, against the new catalog table.
  boolean isMorTable = newCatalogTable.getOptions().getOrDefault(FlinkOptions.TABLE_TYPE.key(),
      FlinkOptions.TABLE_TYPE.defaultValue()).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
Contributor:

Let's forbid the following behavior (see the sketch after this list):

  1. Forbid changing the hoodie table type, e.g. MOR -> COW or COW -> MOR.
  2. If 'index.type' = 'BUCKET' is already set for a hoodie table, forbid changing this property, since the bucket index is incompatible with the other hoodie index types.
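A sketch of the kind of statements that should then be rejected (Flink SQL; the table name is hypothetical):

-- Should be rejected: flips an existing MOR table to COW.
ALTER TABLE hudi_mor_tbl SET ('table.type' = 'COPY_ON_WRITE');

-- Should be rejected: moves an existing BUCKET-indexed table to another index type.
ALTER TABLE hudi_mor_tbl SET ('index.type' = 'FLINK_STATE');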

Contributor (Author):

Updated

hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

// Always mark the table as EXTERNAL in HMS (the point discussed below).
Map<String, String> properties = applyOptionsHook(table.getOptions());
properties.put("EXTERNAL", "TRUE");
Contributor:

Would it be better to let users customize the table type as "EXTERNAL" or "MANAGED"?

Using the Flink engine and the hudi catalog, if we drop the table, the HDFS dir is deleted.

Using the Spark engine, if we set 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog', Spark creates a "MANAGED" table in HMS by default, and if we drop the table, the HDFS dir is also deleted.

I think if we could customize the table type as "EXTERNAL" or "MANAGED", we could control whether the HDFS dir gets deleted.
@danny0405
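A rough sketch of what such a switch could look like; the 'table.external' option name is invented here purely for illustration, not an existing flag:

-- Hypothetical: register the catalog with a user-controlled EXTERNAL/MANAGED switch.
CREATE CATALOG hudi WITH (
  'type' = 'hudi',
  'mode' = 'hms',
  'table.external' = 'true'  -- invented option: EXTERNAL means dropping keeps the dir
);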

Contributor (Author):

Open-source Flink supports only EXTERNAL; does Hudi need to support MANAGED?

danny0405 (Contributor):
Can we squash the commits locally and force-push the branch? There are 10 commits on the branch and it's hard to review the details in total.

hudi-bot (Collaborator):

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

[HUDI-4098] Support HMS for flink HoodieCatalog

[HUDI-4098] Support HMS for flink HoodieCatalog

[minor] following 4152, refactor the clazz about plan selection strategy (apache#6060)

[HUDI-4367] Support copyToTable on call (apache#6054)

[HUDI-4098] Support HMS for flink HoodieCatalog

[HUDI-4098] Support HMS for flink HoodieCatalog

[HUDI-4098] Support HMS for flink HoodieCatalog

[HUDI-4335] Bug fixes in AWSGlueCatalogSyncClient post schema evolution. (apache#5995)

* fix for updateTableParameters which is not excluding partition columns and updateTableProperties boolean check

* Fix - serde parameters getting overrided on table property update

* removing stale syncConfig

[HUDI-4276] Reconcile schema-inject null values for missing fields and add new fields (apache#6017)

* [HUDI-4276] Reconcile schema-inject null values for missing fields and add new fields.

* fix comments

Co-authored-by: public (bdcee5037027) <[email protected]>

[HUDI-3500] Add call procedure for RepairsCommand (apache#6053)

[HUDI-2150] Rename/Restructure configs for better modularity (apache#6061)

- Move clean related configuration to HoodieCleanConfig
- Move Archival related configuration to HoodieArchivalConfig
- hoodie.compaction.payload.class move this to HoodiePayloadConfig

[MINOR] Bump xalan from 2.7.1 to 2.7.2 (apache#6062)

Bumps xalan from 2.7.1 to 2.7.2.

---
updated-dependencies:
- dependency-name: xalan:xalan
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

[HUDI-4324] Remove use_jdbc config from hudi sync (apache#6072)

* [HUDI-4324] Remove use_jdbc config from hudi sync
* Users should use HIVE_SYNC_MODE instead

[HUDI-3730][RFC-55] Improve hudi-sync classes design and simplify configs (apache#5695)

* [HUDI-4146] RFC for Improve Hive/Meta sync class design and hierarchies

Co-authored-by: jian.feng <[email protected]>
Co-authored-by: Raymond Xu <[email protected]>

[HUDI-4323] Make database table names optional in sync tool (apache#6073)

* [HUDI-4323] Make database table names optional in sync tool
* Infer from these properties from the table config

[MINOR] Update RFCs status (apache#6078)
cuibo01 (Contributor, Author) commented Jul 12, 2022

#6082
