diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/EngineType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/EngineType.java
new file mode 100644
index 0000000000000..1ecb0e9557b82
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/EngineType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+/**
+ * Hoodie data processing engine. Supports only Apache Spark and Apache Flink for now.
+ */
+public enum EngineType {
+  SPARK, FLINK
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 83e9f678bc7ba..48b652aa58845 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -20,6 +20,8 @@
 
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.client.common.EngineType;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 
 import javax.annotation.concurrent.Immutable;
@@ -36,7 +38,6 @@
 public class HoodieIndexConfig extends DefaultHoodieConfig {
 
   public static final String INDEX_TYPE_PROP = "hoodie.index.type";
-  public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();
 
   public static final String INDEX_CLASS_PROP = "hoodie.index.class";
   public static final String DEFAULT_INDEX_CLASS = "";
@@ -103,8 +104,18 @@
   public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
   public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";
 
+  private EngineType engineType;
+
+  /**
+   * Use Spark engine by default.
+   */
   private HoodieIndexConfig(Properties props) {
+    this(EngineType.SPARK, props);
+  }
+
+  private HoodieIndexConfig(EngineType engineType, Properties props) {
     super(props);
+    this.engineType = engineType;
   }
 
   public static HoodieIndexConfig.Builder newBuilder() {
@@ -113,6 +124,7 @@ public static HoodieIndexConfig.Builder newBuilder() {
 
   public static class Builder {
 
+    private EngineType engineType = EngineType.SPARK;
     private final Properties props = new Properties();
 
     public Builder fromFile(File propertiesFile) throws IOException {
@@ -237,9 +249,14 @@ public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionP
       return this;
     }
 
+    public Builder withEngineType(EngineType engineType) {
+      this.engineType = engineType;
+      return this;
+    }
+
     public HoodieIndexConfig build() {
-      HoodieIndexConfig config = new HoodieIndexConfig(props);
-      setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
+      HoodieIndexConfig config = new HoodieIndexConfig(engineType, props);
+      setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, getDefaultIndexType(engineType));
       setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
       setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
           DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
@@ -278,5 +295,20 @@ public HoodieIndexConfig build() {
       HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
       return config;
     }
+
+    private String getDefaultIndexType(EngineType engineType) {
+      switch (engineType) {
+        case SPARK:
+          return HoodieIndex.IndexType.BLOOM.name();
+        case FLINK:
+          return HoodieIndex.IndexType.INMEMORY.name();
+        default:
+          throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+      }
+    }
+
+    public EngineType getEngineType() {
+      return engineType;
+    }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b06f994b08326..8c22cab3067e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.common.config.DefaultHoodieConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -124,10 +125,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
-   *
+   * <p>
    * Given Hudi commit protocol, these are generally unsafe operations and user need to handle failure scenarios. It
    * only works with COW table. Hudi 0.5.x had stopped this behavior.
-   *
+   * <p>
   * Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
   * (disabled by default) which will allow this old behavior.
   */
@@ -145,10 +146,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
   private FileSystemViewStorageConfig viewStorageConfig;
 
+  private EngineType engineType;
+
+  /**
+   * Use Spark engine by default.
+   */
   protected HoodieWriteConfig(Properties props) {
+    this(EngineType.SPARK, props);
+  }
+
+  protected HoodieWriteConfig(EngineType engineType, Properties props) {
     super(props);
     Properties newProps = new Properties();
     newProps.putAll(props);
+    this.engineType = engineType;
     this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
@@ -290,6 +301,10 @@ public boolean isMergeDataValidationCheckEnabled() {
     return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
   }
 
+  public EngineType getEngineType() {
+    return engineType;
+  }
+
   /**
    * compaction properties.
    */
@@ -779,6 +794,7 @@ public Long getMaxMemoryPerPartitionMerge() {
 
   public static class Builder {
     protected final Properties props = new Properties();
+    protected EngineType engineType = EngineType.SPARK;
     private boolean isIndexConfigSet = false;
     private boolean isStorageConfigSet = false;
     private boolean isCompactionConfigSet = false;
@@ -789,6 +805,11 @@ public static class Builder {
     private boolean isConsistencyGuardSet = false;
     private boolean isCallbackConfigSet = false;
 
+    public Builder withEngineType(EngineType engineType) {
+      this.engineType = engineType;
+      return this;
+    }
+
     public Builder fromFile(File propertiesFile) throws IOException {
       try (FileReader reader = new FileReader(propertiesFile)) {
         this.props.load(reader);
@@ -1049,7 +1070,7 @@ protected void setDefaults() {
         MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
 
     // Make sure the props is propagated
-    setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
+    setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
     setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
     setDefaultOnCondition(props, !isCompactionConfigSet,
         HoodieCompactionConfig.newBuilder().fromProperties(props).build());
@@ -1081,7 +1102,7 @@ public HoodieWriteConfig build() {
     setDefaults();
     validate();
     // Build WriteConfig at the end
-    HoodieWriteConfig config = new HoodieWriteConfig(props);
+    HoodieWriteConfig config = new HoodieWriteConfig(engineType, props);
     return config;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 360fa054b8def..cba240959274c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.config;
 
+import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.config.HoodieWriteConfig.Builder;
 
+import org.apache.hudi.index.HoodieIndex;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
@@ -54,6 +56,21 @@ public void testPropertyLoading() throws IOException {
     assertEquals(2, config.getMinCommitsToKeep());
   }
 
+  @Test
+  public void testDefaultIndexAccordingToEngineType() {
+    // default bloom
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());
+
+    // spark default bloom
+    writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+    assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());
+
+    // flink default in-memory
+    writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath("/tmp").build();
+    assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
+  }
+
   private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
     Properties properties = new Properties();
     properties.putAll(params);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index db7ad253aeadf..f9dacae1e9449 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -22,14 +22,13 @@
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.client.common.EngineType;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
@@ -134,10 +133,9 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
   public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
     FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
+        HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
             .forTable(cfg.targetTableName)
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
             .withAutoCommit(false)
             .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
                 .getConfig());
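
Taken together, these changes make the default hoodie.index.type a function of the engine type: the builder keeps BLOOM for Spark and resolves INMEMORY for Flink, which is why the explicit withIndexConfig(...) override could be dropped from StreamerUtil. Below is a minimal caller-side sketch of the resulting behavior, using only the APIs visible in this patch (withEngineType, withPath, build, getIndexType); the demo class name and the /tmp/hudi_table path are illustrative placeholders, not part of the patch.

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical demo class, not included in this patch.
public class EngineDefaultIndexDemo {
  public static void main(String[] args) {
    // Without withEngineType the builder behaves exactly as before this
    // patch: the engine type defaults to SPARK, so the index defaults to BLOOM.
    HoodieWriteConfig sparkConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // placeholder base path
        .build();
    System.out.println(sparkConfig.getIndexType()); // BLOOM

    // Declaring the Flink engine switches the resolved default to the
    // in-memory index, with no explicit index configuration needed.
    HoodieWriteConfig flinkConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .withPath("/tmp/hudi_table")
        .build();
    System.out.println(flinkConfig.getIndexType()); // INMEMORY
  }
}

Note that an explicitly supplied hoodie.index.type still wins over either default: the builder only applies getDefaultIndexType(engineType) when INDEX_TYPE_PROP is absent from the properties.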