metrics, long defaultBaseFileSize) {
+ int numLogFiles = 0;
+ long totalLogFileSize = 0;
+ long totalIORead = 0;
+ long totalIOWrite = 0;
+ long totalIO = 0;
+
+ for (FileSlice slice : fileSlices) {
+ numLogFiles += slice.getLogFiles().count();
+ // Total size of all the log files
+ totalLogFileSize += slice.getLogFiles().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
+ .reduce(Long::sum).orElse(0L);
+
+ long baseFileSize = slice.getBaseFile().isPresent() ? slice.getBaseFile().get().getFileSize() : 0L;
+ // Total read will be the base file + all the log files
+ totalIORead = FSUtils.getSizeInMB(baseFileSize + totalLogFileSize);
+ // Total write will be similar to the size of the base file
+ totalIOWrite = FSUtils.getSizeInMB(baseFileSize > 0 ? baseFileSize : defaultBaseFileSize);
+ // Total IO will be the IO for read + write
+ totalIO = totalIORead + totalIOWrite;
+ }
+
+ metrics.put(TOTAL_IO_READ_MB, (double) totalIORead);
+ metrics.put(TOTAL_IO_WRITE_MB, (double) totalIOWrite);
+ metrics.put(TOTAL_IO_MB, (double) totalIO);
+ metrics.put(TOTAL_LOG_FILE_SIZE, (double) totalLogFileSize);
+ metrics.put(TOTAL_LOG_FILES, (double) numLogFiles);
+ }
+}
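
As a quick sanity check on the arithmetic above, consider a single hypothetical file slice with a 100 MB base file and two log files of 30 MB and 10 MB (the sizes are illustrative, not taken from this patch):

    // Hypothetical single-slice example of the metric computation above (sizes in MB).
    long baseFileSize = 100;                               // base file present
    long totalLogFileSize = 30 + 10;                       // sum of log file sizes = 40
    long totalIORead = baseFileSize + totalLogFileSize;    // 140 MB scanned on read
    long totalIOWrite = baseFileSize;                      // ~100 MB rewritten
    long totalIO = totalIORead + totalIOWrite;             // 240 MB total IO
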
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
new file mode 100644
index 0000000000000..91acd3075377a
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+ // Config to provide a strategy class to create the ClusteringPlan. The class has to be a subclass of ClusteringPlanStrategy
+ public static final String CLUSTERING_PLAN_STRATEGY_CLASS = "hoodie.clustering.plan.strategy.class";
+ public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+ "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+
+ // Config to provide a strategy class to execute a ClusteringPlan. The class has to be a subclass of RunClusteringStrategy
+ public static final String CLUSTERING_EXECUTION_STRATEGY_CLASS = "hoodie.clustering.execution.strategy.class";
+ public static final String DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS =
+ "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
+
+ // Turn on inline clustering - clustering will be run after each write operation completes.
+ public static final String INLINE_CLUSTERING_PROP = "hoodie.clustering.inline";
+ private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+ // Config to control how frequently inline clustering is triggered (in number of commits)
+ public static final String INLINE_CLUSTERING_MAX_COMMIT_PROP = "hoodie.clustering.inline.max.commits";
+ private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+ // Any strategy specific params can be saved with this prefix
+ public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
+
+ // Number of partitions to list to create ClusteringPlan.
+ public static final String CLUSTERING_TARGET_PARTITIONS = CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions";
+ public static final String DEFAULT_CLUSTERING_TARGET_PARTITIONS = String.valueOf(2);
+
+ // Files smaller than the size specified here are candidates for clustering.
+ public static final String CLUSTERING_PLAN_SMALL_FILE_LIMIT = CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit";
+ public static final String DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT = String.valueOf(600 * 1024 * 1024L); // 600MB
+
+ // Each clustering operation can create multiple groups. The total amount of data processed by a clustering operation
+ // is bounded by the two properties below (CLUSTERING_MAX_BYTES_PER_GROUP * CLUSTERING_MAX_NUM_GROUPS).
+ // Max amount of data to be included in one group
+ public static final String CLUSTERING_MAX_BYTES_PER_GROUP = CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group";
+ public static final String DEFAULT_CLUSTERING_MAX_GROUP_SIZE = String.valueOf(2 * 1024 * 1024 * 1024L);
+
+ // Maximum number of groups to create as part of the ClusteringPlan. Increasing the number of groups increases parallelism.
+ public static final String CLUSTERING_MAX_NUM_GROUPS = CLUSTERING_STRATEGY_PARAM_PREFIX + "max.num.groups";
+ public static final String DEFAULT_CLUSTERING_MAX_NUM_GROUPS = "30";
+
+ // Each group can produce up to N (CLUSTERING_MAX_BYTES_PER_GROUP / CLUSTERING_TARGET_FILE_MAX_BYTES) output file groups.
+ public static final String CLUSTERING_TARGET_FILE_MAX_BYTES = CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes";
+ public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES = String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB
+
+ // Constants related to clustering that may be used by more than 1 strategy.
+ public static final String CLUSTERING_SORT_COLUMNS_PROPERTY = HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns";
+
+ // When file groups are under clustering, updates to those file groups need to be handled. The default strategy simply rejects the update.
+ public static final String CLUSTERING_UPDATES_STRATEGY_PROP = "hoodie.clustering.updates.strategy";
+ public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
+ // Async clustering
+ public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY = "hoodie.clustering.async.enabled";
+ public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "false";
+
+ public HoodieClusteringConfig(Properties props) {
+ super(props);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public Builder withClusteringPlanStrategyClass(String clusteringStrategyClass) {
+ props.setProperty(CLUSTERING_PLAN_STRATEGY_CLASS, clusteringStrategyClass);
+ return this;
+ }
+
+ public Builder withClusteringExecutionStrategyClass(String runClusteringStrategyClass) {
+ props.setProperty(CLUSTERING_EXECUTION_STRATEGY_CLASS, runClusteringStrategyClass);
+ return this;
+ }
+
+ public Builder withClusteringTargetPartitions(int clusteringTargetPartitions) {
+ props.setProperty(CLUSTERING_TARGET_PARTITIONS, String.valueOf(clusteringTargetPartitions));
+ return this;
+ }
+
+ public Builder withClusteringPlanSmallFileLimit(long clusteringSmallFileLimit) {
+ props.setProperty(CLUSTERING_PLAN_SMALL_FILE_LIMIT, String.valueOf(clusteringSmallFileLimit));
+ return this;
+ }
+
+ public Builder withClusteringSortColumns(String sortColumns) {
+ props.setProperty(CLUSTERING_SORT_COLUMNS_PROPERTY, sortColumns);
+ return this;
+ }
+
+ public Builder withClusteringMaxBytesInGroup(long clusteringMaxGroupSize) {
+ props.setProperty(CLUSTERING_MAX_BYTES_PER_GROUP, String.valueOf(clusteringMaxGroupSize));
+ return this;
+ }
+
+ public Builder withClusteringMaxNumGroups(int maxNumGroups) {
+ props.setProperty(CLUSTERING_MAX_NUM_GROUPS, String.valueOf(maxNumGroups));
+ return this;
+ }
+
+ public Builder withClusteringTargetFileMaxBytes(long targetFileSize) {
+ props.setProperty(CLUSTERING_TARGET_FILE_MAX_BYTES, String.valueOf(targetFileSize));
+ return this;
+ }
+
+ public Builder withInlineClustering(Boolean inlineClustering) {
+ props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineClustering));
+ return this;
+ }
+
+ public Builder withInlineClusteringNumCommits(int numCommits) {
+ props.setProperty(INLINE_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits));
+ return this;
+ }
+
+ public Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
+ props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategyClass);
+ return this;
+ }
+
+ public Builder withAsyncClustering(Boolean asyncClustering) {
+ props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, String.valueOf(asyncClustering));
+ return this;
+ }
+
+ public HoodieClusteringConfig build() {
+ HoodieClusteringConfig config = new HoodieClusteringConfig(props);
+
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_STRATEGY_CLASS),
+ CLUSTERING_PLAN_STRATEGY_CLASS, DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_EXECUTION_STRATEGY_CLASS),
+ CLUSTERING_EXECUTION_STRATEGY_CLASS, DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_MAX_BYTES_PER_GROUP), CLUSTERING_MAX_BYTES_PER_GROUP,
+ DEFAULT_CLUSTERING_MAX_GROUP_SIZE);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_MAX_NUM_GROUPS), CLUSTERING_MAX_NUM_GROUPS,
+ DEFAULT_CLUSTERING_MAX_NUM_GROUPS);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_TARGET_FILE_MAX_BYTES), CLUSTERING_TARGET_FILE_MAX_BYTES,
+ DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES);
+ setDefaultOnCondition(props, !props.containsKey(INLINE_CLUSTERING_PROP), INLINE_CLUSTERING_PROP,
+ DEFAULT_INLINE_CLUSTERING);
+ setDefaultOnCondition(props, !props.containsKey(INLINE_CLUSTERING_MAX_COMMIT_PROP), INLINE_CLUSTERING_MAX_COMMIT_PROP,
+ DEFAULT_INLINE_CLUSTERING_NUM_COMMITS);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_TARGET_PARTITIONS), CLUSTERING_TARGET_PARTITIONS,
+ DEFAULT_CLUSTERING_TARGET_PARTITIONS);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_SMALL_FILE_LIMIT), CLUSTERING_PLAN_SMALL_FILE_LIMIT,
+ DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT);
+ setDefaultOnCondition(props, !props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP), CLUSTERING_UPDATES_STRATEGY_PROP,
+ DEFAULT_CLUSTERING_UPDATES_STRATEGY);
+ setDefaultOnCondition(props, !props.containsKey(ASYNC_CLUSTERING_ENABLE_OPT_KEY), ASYNC_CLUSTERING_ENABLE_OPT_KEY,
+ DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL);
+ return config;
+ }
+ }
+}
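
A minimal sketch of wiring these knobs together through the builder defined above (the values and sort columns are illustrative, not recommendations):

    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
        .withInlineClustering(true)                              // hoodie.clustering.inline
        .withInlineClusteringNumCommits(4)                       // trigger every 4 commits
        .withClusteringPlanSmallFileLimit(600 * 1024 * 1024L)    // files under 600MB are candidates
        .withClusteringTargetFileMaxBytes(1024 * 1024 * 1024L)   // aim for ~1GB output files
        .withClusteringSortColumns("event_ts,event_type")        // hypothetical sort columns
        .build();

Unset properties fall back to the defaults filled in by build().
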
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 83e9f678bc7ba..2fbd71dc6ab26 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -20,6 +20,8 @@
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import javax.annotation.concurrent.Immutable;
@@ -36,7 +38,6 @@
public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String INDEX_TYPE_PROP = "hoodie.index.type";
- public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();
public static final String INDEX_CLASS_PROP = "hoodie.index.class";
public static final String DEFAULT_INDEX_CLASS = "";
@@ -103,8 +104,18 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";
+ private EngineType engineType;
+
+ /**
+ * Use Spark engine by default.
+ */
private HoodieIndexConfig(Properties props) {
+ this(EngineType.SPARK, props);
+ }
+
+ private HoodieIndexConfig(EngineType engineType, Properties props) {
super(props);
+ this.engineType = engineType;
}
public static HoodieIndexConfig.Builder newBuilder() {
@@ -113,6 +124,7 @@ public static HoodieIndexConfig.Builder newBuilder() {
public static class Builder {
+ private EngineType engineType = EngineType.SPARK;
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
@@ -237,9 +249,14 @@ public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionP
return this;
}
+ public Builder withEngineType(EngineType engineType) {
+ this.engineType = engineType;
+ return this;
+ }
+
public HoodieIndexConfig build() {
- HoodieIndexConfig config = new HoodieIndexConfig(props);
- setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
+ HoodieIndexConfig config = new HoodieIndexConfig(engineType, props);
+ setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, getDefaultIndexType(engineType));
setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
@@ -260,9 +277,9 @@ public HoodieIndexConfig build() {
BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP),
BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET);
- setDefaultOnCondition(props, !props.contains(BLOOM_INDEX_FILTER_TYPE),
+ setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_FILTER_TYPE),
BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
- setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
+ setDefaultOnCondition(props, !props.containsKey(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_PARALLELISM_PROP), SIMPLE_INDEX_PARALLELISM_PROP,
DEFAULT_SIMPLE_INDEX_PARALLELISM);
@@ -278,5 +295,20 @@ public HoodieIndexConfig build() {
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
}
+
+ private String getDefaultIndexType(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return HoodieIndex.IndexType.BLOOM.name();
+ case FLINK:
+ return HoodieIndex.IndexType.INMEMORY.name();
+ default:
+ throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+ }
+ }
+
+ public EngineType getEngineType() {
+ return engineType;
+ }
}
}
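
With the engine-aware default introduced above, callers only need to set hoodie.index.type when overriding the per-engine default; a short sketch:

    // Spark keeps BLOOM as its default index type; Flink falls back to INMEMORY.
    HoodieIndexConfig sparkIndexConfig = HoodieIndexConfig.newBuilder()
        .withEngineType(EngineType.SPARK)
        .build();
    HoodieIndexConfig flinkIndexConfig = HoodieIndexConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .build();
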
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index 800c75f824fd4..b6cb6e5add99d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + ".reporter.class";
public static final String DEFAULT_METRICS_REPORTER_CLASS = "";
+ // Enable metrics collection from executors
+ public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + ".executor.enable";
+
private HoodieMetricsConfig(Properties props) {
super(props);
}
@@ -126,6 +129,11 @@ public Builder withReporterClass(String className) {
return this;
}
+ public Builder withExecutorMetrics(boolean enable) {
+ props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable));
+ return this;
+ }
+
public HoodieMetricsConfig build() {
HoodieMetricsConfig config = new HoodieMetricsConfig(props);
setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, String.valueOf(DEFAULT_METRICS_ON));
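
A short sketch of flipping the new executor-metrics flag on through the builder (assuming the class's existing newBuilder() entry point):

    HoodieMetricsConfig metricsConfig = HoodieMetricsConfig.newBuilder()
        .withExecutorMetrics(true)    // sets hoodie.metrics.executor.enable=true
        .build();
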
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
new file mode 100644
index 0000000000000..442bd02c67a25
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.config.HoodieMemoryConfig.Builder;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
+import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
+
+/**
+ * Hoodie payload related configs.
+ */
+public class HoodiePayloadConfig extends DefaultHoodieConfig {
+
+ public HoodiePayloadConfig(Properties props) {
+ super(props);
+ }
+
+ public static HoodiePayloadConfig.Builder newBuilder() {
+ return new HoodiePayloadConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public Builder withPayloadOrderingField(String payloadOrderingField) {
+ props.setProperty(PAYLOAD_ORDERING_FIELD_PROP, String.valueOf(payloadOrderingField));
+ return this;
+ }
+
+ public HoodiePayloadConfig build() {
+ HoodiePayloadConfig config = new HoodiePayloadConfig(props);
+ setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP,
+ String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
+ return config;
+ }
+ }
+
+}
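
A minimal usage sketch for the new payload config (the ordering field name is a placeholder):

    HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
        .withPayloadOrderingField("ts")   // hypothetical precombine/ordering field
        .build();
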
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42d3e2b404568..d8135d44135b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -18,21 +18,25 @@
package org.apache.hudi.config;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import javax.annotation.concurrent.Immutable;
@@ -49,6 +53,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
/**
* Class storing configs for the HoodieWriteClient.
*/
@@ -58,6 +63,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final long serialVersionUID = 0L;
public static final String TABLE_NAME = "hoodie.table.name";
+ public static final String PRECOMBINE_FIELD_PROP = "hoodie.datasource.write.precombine.field";
+ public static final String WRITE_PAYLOAD_CLASS = "hoodie.datasource.write.payload.class";
+ public static final String DEFAULT_WRITE_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName();
+ public static final String KEYGENERATOR_CLASS_PROP = "hoodie.datasource.write.keygenerator.class";
+ public static final String DEFAULT_KEYGENERATOR_CLASS = SimpleAvroKeyGenerator.class.getName();
public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
@@ -69,6 +79,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
+ public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
@@ -85,8 +96,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
- public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
- public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+
public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
@@ -117,13 +127,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+ // Data validation check performed during merges before actual commits
+ private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
+ private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
- *
+ *
* Given Hudi's commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
* only works with COW table. Hudi 0.5.x had stopped this behavior.
- *
+ *
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
* (disabled by default) which will allow this old behavior.
*/
@@ -140,14 +154,28 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
+ private HoodiePayloadConfig hoodiePayloadConfig;
+ private HoodieMetadataConfig metadataConfig;
+
+ private EngineType engineType;
+ /**
+ * Use Spark engine by default.
+ */
protected HoodieWriteConfig(Properties props) {
+ this(EngineType.SPARK, props);
+ }
+
+ protected HoodieWriteConfig(EngineType engineType, Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
+ this.engineType = engineType;
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
+ this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
+ this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
}
public static HoodieWriteConfig.Builder newBuilder() {
@@ -177,12 +205,24 @@ public String getTableName() {
return props.getProperty(TABLE_NAME);
}
+ public String getPreCombineField() {
+ return props.getProperty(PRECOMBINE_FIELD_PROP);
+ }
+
+ public String getWritePayloadClass() {
+ return props.getProperty(WRITE_PAYLOAD_CLASS);
+ }
+
+ public String getKeyGeneratorClass() {
+ return props.getProperty(KEYGENERATOR_CLASS_PROP);
+ }
+
public Boolean shouldAutoCommit() {
return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP));
}
public Boolean shouldAssumeDatePartitioning() {
- return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
+ return metadataConfig.shouldAssumeDatePartitioning();
}
public boolean shouldUseExternalSchemaTransformation() {
@@ -217,6 +257,10 @@ public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
+ public int getFileListingParallelism() {
+ return metadataConfig.getFileListingParallelism();
+ }
+
public boolean shouldRollbackUsingMarkers() {
return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS));
}
@@ -282,6 +326,14 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}
+ public boolean isMergeDataValidationCheckEnabled() {
+ return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
+ }
+
+ public EngineType getEngineType() {
+ return engineType;
+ }
+
/**
* compaction properties.
*/
@@ -365,6 +417,23 @@ public Boolean getCompactionReverseLogReadEnabled() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP));
}
+ public boolean isInlineClustering() {
+ return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP));
+ }
+
+ public boolean isAsyncClusteringEnabled() {
+ return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+ }
+
+ public boolean isClusteringEnabled() {
+ // TODO: future support async clustering
+ return isInlineClustering() || isAsyncClusteringEnabled();
+ }
+
+ public int getInlineClusterMaxCommits() {
+ return Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP));
+ }
+
public String getPayloadClass() {
return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
}
@@ -381,6 +450,45 @@ public Boolean shouldCleanBootstrapBaseFile() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
}
+ public String getClusteringUpdatesStrategyClass() {
+ return props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
+ }
+
+ /**
+ * Clustering properties.
+ */
+ public String getClusteringPlanStrategyClass() {
+ return props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_STRATEGY_CLASS);
+ }
+
+ public String getClusteringExecutionStrategyClass() {
+ return props.getProperty(HoodieClusteringConfig.CLUSTERING_EXECUTION_STRATEGY_CLASS);
+ }
+
+ public long getClusteringMaxBytesInGroup() {
+ return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP));
+ }
+
+ public long getClusteringSmallFileLimit() {
+ return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT));
+ }
+
+ public int getClusteringMaxNumGroups() {
+ return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS));
+ }
+
+ public long getClusteringTargetFileMaxBytes() {
+ return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_FILE_MAX_BYTES));
+ }
+
+ public int getTargetPartitionsForClustering() {
+ return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS));
+ }
+
+ public String getClusteringSortColumns() {
+ return props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY);
+ }
+
/**
* index properties.
*/
@@ -587,6 +695,10 @@ public boolean isMetricsOn() {
return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.METRICS_ON));
}
+ public boolean isExecutorMetricsEnabled() {
+ return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS, "false"));
+ }
+
public MetricsReporterType getMetricsReporterType() {
return MetricsReporterType.valueOf(props.getProperty(HoodieMetricsConfig.METRICS_REPORTER_TYPE));
}
@@ -721,6 +833,14 @@ public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
return clientSpecifiedViewStorageConfig;
}
+ public HoodiePayloadConfig getPayloadConfig() {
+ return hoodiePayloadConfig;
+ }
+
+ public HoodieMetadataConfig getMetadataConfig() {
+ return metadataConfig;
+ }
+
/**
* Commit call back configs.
*/
@@ -768,18 +888,62 @@ public Long getMaxMemoryPerPartitionMerge() {
return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
}
+ /**
+ * File listing metadata configs.
+ */
+ public boolean useFileListingMetadata() {
+ return metadataConfig.useFileListingMetadata();
+ }
+
+ public boolean getFileListingMetadataVerify() {
+ return metadataConfig.validateFileListingMetadata();
+ }
+
+ public int getMetadataInsertParallelism() {
+ return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP));
+ }
+
+ public int getMetadataCompactDeltaCommitMax() {
+ return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP));
+ }
+
+ public boolean isMetadataAsyncClean() {
+ return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP));
+ }
+
+ public int getMetadataMaxCommitsToKeep() {
+ return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP));
+ }
+
+ public int getMetadataMinCommitsToKeep() {
+ return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP));
+ }
+
+ public int getMetadataCleanerCommitsRetained() {
+ return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
+ }
+
public static class Builder {
protected final Properties props = new Properties();
+ protected EngineType engineType = EngineType.SPARK;
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
+ private boolean isClusteringConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
private boolean isCallbackConfigSet = false;
+ private boolean isPayloadConfigSet = false;
+ private boolean isMetadataConfigSet = false;
+
+ public Builder withEngineType(EngineType engineType) {
+ this.engineType = engineType;
+ return this;
+ }
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
@@ -822,6 +986,21 @@ public Builder forTable(String tableName) {
return this;
}
+ public Builder withPreCombineField(String preCombineField) {
+ props.setProperty(PRECOMBINE_FIELD_PROP, preCombineField);
+ return this;
+ }
+
+ public Builder withWritePayLoad(String payload) {
+ props.setProperty(WRITE_PAYLOAD_CLASS, payload);
+ return this;
+ }
+
+ public Builder withKeyGenerator(String keyGeneratorClass) {
+ props.setProperty(KEYGENERATOR_CLASS_PROP, keyGeneratorClass);
+ return this;
+ }
+
public Builder withTimelineLayoutVersion(int version) {
props.setProperty(TIMELINE_LAYOUT_VERSION, String.valueOf(version));
return this;
@@ -897,6 +1076,12 @@ public Builder withCompactionConfig(HoodieCompactionConfig compactionConfig) {
return this;
}
+ public Builder withClusteringConfig(HoodieClusteringConfig clusteringConfig) {
+ props.putAll(clusteringConfig.getProps());
+ isClusteringConfigSet = true;
+ return this;
+ }
+
public Builder withMetricsConfig(HoodieMetricsConfig metricsConfig) {
props.putAll(metricsConfig.getProps());
isMetricsConfigSet = true;
@@ -915,13 +1100,20 @@ public Builder withBootstrapConfig(HoodieBootstrapConfig bootstrapConfig) {
return this;
}
- public Builder withAutoCommit(boolean autoCommit) {
- props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
+ public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) {
+ props.putAll(payloadConfig.getProps());
+ isPayloadConfigSet = true;
+ return this;
+ }
+
+ public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
+ props.putAll(metadataConfig.getProps());
+ isMetadataConfigSet = true;
return this;
}
- public Builder withAssumeDatePartitioning(boolean assumeDatePartitioning) {
- props.setProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP, String.valueOf(assumeDatePartitioning));
+ public Builder withAutoCommit(boolean autoCommit) {
+ props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
}
@@ -983,6 +1175,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
return this;
}
+ public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
+ props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
+ return this;
+ }
+
public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
@@ -995,8 +1192,13 @@ protected void setDefaults() {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
+
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
+ KEYGENERATOR_CLASS_PROP, DEFAULT_KEYGENERATOR_CLASS);
+ setDefaultOnCondition(props, !props.containsKey(WRITE_PAYLOAD_CLASS),
+ WRITE_PAYLOAD_CLASS, DEFAULT_WRITE_PAYLOAD_CLASS);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
DEFAULT_ROLLBACK_USING_MARKERS);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
@@ -1011,8 +1213,6 @@ protected void setDefaults() {
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
DEFAULT_HOODIE_AUTO_COMMIT);
- setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
- HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP,
DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
@@ -1032,12 +1232,16 @@ protected void setDefaults() {
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+ setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
+ MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
// Make sure the props is propagated
- setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isClusteringConfigSet,
+ HoodieClusteringConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isBootstrapConfigSet,
HoodieBootstrapConfig.newBuilder().fromProperties(props).build());
@@ -1048,6 +1252,10 @@ protected void setDefaults() {
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCallbackConfigSet,
HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isPayloadConfigSet,
+ HoodiePayloadConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isMetadataConfigSet,
+ HoodieMetadataConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION),
EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION, DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION);
@@ -1066,7 +1274,7 @@ public HoodieWriteConfig build() {
setDefaults();
validate();
// Build WriteConfig at the end
- HoodieWriteConfig config = new HoodieWriteConfig(props);
+ HoodieWriteConfig config = new HoodieWriteConfig(engineType, props);
return config;
}
}
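
Putting the pieces together, a hedged sketch of how the new engine type, payload, and clustering configs compose at the write-config level (table name and field names are placeholders):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.SPARK)
        .forTable("my_table")                                     // placeholder table name
        .withPreCombineField("ts")                                // placeholder precombine field
        .withWritePayLoad(OverwriteWithLatestAvroPayload.class.getName())
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withInlineClustering(true)
            .withInlineClusteringNumCommits(4)
            .build())
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
            .withPayloadOrderingField("ts")
            .build())
        .build();
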
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java
new file mode 100644
index 0000000000000..bb6aaa24777cf
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieClusteringException extends HoodieException {
+
+ public HoodieClusteringException(String msg) {
+ super(msg);
+ }
+
+ public HoodieClusteringException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
+
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
new file mode 100644
index 0000000000000..68b62a5421706
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieClusteringUpdateException extends HoodieException {
+ public HoodieClusteringUpdateException(String msg) {
+ super(msg);
+ }
+
+ public HoodieClusteringUpdateException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index 8af72f351fff0..5e1f832b7f239 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -19,7 +19,7 @@
package org.apache.hudi.execution;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
@@ -71,7 +71,7 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
public void consumeOneRecord(HoodieInsertValueGenResult payload) {
final HoodieRecord insertPayload = payload.record;
String partitionPath = insertPayload.getPartitionPath();
- HoodieWriteHandle handle = handles.get(partitionPath);
+ HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new partition path
// and the records for the previous partition path are all written,
@@ -87,7 +87,7 @@ public void consumeOneRecord(HoodieInsertValueGenResult payload) {
if (!handle.canWrite(payload.record)) {
// Handle is full. Close the handle and add the WriteStatus
- statuses.add(handle.close());
+ statuses.addAll(handle.close());
// Open new handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
@@ -108,8 +108,8 @@ public List getResult() {
}
private void closeOpenHandles() {
- for (HoodieWriteHandle handle : handles.values()) {
- statuses.add(handle.close());
+ for (HoodieWriteHandle<?,?,?,?> handle : handles.values()) {
+ statuses.addAll(handle.close());
}
handles.clear();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index b435c68de5e2b..cf544814cdba6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -20,7 +20,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 6d04594cbab63..2e1915ff20431 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -21,7 +21,7 @@
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index ad7807b707dcb..55be8fc6598af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -18,7 +18,7 @@
package org.apache.hudi.index;
-import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
index 5c54dce31cd60..b4c83c141b2bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -18,7 +18,7 @@
package org.apache.hudi.io;
-import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
index 67ebadb2dd300..3a04c061a51a3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -18,7 +18,7 @@
package org.apache.hudi.io;
-import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 0c590fe8818c4..c6ea7bab2ca7f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -20,8 +20,9 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieKey;
@@ -30,9 +31,9 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -42,6 +43,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
@@ -61,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* IO Operation to append data onto an existing file.
@@ -69,48 +72,49 @@ public class HoodieAppendHandle extends
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
- private static AtomicLong recordIndex = new AtomicLong(1);
+ private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
+
private final String fileId;
// Buffer for holding records in memory before they are flushed to disk
- private List recordList = new ArrayList<>();
+ private final List recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
- private List keysToDelete = new ArrayList<>();
+ private final List keysToDelete = new ArrayList<>();
+ // Incoming records to be written to logs.
+ private final Iterator<HoodieRecord<T>> recordItr;
+ // Writer to log into the file group's latest slice.
+ private Writer writer;
- private Iterator<HoodieRecord<T>> recordItr;
+ private final List statuses;
// Total number of records written during an append
private long recordsWritten = 0;
// Total number of records deleted during an append
private long recordsDeleted = 0;
// Total number of records updated during an append
private long updatedRecordsWritten = 0;
+ // Total number of new records inserted into the delta file
+ private long insertRecordsWritten = 0;
+
// Average record size for a HoodieRecord. This size is updated at the end of every log block flushed to disk
private long averageRecordSize = 0;
- private HoodieLogFile currentLogFile;
- private Writer writer;
// Flag used to initialize some metadata
private boolean doInit = true;
// Total number of bytes written during this append phase (an estimation)
private long estimatedNumberOfBytesWritten;
- // Total number of bytes written to file
- private long sizeInBytes = 0;
// Number of records that must be written to meet the max block size for a log block
private int numberOfRecords = 0;
// Max block size to limit to for a log block
- private int maxBlockSize = config.getLogFileDataBlockMaxSize();
+ private final int maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
- private Map header = new HashMap<>();
- // Total number of new records inserted into the delta file
- private long insertRecordsWritten = 0;
-
+ private final Map header = new HashMap<>();
private SizeEstimator sizeEstimator;
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
- writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
sizeEstimator = new DefaultSizeEstimator();
+ this.statuses = new ArrayList<>();
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
@@ -125,20 +129,32 @@ private void init(HoodieRecord record) {
Option fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
// Set the base commit time as the current instantTime for new inserts into log files
String baseInstantTime;
+ String baseFile = "";
+ List logFiles = new ArrayList<>();
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
+ baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
baseInstantTime = instantTime;
// This means there is no base data file, start appending to a new log file
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
- LOG.info("New InsertHandle for partition :" + partitionPath);
+ LOG.info("New AppendHandle for partition :" + partitionPath);
}
- writeStatus.getStat().setPrevCommit(baseInstantTime);
+
+ // Prepare the first write status
+ writeStatus.setStat(new HoodieDeltaWriteStat());
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
- writeStatus.getStat().setPartitionPath(partitionPath);
- writeStatus.getStat().setFileId(fileId);
averageRecordSize = sizeEstimator.sizeEstimate(record);
+
+ HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat();
+ deltaWriteStat.setPrevCommit(baseInstantTime);
+ deltaWriteStat.setPartitionPath(partitionPath);
+ deltaWriteStat.setFileId(fileId);
+ deltaWriteStat.setBaseFile(baseFile);
+ deltaWriteStat.setLogFiles(logFiles);
+
try {
//save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
@@ -148,34 +164,29 @@ private void init(HoodieRecord record) {
// Since the actual log file written to can be different based on when rollover happens, we use the
// base file to denote some log appends happened on a slice. writeToken will still fence concurrent
// writers.
+ // https://issues.apache.org/jira/browse/HUDI-1517
createMarkerFile(partitionPath, FSUtils.makeDataFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
this.writer = createLogWriter(fileSlice, baseInstantTime);
- this.currentLogFile = writer.getLogFile();
- ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
- ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e);
}
- Path path = partitionPath.length() == 0 ? new Path(writer.getLogFile().getFileName())
- : new Path(partitionPath, writer.getLogFile().getFileName());
- writeStatus.getStat().setPath(path.toString());
doInit = false;
}
}
private Option getIndexedRecord(HoodieRecord hoodieRecord) {
- Option recordMetadata = hoodieRecord.getData().getMetadata();
+ Option