Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -66,6 +67,8 @@
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

Expand Down Expand Up @@ -93,6 +96,7 @@
+ "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).")
public class HoodieWriteConfig extends HoodieConfig {

private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class);
private static final long serialVersionUID = 0L;

// This is a constant as is should never be changed via config (will invalidate previous commits)
Expand Down Expand Up @@ -903,6 +907,11 @@ public String getTableName() {
return getString(TBL_NAME);
}

public HoodieTableType getTableType() {
return HoodieTableType.valueOf(getStringOrDefault(
HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
}

public String getPreCombineField() {
return getString(PRECOMBINE_FIELD_NAME);
}
Expand Down Expand Up @@ -1930,7 +1939,9 @@ public Boolean areAnyTableServicesExecutedInline() {
* @return True if any table services are configured to run async, false otherwise.
*/
public Boolean areAnyTableServicesAsync() {
return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive();
return isAsyncClusteringEnabled()
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|| isAsyncClean() || isAsyncArchive();
}

public Boolean areAnyTableServicesScheduledInline() {
Expand Down Expand Up @@ -2390,19 +2401,56 @@ protected void setDefaults() {
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));

// Async table services can update the metadata table and a lock provider is
// needed to guard against any concurrent table write operations. If user has
// not configured any lock provider, let's use the InProcess lock provider.
autoAdjustConfigsForConcurrencyMode();
}

private void autoAdjustConfigsForConcurrencyMode() {
boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
final TypedProperties writeConfigProperties = writeConfig.getProps();
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);

if (!isLockConfigSet) {
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) {
lockConfigBuilder.withLockProvider(InProcessLockProvider.class);
}
writeConfig.setDefault(lockConfigBuilder.build());
}

if (isMetadataTableEnabled) {
// When metadata table is enabled, optimistic concurrency control must be used for
// single writer with async table services.
// Async table services can update the metadata table and a lock provider is
// needed to guard against any concurrent table write operations. If user has
// not configured any lock provider, let's use the InProcess lock provider.
boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();

if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
// This is targeted at Single writer with async table services
// If user does not set the lock provider, likely that the concurrency mode is not set either
// Override the configs for metadata table
writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName());
LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
+ "lock provider for single writer with async table services",
WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
}
}

// We check if "hoodie.cleaner.policy.failed.writes"
// is properly set to LAZY for optimistic concurrency control
String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE);
if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()
.equalsIgnoreCase(writeConcurrencyMode)) {
// In this case, we assume that the user takes care of setting the lock provider used
writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
}
}

private void validate() {
Expand All @@ -2411,9 +2459,9 @@ private void validate() {
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
!= HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
}
}

Expand Down
Loading