diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index b4958f5692db4..258894fd79afe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -317,6 +317,8 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); + protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient); + void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { if (writeTimer != null) { long durationInMs = metrics.getDurationInMs(writeTimer.stop()); @@ -1425,17 +1427,38 @@ public HoodieMetrics getMetrics() { } /** - * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary - * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped) + * Performs necessary bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped). * - * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS - * NOT REQUIRING EXTERNAL SYNCHRONIZATION + *

NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS + * NOT REQUIRING EXTERNAL SYNCHRONIZATION * * @param metaClient instance of {@link HoodieTableMetaClient} * @param instantTime current inflight instant time - * @return instantiated {@link HoodieTable} */ - protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary); + protected void doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { + Option ownerInstant = Option.empty(); + if (instantTime.isPresent()) { + ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); + } + this.txnManager.beginTransaction(ownerInstant, Option.empty()); + try { + tryUpgrade(metaClient, instantTime); + if (initialMetadataTableIfNecessary) { + initMetadataTable(instantTime); + } + } finally { + this.txnManager.endTransaction(ownerInstant); + } + } + + /** + * Bootstrap the metadata table. + * + * @param instantTime current inflight instant time + */ + protected void initMetadataTable(Option instantTime) { + // by default do nothing. + } /** * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping @@ -1457,18 +1480,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option ownerInstant = Option.empty(); - if (instantTime.isPresent()) { - ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); - } - this.txnManager.beginTransaction(ownerInstant, Option.empty()); - try { - tryUpgrade(metaClient, instantTime); - table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); - } finally { - this.txnManager.endTransaction(ownerInstant); - } + doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); + HoodieTable table = createTable(config, hadoopConf, metaClient); // Validate table properties metaClient.validateTableProperties(config.getProps()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 551b412ccbc6c..3fbc3abbb86f2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -126,6 +126,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient); + } + @Override public List> filterExists(List> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible @@ -291,10 +296,14 @@ public void initMetadataTable() { HoodieFlinkTable table = getHoodieTable(); if (config.isMetadataTableEnabled()) { // initialize the metadata table path - try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) { - // do nothing + // guard the metadata writer with concurrent lock + try { + this.txnManager.getLockManager().lock(); + initMetadataWriter().close(); } catch (Exception e) { throw new HoodieException("Failed to initialize metadata table", e); + } finally { + this.txnManager.getLockManager().unlock(); } // clean the obsolete index stats table.deleteMetadataIndexIfNecessary(); @@ -478,16 +487,13 @@ private void completeClustering( } @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - // Create a Hoodie table which encapsulated the commits and files visible - return getHoodieTable(); - } - - @Override - protected void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + protected void doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { // do nothing. + // flink executes the upgrade/downgrade once when initializing the first instant on start up, // no need to execute the upgrade/downgrade on each write in streaming. + + // flink performs metadata table bootstrap on the coordinator when it starts up. } public void completeTableService( diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index b6951bc6b7874..4bb631e643f5e 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -92,6 +92,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieJavaTable.create(config, context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) { + return HoodieJavaTable.create(config, context, metaClient); + } + @Override public List upsert(List> records, String instantTime) { @@ -228,13 +233,4 @@ protected HoodieWriteMetadata> compact(String compactionInstan public HoodieWriteMetadata> cluster(final String clusteringInstant, final boolean shouldComplete) { throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient"); } - - @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); - - // Create a Hoodie table which encapsulated the commits and files visible - return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); - } - } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index 3c878cbc14cf8..c33bf88e7ee29 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -51,7 +51,7 @@ public static HoodieJavaTable create(HoodieWr } public static HoodieJavaTable create(HoodieWriteConfig config, - HoodieJavaEngineContext context, + HoodieEngineContext context, HoodieTableMetaClient metaClient) { switch (metaClient.getTableType()) { case COPY_ON_WRITE: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 1f9fcf3ef9c29..9d70d43b6222f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -131,6 +131,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop return HoodieSparkTable.create(config, context); } + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) { + return HoodieSparkTable.create(config, context, metaClient); + } + @Override public JavaRDD> filterExists(JavaRDD> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible @@ -434,16 +439,11 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM } @Override - protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime, boolean initialMetadataTableIfNecessary) { - if (initialMetadataTableIfNecessary) { - // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, - // if it didn't exist before - // See https://issues.apache.org/jira/browse/HUDI-3343 for more details - initializeMetadataTable(instantTime); - } - - // Create a Hoodie table which encapsulated the commits and files visible - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); + protected void initMetadataTable(Option instantTime) { + // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, + // if it didn't exist before + // See https://issues.apache.org/jira/browse/HUDI-3343 for more details + initializeMetadataTable(instantTime); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 66d51c91283f3..3719ed742bcb3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -19,7 +19,6 @@ package org.apache.hudi.table; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -60,11 +59,11 @@ public static HoodieSparkTable create(HoodieW .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) .setProperties(config.getProps()).build(); - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); + return HoodieSparkTable.create(config, context, metaClient); } public static HoodieSparkTable create(HoodieWriteConfig config, - HoodieSparkEngineContext context, + HoodieEngineContext context, HoodieTableMetaClient metaClient) { HoodieSparkTable hoodieSparkTable; switch (metaClient.getTableType()) {