@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.common;

/**
* Hoodie data processing engine. Only Apache Spark and Apache Flink are supported for now.
*/
public enum EngineType {
SPARK, FLINK
}
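
For orientation before the config diffs below, here is a minimal usage sketch, not part of this PR, showing how a caller would declare its engine through the new builder hook; the class name, base path, and table name are illustrative assumptions.

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieWriteConfig;

public class EngineTypeUsageSketch {
  public static void main(String[] args) {
    // Declaring the engine lets the config layer pick engine-appropriate defaults
    // (e.g. the index type) instead of the caller spelling them out explicitly.
    HoodieWriteConfig flinkConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.FLINK) // SPARK remains the default when omitted
        .withPath("/tmp/hoodie_table")    // illustrative base path
        .forTable("demo_table")           // illustrative table name
        .build();
    System.out.println(flinkConfig.getIndexType()); // resolves to INMEMORY for Flink
  }
}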
@@ -20,6 +20,8 @@

import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;

import javax.annotation.concurrent.Immutable;
@@ -36,7 +38,6 @@
public class HoodieIndexConfig extends DefaultHoodieConfig {

public static final String INDEX_TYPE_PROP = "hoodie.index.type";
public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();

public static final String INDEX_CLASS_PROP = "hoodie.index.class";
public static final String DEFAULT_INDEX_CLASS = "";
@@ -103,8 +104,18 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";

private EngineType engineType;

/**
* Use Spark engine by default.
*/
private HoodieIndexConfig(Properties props) {
this(EngineType.SPARK, props);
}

private HoodieIndexConfig(EngineType engineType, Properties props) {
super(props);
this.engineType = engineType;
}

public static HoodieIndexConfig.Builder newBuilder() {
@@ -113,6 +124,7 @@ public static HoodieIndexConfig.Builder newBuilder() {

public static class Builder {

private EngineType engineType = EngineType.SPARK;
private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
@@ -237,9 +249,14 @@ public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionP
return this;
}

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
}

public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
HoodieIndexConfig config = new HoodieIndexConfig(engineType, props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, getDefaultIndexType(engineType));
setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
Expand Down Expand Up @@ -278,5 +295,20 @@ public HoodieIndexConfig build() {
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
}

private String getDefaultIndexType(EngineType engineType) {
switch (engineType) {
Contributor:

Are there other configurations that are different in different engines, and if there are many, are they easy to implement in this way?

Contributor Author:

So far this is the only one I have recognized, while working on the Flink index. I think it is simple: just add a check step before the index type is initialized, and this approach should be fine.

(A hypothetical sketch of this extension pattern follows this file's diff.)


case SPARK:
return HoodieIndex.IndexType.BLOOM.name();
case FLINK:
return HoodieIndex.IndexType.INMEMORY.name();
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}

public EngineType getEngineType() {
return engineType;
}
}
}
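
Regarding the review thread above on whether other engine-dependent defaults would be easy to add: a purely hypothetical sketch of the "one switch per engine-dependent default" shape the author describes. Neither the helper class nor the parallelism setting below exists in this PR; they only illustrate the pattern.

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.exception.HoodieNotSupportedException;

// Hypothetical helper, not part of this PR: each engine-dependent default gets its
// own switch over EngineType, mirroring getDefaultIndexType(...) above. A builder's
// build() method would then apply the resolved value with setDefaultOnCondition(...),
// exactly as done for INDEX_TYPE_PROP.
final class EngineAwareDefaultsSketch {

  private EngineAwareDefaultsSketch() {
  }

  static String defaultWriteParallelism(EngineType engineType) {
    switch (engineType) {
      case SPARK:
        return "1500"; // illustrative value only
      case FLINK:
        return "4";    // illustrative value only
      default:
        throw new HoodieNotSupportedException("Unsupported engine " + engineType);
    }
  }
}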
@@ -23,6 +23,7 @@
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -124,10 +125,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
*
* <p>
* Given the Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
* only works with COW tables. Hudi 0.5.x stopped this behavior.
*
* <p>
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
* (disabled by default) which will allow this old behavior.
*/
@@ -145,10 +146,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;

private EngineType engineType;

/**
* Use Spark engine by default.
*/
protected HoodieWriteConfig(Properties props) {
this(EngineType.SPARK, props);
}

protected HoodieWriteConfig(EngineType engineType, Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
this.engineType = engineType;
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
@@ -290,6 +301,10 @@ public boolean isMergeDataValidationCheckEnabled() {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}

public EngineType getEngineType() {
return engineType;
}

/**
* compaction properties.
*/
@@ -779,6 +794,7 @@ public Long getMaxMemoryPerPartitionMerge() {
public static class Builder {

protected final Properties props = new Properties();
protected EngineType engineType = EngineType.SPARK;
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
@@ -789,6 +805,11 @@ public static class Builder {
private boolean isConsistencyGuardSet = false;
private boolean isCallbackConfigSet = false;

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
}

public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
@@ -1049,7 +1070,7 @@ protected void setDefaults() {
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);

// Make sure the props are propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
@@ -1081,7 +1102,7 @@ public HoodieWriteConfig build() {
setDefaults();
validate();
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
HoodieWriteConfig config = new HoodieWriteConfig(engineType, props);
return config;
}
}
@@ -18,8 +18,10 @@

package org.apache.hudi.config;

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;

import org.apache.hudi.index.HoodieIndex;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
@@ -54,6 +56,21 @@ public void testPropertyLoading() throws IOException {
assertEquals(2, config.getMinCommitsToKeep());
}

@Test
public void testDefaultIndexAccordingToEngineType() {
// default bloom
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());

// spark default bloom
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());

// flink default in-memory
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath("/tmp").build();
assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
}

private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
Properties properties = new Properties();
properties.putAll(params);
@@ -22,14 +22,13 @@
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
@@ -134,10 +133,9 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withAutoCommit(false)
.withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig());