Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,9 @@ public WriteConcurrencyMode getWriteConcurrencyMode() {
* @return True if any table services are configured to run inline, false otherwise.
*/
public Boolean areAnyTableServicesExecutedInline() {
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean() || isAutoArchive();
return areTableServicesEnabled()
&& (inlineClusteringEnabled() || inlineCompactionEnabled()
|| (isAutoClean() && !isAsyncClean()) || (isAutoArchive() && !isAsyncArchive()));
}

/**
Expand All @@ -2021,9 +2023,10 @@ public Boolean areAnyTableServicesExecutedInline() {
* @return True if any table services are configured to run async, false otherwise.
*/
public Boolean areAnyTableServicesAsync() {
return isAsyncClusteringEnabled()
return areTableServicesEnabled()
&& (isAsyncClusteringEnabled()
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|| isAsyncClean() || isAsyncArchive();
|| (isAutoClean() && isAsyncClean()) || (isAutoArchive() && isAsyncArchive()));
}

public Boolean areAnyTableServicesScheduledInline() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
import org.apache.hudi.index.HoodieIndex;

Expand All @@ -48,6 +47,7 @@
import java.util.function.Function;

import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_ARCHIVE;
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType
put(ASYNC_CLEAN.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
}), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);

// 2. Async clean
Expand All @@ -151,7 +151,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType
put(ASYNC_CLEAN.key(), "true");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
}), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);

// 3. Async compaction configured
Expand All @@ -165,7 +165,7 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true,
tableType == HoodieTableType.MERGE_ON_READ,
tableType == HoodieTableType.MERGE_ON_READ, true,
tableType == HoodieTableType.MERGE_ON_READ
? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
: WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
Expand All @@ -186,10 +186,25 @@ public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType
put(ASYNC_CLEAN.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), Option.of(true), Option.of(false), Option.of(true),
}), true, false, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());

// 5. All async services
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
{
put(HoodieTableConfig.TYPE.key(), tableType.name());
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
put(INLINE_COMPACT.key(), "false");
put(AUTO_CLEAN.key(), "true");
put(ASYNC_CLEAN.key(), "true");
put(ASYNC_ARCHIVE.key(), "true");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, false,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
}

@ParameterizedTest
Expand All @@ -205,7 +220,7 @@ public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
.build();

verifyConcurrencyControlRelatedConfigs(writeConfig,
true, true,
true, true, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
Expand All @@ -219,7 +234,7 @@ public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
.build();

verifyConcurrencyControlRelatedConfigs(writeConfig,
true, true,
true, true, true,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
}
Expand All @@ -240,7 +255,7 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t
.build();

verifyConcurrencyControlRelatedConfigs(writeConfig,
true, tableType == HoodieTableType.MERGE_ON_READ,
true, tableType == HoodieTableType.MERGE_ON_READ, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
FileSystemBasedLockProviderTestClass.class.getName());
Expand All @@ -257,7 +272,7 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t
ZookeeperBasedLockProvider.class.getName());
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true,
}), true, true, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
ZookeeperBasedLockProvider.class.getName());
Expand All @@ -271,13 +286,13 @@ public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType t
});
if (writeConfig.areAnyTableServicesAsync()) {
verifyConcurrencyControlRelatedConfigs(writeConfig,
true, true,
true, true, true,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY,
InProcessLockProvider.class.getName());
} else {
verifyConcurrencyControlRelatedConfigs(writeConfig,
true, false,
true, false, true,
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
Expand All @@ -294,7 +309,7 @@ public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableTyp
put(TABLE_SERVICES_ENABLED.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), false, tableType == HoodieTableType.MERGE_ON_READ,
}), false, false, false,
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
Expand All @@ -311,7 +326,7 @@ public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableTyp
FileSystemBasedLockProviderTestClass.class.getName());
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), false, tableType == HoodieTableType.MERGE_ON_READ,
}), false, false, false,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY,
FileSystemBasedLockProviderTestClass.class.getName());
Expand All @@ -332,7 +347,7 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT
put(ASYNC_CLEAN.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true,
}), true, true, true,
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
Expand All @@ -351,7 +366,8 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT
FileSystemBasedLockProviderTestClass.class.getName());
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
}), true, true, true,
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
}

Expand Down Expand Up @@ -415,32 +431,14 @@ private Map<EngineType, Object> constructConfigMap(
private void verifyConcurrencyControlRelatedConfigs(
HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled,
boolean expectedAnyTableServicesAsync,
boolean expectedAnyTableServicesExecutedInline,
WriteConcurrencyMode expectedConcurrencyMode,
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
String expectedLockProviderName) {
verifyConcurrencyControlRelatedConfigs(writeConfig, Option.of(expectedTableServicesEnabled),
Option.of(expectedAnyTableServicesAsync), Option.empty(), expectedConcurrencyMode,
expectedCleanPolicy, expectedLockProviderName);
}

private void verifyConcurrencyControlRelatedConfigs(
HoodieWriteConfig writeConfig, Option<Boolean> expectedTableServicesEnabled,
Option<Boolean> expectedAnyTableServicesAsync,
Option<Boolean> expectedAnyTableServicesExecutedInline,
WriteConcurrencyMode expectedConcurrencyMode,
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
String expectedLockProviderName) {
if (expectedTableServicesEnabled.isPresent()) {
assertEquals(expectedTableServicesEnabled.get(), writeConfig.areTableServicesEnabled());
}
if (expectedAnyTableServicesAsync.isPresent()) {
assertEquals(expectedAnyTableServicesAsync.get(), writeConfig.areAnyTableServicesAsync());
}
if (expectedAnyTableServicesExecutedInline.isPresent()) {
assertEquals(expectedAnyTableServicesExecutedInline.get(),
writeConfig.areAnyTableServicesExecutedInline());
}

assertEquals(expectedTableServicesEnabled, writeConfig.areTableServicesEnabled());
assertEquals(expectedAnyTableServicesAsync, writeConfig.areAnyTableServicesAsync());
assertEquals(
expectedAnyTableServicesExecutedInline, writeConfig.areAnyTableServicesExecutedInline());
assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode());
assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy());
assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());
Expand Down