diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 723bd33b8c401..270027df18053 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1551,7 +1551,7 @@ private void setWriteTimer(HoodieTable table) { } } - private void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + protected void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ce75452d27ff4..2d23c3afb7f14 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -407,14 +407,20 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { + // do nothing. + // flink executes the upgrade/downgrade once when initializing the first instant on start up, + // no need to execute the upgrade/downgrade on each write in streaming. + } + /** * Upgrade downgrade the Hoodie table. * *

This action should only be executed once for each commit. * The modification of the table properties is not thread safe. */ - public void upgradeDowngrade(String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) { new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 023b1e696583a..39976e5ee2dc4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -394,7 +394,7 @@ private void initInstant(String instant) { // starts a new instant startInstant(); // upgrade downgrade - this.writeClient.upgradeDowngrade(this.instant); + this.writeClient.upgradeDowngrade(this.instant, this.metaClient); }, "initialize instant %s", instant); }