diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d861ffe970c80..ca201816cb045 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2012,7 +2012,9 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { * @return True if any table services are configured to run inline, false otherwise. */ public Boolean areAnyTableServicesExecutedInline() { - return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean() || isAutoArchive(); + return areTableServicesEnabled() + && (inlineClusteringEnabled() || inlineCompactionEnabled() + || (isAutoClean() && !isAsyncClean()) || (isAutoArchive() && !isAsyncArchive())); } /** @@ -2021,9 +2023,10 @@ public Boolean areAnyTableServicesExecutedInline() { * @return True if any table services are configured to run async, false otherwise. */ public Boolean areAnyTableServicesAsync() { - return isAsyncClusteringEnabled() + return areTableServicesEnabled() + && (isAsyncClusteringEnabled() || (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled()) - || isAsyncClean() || isAsyncArchive(); + || (isAutoClean() && isAsyncClean()) || (isAutoArchive() && isAsyncArchive())); } public Boolean areAnyTableServicesScheduledInline() { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 85d40964b8fd2..3d0da3e49a7f8 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.marker.MarkerType; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig.Builder; import org.apache.hudi.index.HoodieIndex; @@ -48,6 +47,7 @@ import java.util.function.Function; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; +import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_ARCHIVE; import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN; import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN; import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY; @@ -138,7 +138,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType put(ASYNC_CLEAN.key(), "false"); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); // 2. Async clean @@ -151,7 +151,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType put(ASYNC_CLEAN.key(), "true"); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); // 3. Async compaction configured @@ -165,7 +165,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } }), true, - tableType == HoodieTableType.MERGE_ON_READ, + tableType == HoodieTableType.MERGE_ON_READ, true, tableType == HoodieTableType.MERGE_ON_READ ? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL : WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), @@ -186,10 +186,25 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType put(ASYNC_CLEAN.key(), "false"); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), Option.of(true), Option.of(false), Option.of(true), + }), true, false, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); + + // 5. All async services + verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), tableType.name()); + put(ASYNC_CLUSTERING_ENABLE.key(), "true"); + put(INLINE_COMPACT.key(), "false"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "true"); + put(ASYNC_ARCHIVE.key(), "true"); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); + } + }), true, true, false, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName); } @ParameterizedTest @@ -205,7 +220,7 @@ public void testAutoAdjustLockConfigs(HoodieTableType tableType) { .build(); verifyConcurrencyControlRelatedConfigs(writeConfig, - true, true, + true, true, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); @@ -219,7 +234,7 @@ public void testAutoAdjustLockConfigs(HoodieTableType tableType) { .build(); verifyConcurrencyControlRelatedConfigs(writeConfig, - true, true, + true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); } @@ -240,7 +255,7 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t .build(); verifyConcurrencyControlRelatedConfigs(writeConfig, - true, tableType == HoodieTableType.MERGE_ON_READ, + true, tableType == HoodieTableType.MERGE_ON_READ, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), FileSystemBasedLockProviderTestClass.class.getName()); @@ -257,7 +272,7 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t ZookeeperBasedLockProvider.class.getName()); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), true, true, + }), true, true, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), ZookeeperBasedLockProvider.class.getName()); @@ -271,13 +286,13 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t }); if (writeConfig.areAnyTableServicesAsync()) { verifyConcurrencyControlRelatedConfigs(writeConfig, - true, true, + true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, InProcessLockProvider.class.getName()); } else { verifyConcurrencyControlRelatedConfigs(writeConfig, - true, false, + true, false, true, WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); @@ -294,7 +309,7 @@ public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableTyp put(TABLE_SERVICES_ENABLED.key(), "false"); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), false, tableType == HoodieTableType.MERGE_ON_READ, + }), false, false, false, WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); @@ -311,7 +326,7 @@ public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableTyp FileSystemBasedLockProviderTestClass.class.getName()); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), false, tableType == HoodieTableType.MERGE_ON_READ, + }), false, false, false, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName()); @@ -332,7 +347,7 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT put(ASYNC_CLEAN.key(), "false"); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), true, true, + }), true, true, true, WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()), HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()), HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue()); @@ -351,7 +366,8 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT FileSystemBasedLockProviderTestClass.class.getName()); put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); } - }), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + }), true, true, true, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName()); } @@ -415,32 +431,14 @@ private Map constructConfigMap( private void verifyConcurrencyControlRelatedConfigs( HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled, boolean expectedAnyTableServicesAsync, + boolean expectedAnyTableServicesExecutedInline, WriteConcurrencyMode expectedConcurrencyMode, HoodieFailedWritesCleaningPolicy expectedCleanPolicy, String expectedLockProviderName) { - verifyConcurrencyControlRelatedConfigs(writeConfig, Option.of(expectedTableServicesEnabled), - Option.of(expectedAnyTableServicesAsync), Option.empty(), expectedConcurrencyMode, - expectedCleanPolicy, expectedLockProviderName); - } - - private void verifyConcurrencyControlRelatedConfigs( - HoodieWriteConfig writeConfig, Option expectedTableServicesEnabled, - Option expectedAnyTableServicesAsync, - Option expectedAnyTableServicesExecutedInline, - WriteConcurrencyMode expectedConcurrencyMode, - HoodieFailedWritesCleaningPolicy expectedCleanPolicy, - String expectedLockProviderName) { - if (expectedTableServicesEnabled.isPresent()) { - assertEquals(expectedTableServicesEnabled.get(), writeConfig.areTableServicesEnabled()); - } - if (expectedAnyTableServicesAsync.isPresent()) { - assertEquals(expectedAnyTableServicesAsync.get(), writeConfig.areAnyTableServicesAsync()); - } - if (expectedAnyTableServicesExecutedInline.isPresent()) { - assertEquals(expectedAnyTableServicesExecutedInline.get(), - writeConfig.areAnyTableServicesExecutedInline()); - } - + assertEquals(expectedTableServicesEnabled, writeConfig.areTableServicesEnabled()); + assertEquals(expectedAnyTableServicesAsync, writeConfig.areAnyTableServicesAsync()); + assertEquals( + expectedAnyTableServicesExecutedInline, writeConfig.areAnyTableServicesExecutedInline()); assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode()); assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy()); assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());