Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@
import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;

/**
* Indexing related config.
Expand All @@ -57,7 +64,10 @@ public class HoodieIndexConfig extends HoodieConfig {

public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
.key("hoodie.index.type")
// Builder#getDefaultIndexType has already set it according to engine type
.noDefaultValue()
.withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
.withDocumentation("Type of index to use. Default is Bloom filter. "
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
+ "Bloom filters removes the dependency on a external system "
Expand Down Expand Up @@ -141,6 +151,7 @@ public class HoodieIndexConfig extends HoodieConfig {
public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
.key("hoodie.bloom.index.filter.type")
.defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
.withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name())
.withDocumentation("Filter type used. Default is BloomFilterTypeCode.DYNAMIC_V0. "
+ "Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. "
+ "Dynamic bloom filters auto size themselves based on number of keys.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.Objects;

Expand All @@ -48,19 +51,22 @@ public class ConfigProperty<T> implements Serializable {

private final Option<String> deprecatedVersion;

private final Set<String> validValues;

private final String[] alternatives;

// provide the ability to infer config value based on other configs
private final Option<Function<HoodieConfig, Option<T>>> inferFunction;

ConfigProperty(String key, T defaultValue, String doc, Option<String> sinceVersion,
Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, String... alternatives) {
Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, Set<String> validValues, String... alternatives) {
this.key = Objects.requireNonNull(key);
this.defaultValue = defaultValue;
this.doc = doc;
this.sinceVersion = sinceVersion;
this.deprecatedVersion = deprecatedVersion;
this.inferFunction = inferFunc;
this.validValues = validValues;
this.alternatives = alternatives;
}

Expand Down Expand Up @@ -95,33 +101,46 @@ Option<Function<HoodieConfig, Option<T>>> getInferFunc() {
return inferFunction;
}

public void checkValues(String value) {
if (validValues != null && !validValues.isEmpty() && !validValues.contains(value)) {
throw new IllegalArgumentException(
"The value of " + key + " should be one of "
+ String.join(",", validValues) + ", but was " + value);
}
}

public List<String> getAlternatives() {
return Arrays.asList(alternatives);
}

public ConfigProperty<T> withDocumentation(String doc) {
Objects.requireNonNull(doc);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
}

public ConfigProperty<T> withValidValues(String... validValues) {
Objects.requireNonNull(validValues);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, new HashSet<>(Arrays.asList(validValues)), alternatives);
}

public ConfigProperty<T> withAlternatives(String... alternatives) {
Objects.requireNonNull(alternatives);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
}

public ConfigProperty<T> sinceVersion(String sinceVersion) {
Objects.requireNonNull(sinceVersion);
return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, alternatives);
return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, validValues, alternatives);
}

public ConfigProperty<T> deprecatedAfter(String deprecatedVersion) {
Objects.requireNonNull(deprecatedVersion);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, alternatives);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, validValues, alternatives);
}

public ConfigProperty<T> withInferFunction(Function<HoodieConfig, Option<T>> inferFunction) {
Objects.requireNonNull(inferFunction);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), alternatives);
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), validValues, alternatives);
}

/**
Expand Down Expand Up @@ -156,13 +175,13 @@ public static final class PropertyBuilder {

public <T> ConfigProperty<T> defaultValue(T value) {
Objects.requireNonNull(value);
ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty());
ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet());
return configProperty;
}

public ConfigProperty<String> noDefaultValue() {
ConfigProperty<String> configProperty = new ConfigProperty<>(key, null, "", Option.empty(),
Option.empty(), Option.empty());
Option.empty(), Option.empty(), Collections.emptySet());
return configProperty;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public HoodieConfig(Properties props) {
}

public <T> void setValue(ConfigProperty<T> cfg, String val) {
cfg.checkValues(val);
props.setProperty(cfg.key(), val);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ object DataSourceReadOptions {
.key("hoodie.datasource.query.type")
.defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
.withAlternatives("hoodie.datasource.view.type")
.withValidValues(QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_INCREMENTAL_OPT_VAL)
.withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
"(obtain latest view, by merging base and (if any) log files)")
Expand All @@ -65,6 +66,7 @@ object DataSourceReadOptions {
val REALTIME_MERGE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.merge.type")
.defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
.withValidValues(REALTIME_SKIP_MERGE_OPT_VAL, REALTIME_PAYLOAD_COMBINE_OPT_VAL)
.withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
Expand Down Expand Up @@ -210,6 +212,23 @@ object DataSourceWriteOptions {
val OPERATION: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.operation")
.defaultValue(UPSERT_OPERATION_OPT_VAL)
.withValidValues(
WriteOperationType.INSERT.value,
WriteOperationType.INSERT_PREPPED.value,
WriteOperationType.UPSERT.value,
WriteOperationType.UPSERT_PREPPED.value,
WriteOperationType.BULK_INSERT.value,
WriteOperationType.BULK_INSERT_PREPPED.value,
WriteOperationType.DELETE.value,
WriteOperationType.BOOTSTRAP.value,
WriteOperationType.INSERT_OVERWRITE.value,
WriteOperationType.CLUSTER.value,
WriteOperationType.DELETE_PARTITION.value,
WriteOperationType.INSERT_OVERWRITE_TABLE.value,
WriteOperationType.COMPACT.value,
WriteOperationType.INSERT.value,
WriteOperationType.ALTER_SCHEMA.value
)
.withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
Expand All @@ -220,6 +239,7 @@ object DataSourceWriteOptions {
val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.table.type")
.defaultValue(COW_TABLE_TYPE_OPT_VAL)
.withValidValues(COW_TABLE_TYPE_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL)
.withAlternatives("hoodie.datasource.write.storage.type")
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")

Expand Down Expand Up @@ -308,7 +328,8 @@ object DataSourceWriteOptions {
Option.of(classOf[NonpartitionedKeyGenerator].getName)
} else {
val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
if (numOfPartFields == 1) {
val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
Option.of(classOf[SimpleKeyGenerator].getName)
} else {
Option.of(classOf[ComplexKeyGenerator].getName)
Expand Down