From 2daab0cfe1bda05fe3ab5854084145c68651c26d Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Wed, 27 Jul 2022 11:02:41 +0800
Subject: [PATCH] [HUDI-4484] Add default lock config options for flink
 metadata table

---
 .../transaction/TransactionManager.java       |  4 ++++
 .../apache/hudi/config/HoodieLockConfig.java  | 10 ++++++++
 .../hudi/client/HoodieFlinkWriteClient.java   | 23 ++++++++++++-------
 .../sink/StreamWriteOperatorCoordinator.java  |  6 -----
 .../org/apache/hudi/util/StreamerUtil.java    | 19 ++++++++++++++-
 .../hudi/sink/ITTestDataStreamWrite.java      |  6 +----
 .../compact/ITTestHoodieFlinkCompactor.java   | 17 ++++++--------
 .../hudi/table/ITTestHoodieDataSource.java    |  7 +++++-
 8 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index aef1fee5e0794..e9329fece69f3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -85,6 +85,10 @@ public void close() {
     }
   }
 
+  public LockManager getLockManager() {
+    return lockManager;
+  }
+
   public Option<HoodieInstant> getLastCompletedTransactionOwner() {
     return lastCompletedTxnOwnerInstant;
   }

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 7fcc96810be2c..3623a04232be2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -311,6 +311,16 @@ public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutio
       return this;
     }
 
+    public HoodieLockConfig.Builder withFileSystemLockPath(String path) {
+      lockConfig.setValue(FILESYSTEM_LOCK_PATH, path);
+      return this;
+    }
+
+    public HoodieLockConfig.Builder withFileSystemLockExpire(Integer expireTime) {
+      lockConfig.setValue(FILESYSTEM_LOCK_EXPIRE, String.valueOf(expireTime));
+      return this;
+    }
+
     public HoodieLockConfig build() {
       lockConfig.setDefaults(HoodieLockConfig.class.getName());
       return lockConfig;
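Note: the two new builder methods follow the existing with* pattern on HoodieLockConfig.Builder, so the filesystem lock can be configured fluently. A minimal usage sketch (the lock path below is illustrative only; the 2000 ms wait and 1 minute expiry mirror the defaults wired up in StreamerUtil later in this patch):

    HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
        .withLockProvider(FileSystemBasedLockProvider.class)
        .withLockWaitTimeInMillis(2000L)                        // wait up to 2s when acquiring
        .withFileSystemLockExpire(1)                            // expire the lock file after 1 minute
        .withFileSystemLockPath("/tmp/hoodie/t1/.hoodie/.aux")  // illustrative path
        .build();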
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0276b389e9152..0cbb192ab9072 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -267,14 +267,21 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
     if (this.metadataWriter == null) {
       initMetadataWriter();
     }
-    // refresh the timeline
-
-    // Note: the data meta client is not refreshed currently, some code path
-    // relies on the meta client for resolving the latest data schema,
-    // the schema expects to be immutable for SQL jobs but may be not for non-SQL
-    // jobs.
-    this.metadataWriter.initTableMetadata();
-    this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+    try {
+      // guard the metadata writer with the concurrent lock
+      this.txnManager.getLockManager().lock();
+
+      // refresh the timeline
+
+      // Note: the data meta client is not refreshed currently, some code paths
+      // rely on the meta client for resolving the latest data schema;
+      // the schema is expected to be immutable for SQL jobs but may not be
+      // for non-SQL jobs.
+      this.metadataWriter.initTableMetadata();
+      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+    } finally {
+      this.txnManager.getLockManager().unlock();
+    }
   }
 
   /**
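Note: the metadata table writer is now guarded by the lock manager exposed through the new TransactionManager#getLockManager accessor. Stripped of the surrounding client code, the guard is the plain try/finally idiom; lock() sits inside the try block, so unlock() also runs when acquisition itself fails. A standalone sketch (field names as in HoodieFlinkWriteClient; isTableService stands in for the actual getHoodieTable().isTableServiceAction(actionType) lookup):

    LockManager lockManager = this.txnManager.getLockManager();
    try {
      lockManager.lock();
      // no two writers may update the metadata table concurrently
      this.metadataWriter.initTableMetadata();
      this.metadataWriter.update(metadata, instantTime, isTableService);
    } finally {
      lockManager.unlock();
    }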
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index d636bcde3cf8f..7d20789bff981 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -152,11 +152,6 @@ public class StreamWriteOperatorCoordinator
    */
   private CkpMetadata ckpMetadata;
 
-  /**
-   * Current checkpoint.
-   */
-  private long checkpointId = -1;
-
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
@@ -219,7 +214,6 @@ public void close() throws Exception {
 
   @Override
   public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
-    this.checkpointId = checkpointId;
     executor.execute(
         () -> {
           try {

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 5a34f2a178b11..b3c054b32d3f3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
@@ -43,6 +44,7 @@
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
@@ -88,6 +90,7 @@
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
  * Utilities for Flink stream read and write.
@@ -170,7 +173,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
           .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
           .withClusteringConfig(
               HoodieClusteringConfig.newBuilder()
-                  .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
+                  .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
                   .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
                   .withClusteringPlanPartitionFilterMode(
                       ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
@@ -218,6 +221,12 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
             .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
             .build())
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(FileSystemBasedLockProvider.class)
+            .withLockWaitTimeInMillis(2000L) // 2s
+            .withFileSystemLockExpire(1) // 1 minute
+            .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+            .build())
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
             .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
@@ -231,6 +240,7 @@ public static HoodieWriteConfig getHoodieClientConfig(
         .withProps(flinkConf2TypedProperties(conf))
         .withSchema(getSourceSchema(conf).toString());
 
+    // do not configure the cleaning strategy as LAZY until multi-writer is supported.
     HoodieWriteConfig writeConfig = builder.build();
     if (loadFsViewStorageConfig) {
       // do not use the builder to give a change for recovering the original fs view storage config
@@ -548,4 +558,11 @@ public static boolean fileExists(FileSystem fs, Path path) {
       throw new HoodieException("Exception while checking file " + path + " existence", e);
     }
   }
+
+  /**
+   * Returns the auxiliary path, e.g. {@code <base_path>/.hoodie/.aux}.
+   */
+  public static String getAuxiliaryPath(Configuration conf) {
+    return conf.getString(FlinkOptions.PATH) + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 1589cf31e7405..680c4d02e238b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -382,13 +382,9 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 
-    options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
-    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
-    options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
-    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
     options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 814763e1f6f38..d8ffd6b1b111d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -42,7 +42,6 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -166,7 +165,6 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
-  @Disabled
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
@@ -201,14 +199,13 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
     asyncCompactionService.start(null);
 
     // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(5);
+    TimeUnit.SECONDS.sleep(10);
 
     asyncCompactionService.shutDown();
 
     TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
   }
 
-  @Disabled
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
@@ -218,7 +215,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
     tableEnv.getConfig().getConfiguration()
         .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
-    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -227,9 +223,6 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
     tableEnv.executeSql(hoodieTableDDL);
     tableEnv.executeSql(TestSQL.INSERT_T1).await();
 
-    // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(3);
-
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
@@ -253,9 +246,13 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
         + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
     tableEnv.executeSql(insertT1ForNewPartition).await();
 
-    // wait for the asynchronous commit to finish
-    TimeUnit.SECONDS.sleep(3);
+    writeClient.close();
+    // re-create the write client / fs view server, otherwise there is a low
+    // probability that a "connection refused" error occurs and the reader
+    // metadata view is not complete
+    writeClient = StreamerUtil.createWriteClient(conf);
 
+    metaClient.reloadActiveTimeline();
     compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index fb9f55986d150..c40831639b67f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -222,7 +222,7 @@ void testStreamWriteBatchRead() {
   }
 
   @Test
-  void testStreamWriteBatchReadOptimized() {
+  void testStreamWriteBatchReadOptimized() throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -236,11 +236,16 @@ void testStreamWriteBatchReadOptimized() {
         .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
         .option(FlinkOptions.COMPACTION_TASKS, 1)
+        // disable the metadata table because
+        // the lock conflict resolution takes time
+        .option(FlinkOptions.METADATA_ENABLED, false)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
+    // give some buffer time for finishing the async compaction tasks
+    TimeUnit.SECONDS.sleep(5);
     List<Row> rows = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
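Note: with the defaults above, a Flink writer no longer needs any explicit hoodie.write.lock.* options for the metadata table; enabling it is enough, and the lock falls back to the table's auxiliary folder. A minimal sketch (the table path is illustrative; getHoodieClientConfig(Configuration) is assumed to be the single-argument overload of the method patched above):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie/t1");    // illustrative table path
    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);   // turn the metadata table on
    // the resulting write config carries the FileSystemBasedLockProvider defaults,
    // pointing at /tmp/hoodie/t1/.hoodie/.aux with a 2s wait time and 1 minute expiry
    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);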