diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index dbd45b9738285..0cef5550af8b7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -44,6 +44,13 @@
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
 import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
+import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;
 
 /**
  * Indexing related config.
@@ -57,7 +64,10 @@ public class HoodieIndexConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
       .key("hoodie.index.type")
+      // Builder#getDefaultIndexType has already set it according to engine type
       .noDefaultValue()
+      .withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
+          SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
       .withDocumentation("Type of index to use. Default is Bloom filter. "
           + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
           + "Bloom filters removes the dependency on a external system "
@@ -141,6 +151,7 @@ public class HoodieIndexConfig extends HoodieConfig {
   public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
       .key("hoodie.bloom.index.filter.type")
       .defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
+      .withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name())
       .withDocumentation("Filter type used. Default is BloomFilterTypeCode.DYNAMIC_V0. "
           + "Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. "
           + "Dynamic bloom filters auto size themselves based on number of keys.");
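
The valid-values list above is built from IndexType enum names, and the new checkValues hook (added in ConfigProperty.java below) is a plain Set.contains, so matching is exact and case-sensitive. A minimal sketch of the resulting behavior, assuming the patch is applied; the misspelled value is invented for illustration:

import org.apache.hudi.config.HoodieIndexConfig;

public class IndexTypeCheckDemo {
  public static void main(String[] args) {
    // Exact enum name: accepted, returns normally.
    HoodieIndexConfig.INDEX_TYPE.checkValues("GLOBAL_BLOOM");
    // Invented typo: throws IllegalArgumentException listing the seven valid names.
    HoodieIndexConfig.INDEX_TYPE.checkValues("GLOBAL_BOOM");
  }
}
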
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index 9612914303588..934803d8d315e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -24,7 +24,10 @@
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.Objects;
@@ -48,19 +51,22 @@ public class ConfigProperty<T> implements Serializable {
 
   private final Option<String> deprecatedVersion;
 
+  private final Set<String> validValues;
+
   private final String[] alternatives;
 
   // provide the ability to infer config value based on other configs
   private final Option<Function<HoodieConfig, Option<T>>> inferFunction;
 
   ConfigProperty(String key, T defaultValue, String doc, Option<String> sinceVersion,
-                 Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, String... alternatives) {
+                 Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, Set<String> validValues, String... alternatives) {
     this.key = Objects.requireNonNull(key);
     this.defaultValue = defaultValue;
     this.doc = doc;
     this.sinceVersion = sinceVersion;
     this.deprecatedVersion = deprecatedVersion;
     this.inferFunction = inferFunc;
+    this.validValues = validValues;
     this.alternatives = alternatives;
   }
 
@@ -95,33 +101,46 @@ Option<Function<HoodieConfig, Option<T>>> getInferFunc() {
     return inferFunction;
   }
 
+  public void checkValues(String value) {
+    if (validValues != null && !validValues.isEmpty() && !validValues.contains(value)) {
+      throw new IllegalArgumentException(
+          "The value of " + key + " should be one of "
+              + String.join(",", validValues) + ", but was " + value);
+    }
+  }
+
   public List<String> getAlternatives() {
     return Arrays.asList(alternatives);
   }
 
   public ConfigProperty<T> withDocumentation(String doc) {
     Objects.requireNonNull(doc);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
+  }
+
+  public ConfigProperty<T> withValidValues(String... validValues) {
+    Objects.requireNonNull(validValues);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, new HashSet<>(Arrays.asList(validValues)), alternatives);
   }
 
   public ConfigProperty<T> withAlternatives(String... alternatives) {
     Objects.requireNonNull(alternatives);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> sinceVersion(String sinceVersion) {
     Objects.requireNonNull(sinceVersion);
-    return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> deprecatedAfter(String deprecatedVersion) {
     Objects.requireNonNull(deprecatedVersion);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, validValues, alternatives);
   }
 
   public ConfigProperty<T> withInferFunction(Function<HoodieConfig, Option<T>> inferFunction) {
     Objects.requireNonNull(inferFunction);
-    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), alternatives);
+    return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), validValues, alternatives);
   }
 
   /**
@@ -156,13 +175,13 @@ public static final class PropertyBuilder {
 
     public <T> ConfigProperty<T> defaultValue(T value) {
       Objects.requireNonNull(value);
-      ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty());
+      ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet());
       return configProperty;
     }
 
     public ConfigProperty<String> noDefaultValue() {
       ConfigProperty<String> configProperty = new ConfigProperty<>(key, null, "", Option.empty(),
-          Option.empty(), Option.empty());
+          Option.empty(), Option.empty(), Collections.emptySet());
       return configProperty;
     }
   }
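
Every withX builder method returns a fresh immutable ConfigProperty; withValidValues swaps in a populated HashSet, while properties built without it keep Collections.emptySet, so checkValues is a no-op for them and existing configs remain unvalidated by default. A small self-contained sketch of the builder chain and the failure mode, using a made-up key and values:

import org.apache.hudi.common.config.ConfigProperty;

public class CheckValuesDemo {
  // Hypothetical property, purely to illustrate the new builder step.
  static final ConfigProperty<String> FORMAT = ConfigProperty
      .key("hoodie.example.format")
      .defaultValue("parquet")
      .withValidValues("parquet", "orc")
      .withDocumentation("Example only.");

  public static void main(String[] args) {
    FORMAT.checkValues("orc");  // in the valid set: returns normally
    FORMAT.checkValues("csv");  // throws IllegalArgumentException; the message names the
                                // key and joins the valid values (set order is unspecified)
  }
}
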
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index c77e292b4775f..1aa0cfba5bc13 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -57,6 +57,7 @@ public HoodieConfig(Properties props) {
   }
 
   public <T> void setValue(ConfigProperty<T> cfg, String val) {
+    cfg.checkValues(val);
     props.setProperty(cfg.key(), val);
   }
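
With this one-line hook, an out-of-range value now fails fast at configuration time rather than surfacing later when the component consuming the config is instantiated. A sketch assuming the patch is applied; note that values written straight into the backing Properties (for example via the constructor) bypass the check, since only setValue calls checkValues:

import java.util.Properties;

import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.config.HoodieIndexConfig;

public class SetValueDemo {
  public static void main(String[] args) {
    HoodieConfig config = new HoodieConfig(new Properties());
    // "SIMPLE" is in INDEX_TYPE's valid-values set: accepted.
    config.setValue(HoodieIndexConfig.INDEX_TYPE, "SIMPLE");
    // Invented typo: rejected immediately with IllegalArgumentException.
    config.setValue(HoodieIndexConfig.INDEX_TYPE, "SIMPLEX");
  }
}
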
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 0d4c7cf184ddc..c467e13ce237e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -56,6 +56,7 @@ object DataSourceReadOptions {
     .key("hoodie.datasource.query.type")
     .defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
     .withAlternatives("hoodie.datasource.view.type")
+    .withValidValues(QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_INCREMENTAL_OPT_VAL)
     .withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
       "(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
       "(obtain latest view, by merging base and (if any) log files)")
@@ -65,6 +66,7 @@ object DataSourceReadOptions {
   val REALTIME_MERGE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.merge.type")
     .defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
+    .withValidValues(REALTIME_SKIP_MERGE_OPT_VAL, REALTIME_PAYLOAD_COMBINE_OPT_VAL)
     .withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
@@ -210,6 +212,22 @@ object DataSourceWriteOptions {
   val OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.operation")
     .defaultValue(UPSERT_OPERATION_OPT_VAL)
+    .withValidValues(
+      WriteOperationType.INSERT.value,
+      WriteOperationType.INSERT_PREPPED.value,
+      WriteOperationType.UPSERT.value,
+      WriteOperationType.UPSERT_PREPPED.value,
+      WriteOperationType.BULK_INSERT.value,
+      WriteOperationType.BULK_INSERT_PREPPED.value,
+      WriteOperationType.DELETE.value,
+      WriteOperationType.BOOTSTRAP.value,
+      WriteOperationType.INSERT_OVERWRITE.value,
+      WriteOperationType.CLUSTER.value,
+      WriteOperationType.DELETE_PARTITION.value,
+      WriteOperationType.INSERT_OVERWRITE_TABLE.value,
+      WriteOperationType.COMPACT.value,
+      WriteOperationType.ALTER_SCHEMA.value
+    )
     .withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
       "Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
       "bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
@@ -220,6 +238,7 @@ object DataSourceWriteOptions {
   val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.table.type")
     .defaultValue(COW_TABLE_TYPE_OPT_VAL)
+    .withValidValues(COW_TABLE_TYPE_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL)
     .withAlternatives("hoodie.datasource.write.storage.type")
     .withDocumentation("The table type for the underlying data, for this write. " +
       "This can't change between writes.")
@@ -308,7 +327,8 @@ object DataSourceWriteOptions {
       Option.of(classOf[NonpartitionedKeyGenerator].getName)
     } else {
       val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
-      if (numOfPartFields == 1) {
+      val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
+      if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
         Option.of(classOf[SimpleKeyGenerator].getName)
       } else {
         Option.of(classOf[ComplexKeyGenerator].getName)
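
The final hunk tightens key generator inference: SimpleKeyGenerator is now inferred only when both the record key and the partition path consist of a single field, whereas previously a multi-field record key with a single partition field was incorrectly mapped to SimpleKeyGenerator. A rough Java rendering of the revised predicate; the field lists are invented for illustration:

public class KeyGenInferenceSketch {
  // Mirrors the patched condition: both comma-separated lists must have exactly one field.
  static String inferKeyGenerator(String recordKeyFields, String partitionPathFields) {
    int numOfRecordKeyFields = recordKeyFields.split(",").length;
    int numOfPartFields = partitionPathFields.split(",").length;
    return (numOfPartFields == 1 && numOfRecordKeyFields == 1)
        ? "org.apache.hudi.keygen.SimpleKeyGenerator"
        : "org.apache.hudi.keygen.ComplexKeyGenerator";
  }

  public static void main(String[] args) {
    System.out.println(inferKeyGenerator("uuid", "region"));     // SimpleKeyGenerator
    System.out.println(inferKeyGenerator("uuid,ts", "region"));  // ComplexKeyGenerator (the fix)
  }
}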