diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8135d44135b0..3fc5a2df38985 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
   private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
 
+  // Allow duplicates with inserts while merging with existing records
+  private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
+  private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
@@ -330,6 +334,10 @@ public boolean isMergeDataValidationCheckEnabled() {
     return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
   }
 
+  public boolean allowDuplicateInserts() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS));
+  }
+
   public EngineType getEngineType() {
     return engineType;
   }
@@ -1180,6 +1188,11 @@ public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
       return this;
     }
 
+    public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) {
+      props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, String.valueOf(routeInsertsToNewFiles));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
@@ -1234,6 +1247,8 @@ protected void setDefaults() {
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
       setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
           MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
+      setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
+          MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet,
           HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
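For reviewers trying the change out: a minimal, hypothetical sketch of enabling the new flag through the builder method (or the raw property) added above. The base path and table name are placeholders, and the other write-config settings a real job needs (schema, index config, etc.) are omitted for brevity.

```java
import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

public class AllowDuplicateInsertsConfigSketch {
  public static void main(String[] args) {
    // Via the new builder method added in this patch.
    HoodieWriteConfig viaBuilder = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")      // placeholder base path
        .forTable("sample_table")                  // placeholder table name
        .withMergeAllowDuplicateOnInserts(true)    // inserts are concatenated, not merged
        .build();

    // Or via the raw property; the default remains "false".
    Properties props = new Properties();
    props.setProperty("hoodie.merge.allow.duplicate.on.inserts", "true");
    HoodieWriteConfig viaProps = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")
        .forTable("sample_table")
        .withProperties(props)
        .build();

    System.out.println(viaBuilder.allowDuplicateInserts()); // true
    System.out.println(viaProps.allowDuplicateInserts());   // true
  }
}
```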

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4f5b82a67d46d..d5df2cb226665 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -58,17 +58,45 @@
 import java.util.Set;
 
 @SuppressWarnings("Duplicates")
+/**
+ * Handle to merge incoming records to those in storage.
+ * <p>
+ * Simplified Logic:
+ * For every existing record
+ *     Check if there is a new record coming in. If yes, merge the two records and write to file,
+ *     else write the record as is.
+ * For all pending records from the incoming batch, write to file.
+ * <p>
+ * Illustration with simple data.
+ * Incoming data:
+ *     rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ *     rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, merge w/ incoming if required and write to storage.
+ *    => rec1_1 and rec1_2 are merged to write rec1_2 to storage
+ *    => rec2_1 is written as is
+ *    => rec3_1 is written as is
+ *    => rec4_2 and rec4_1 are merged to write rec4_2 to storage
+ * Write all pending records from the incoming set to storage
+ *    => rec5_1 and rec6_1
+ *
+ * Final snapshot in storage:
+ * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ * <p>
+ * </p>
+ */
 public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
 
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
-  private HoodieFileWriter<IndexedRecord> fileWriter;
+  protected HoodieFileWriter<IndexedRecord> fileWriter;
 
-  private Path newFilePath;
+  protected Path newFilePath;
   private Path oldFilePath;
-  private long recordsWritten = 0;
+  protected long recordsWritten = 0;
   private long recordsDeleted = 0;
   private long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
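The javadoc above describes a two-pass flow over the base file and the incoming batch. A self-contained toy restatement of just that flow, using plain strings keyed by record key instead of Avro/HoodieRecord payloads and skipping the real handle's size tracking, deletes, and write-status bookkeeping:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MergeHandleFlowSketch {
  public static void main(String[] args) {
    Map<String, String> incoming = new HashMap<>();
    incoming.put("rec1", "rec1_2");
    incoming.put("rec4", "rec4_2");
    incoming.put("rec5", "rec5_1");
    incoming.put("rec6", "rec6_1");

    Map<String, String> existing = new TreeMap<>();
    existing.put("rec1", "rec1_1");
    existing.put("rec2", "rec2_1");
    existing.put("rec3", "rec3_1");
    existing.put("rec4", "rec4_1");

    Map<String, String> newBaseFile = new TreeMap<>();

    // Pass 1: iterate the existing base file; merge with the incoming version when keys collide.
    for (Map.Entry<String, String> old : existing.entrySet()) {
      String value = incoming.containsKey(old.getKey())
          ? incoming.remove(old.getKey())   // merged result (incoming wins in this toy example)
          : old.getValue();                 // untouched record is copied as is
      newBaseFile.put(old.getKey(), value);
    }

    // Pass 2: anything still left in the incoming batch is a brand-new insert.
    newBaseFile.putAll(incoming);

    // {rec1=rec1_2, rec2=rec2_1, rec3=rec3_1, rec4=rec4_2, rec5=rec5_1, rec6=rec6_1}
    System.out.println(newBaseFile);
  }
}
```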
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
new file mode 100644
index 0000000000000..ea56689b5364f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Handle to concatenate new records to old records w/o any merging. If the operation is INSERT and
+ * {@link HoodieWriteConfig#allowDuplicateInserts()} is enabled, this handle is used instead of {@link HoodieMergeHandle}.
+ *
+ * Simplified Logic:
+ * For every existing record
+ *     Write the record as is
+ * For all incoming records, write to file as is.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ *     rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ *     rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, write to storage as is.
+ *    => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
+ * Write all records from the incoming set to storage
+ *    => rec1_2, rec4_2, rec5_1 and rec6_1
+ *
+ * Final snapshot in storage:
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ *
+ * Users should ensure there are no duplicates when the "insert" operation is used with this config enabled, so the
+ * scenario above should not arise: every incoming batch is expected to contain only new records. The example is for illustration purposes only.
+ */
+public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
+
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
+      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+  }
+
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Write old record as is w/o merging with incoming record.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    try {
+      fileWriter.writeAvro(key, oldRecord);
+    } catch (IOException | RuntimeException e) {
+      String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
+          key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
+      LOG.debug("Old record is " + oldRecord);
+      throw new HoodieUpsertException(errMsg, e);
+    }
+    recordsWritten++;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
index a56710bfb736d..7700e95d1d707 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.Serializable;
@@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable {
    */
   protected final WorkloadStat globalStat;
 
+  /**
+   * Write operation type.
+   */
+  private WriteOperationType operationType;
+
   public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
     this.partitionPathStatMap = profile.getLeft();
     this.globalStat = profile.getRight();
   }
 
+  public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
+    this(profile);
+    this.operationType = operationType;
+  }
+
   public WorkloadStat getGlobalStat() {
     return globalStat;
   }
@@ -62,11 +73,16 @@ public WorkloadStat getWorkloadStat(String partitionPath) {
     return partitionPathStatMap.get(partitionPath);
   }
 
+  public WriteOperationType getOperationType() {
+    return operationType;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("WorkloadProfile {");
     sb.append("globalStat=").append(globalStat).append(", ");
-    sb.append("partitionStat=").append(partitionPathStatMap);
+    sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
+    sb.append("operationType=").append(operationType);
     sb.append('}');
     return sb.toString();
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 2fabbbfbc9be6..5a4d79c7e63e8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -43,6 +43,7 @@
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.storage.HoodieConcatHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -119,7 +120,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
     WorkloadProfile profile = null;
     if (isWorkloadProfileNeeded()) {
       context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
-      profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
+      profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
       LOG.info("Workload profile :" + profile);
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
@@ -320,6 +321,8 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle,
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     if (table.requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier);
+    } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
+      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
     } else {
       return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 000cfc7071c0d..ee153c8468ecf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -190,7 +190,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
       for (SmallFile smallFile : smallFiles) {
         long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
             totalUnassignedInserts);
-        if (recordsToAppend > 0) {
+        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
           // create a new bucket or re-use an existing bucket
           int bucket;
           if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 48a4d12c08df0..b4a392e6478bf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -79,7 +79,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -128,6 +130,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   };
 
+  private static Stream<Arguments> configParams() {
+    return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
+  }
+
   private HoodieTestTable testTable;
 
   @BeforeEach
@@ -451,7 +457,62 @@ private void testUpsertsInternal(HoodieWriteConfig config,
   }
 
   /**
-   * Tesst deletion of records.
+   * Test Insert API for HoodieConcatHandle.
+   */
+  @Test
+  public void testInsertsWithHoodieConcatHandle() throws Exception {
+    testHoodieConcatHandle(getConfig(), false);
+  }
+
+  /**
+   * Test InsertPrepped API for HoodieConcatHandle.
+   */
+  @Test
+  public void testInsertsPreppedWithHoodieConcatHandle() throws Exception {
+    testHoodieConcatHandle(getConfig(), true);
+  }
+
+  /**
+   * Test HoodieConcatHandle via the {@link AbstractHoodieWriteClient#insert(Object, String)} API.
+   *
+   * @param config Write Config
+   * @throws Exception in case of error
+   */
+  private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
+            VERSION_0).build();
+
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1 (only inserts)
+    String newCommitTime = "001";
+    String initCommitTime = "000";
+    int numRecords = 200;
+    insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
+        isPrepped, true, numRecords);
+
+    // Write 2 (updates)
+    String prevCommitTime = newCommitTime;
+    newCommitTime = "004";
+    numRecords = 100;
+    String commitTimeBetweenPrevAndNew = "002";
+
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUniqueUpdates);
+
+    writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime,
+        numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300,
+        2);
+  }
+
+  /**
+   * Tests deletion of records.
    */
   @Test
   public void testDeletes() throws Exception {
@@ -877,13 +938,13 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
   /**
    * Test scenario of new file-group getting added during insert().
    */
-  @Test
-  public void testSmallInsertHandlingForInserts() throws Exception {
-
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
     final String testPartitionPath = "2016/09/26";
     final int insertSplitLimit = 100; // setup the small file handling params
-    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold up to 200 records max
+    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold up to 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config, false);
 
@@ -894,10 +955,8 @@ public void testSmallInsertHandlingForInserts() throws Exception {
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
-
     assertNoWriteErrors(statuses);
     assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
     String file1 = statuses.get(0).getFileId();
     assertEquals(100,
@@ -912,14 +971,13 @@ public void testSmallInsertHandlingForInserts() throws Exception {
     JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
     statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
     assertNoWriteErrors(statuses);
-
     assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
     assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
     assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
+
     Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
         "file should contain 140 records");
-
     List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
     for (GenericRecord record : records) {
       String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -933,11 +991,15 @@ public void testSmallInsertHandlingForInserts() throws Exception {
     // Lots of inserts such that file1 is updated and expanded, a new file2 is created.
     String commitTime3 = "003";
     client.startCommitWithTime(commitTime3);
-    List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
-    JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
+    List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime3, 200);
+    JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(inserts3, 1);
     statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
     assertNoWriteErrors(statuses);
     assertEquals(2, statuses.size(), "2 files needs to be committed.");
+    assertEquals(340,
+        readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
+            + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
+        "file should contain 340 records");
 
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
     HoodieTable table = getHoodieTable(metaClient, config);
@@ -948,11 +1010,9 @@ public void testSmallInsertHandlingForInserts() throws Exception {
     int totalInserts = 0;
     for (HoodieBaseFile file : files) {
       assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
-      records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
-      totalInserts += records.size();
+      totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
     }
-    assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(),
-        "Total number of records must add up");
+    assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up");
   }
 
   /**
@@ -1040,7 +1100,7 @@ public void testClusteringWithSortColumns() throws Exception {
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
     testClustering(clusteringConfig);
   }
-  
+
   private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
@@ -1642,22 +1702,45 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
    * Build Hoodie Write Config for small data file sizes.
    */
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
-    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150));
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false);
+  }
+
+  /**
+   * Build Hoodie Write Config for small data file sizes.
+   */
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) {
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts);
   }
 
   /**
    * Build Hoodie Write Config for specified small file sizes.
   */
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false);
+  }
+
+  /**
+   * Build Hoodie Write Config for specified small file sizes.
+   */
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) {
     String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
-    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize);
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts);
   }
 
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
-    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, new Properties());
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false);
+  }
+
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, new Properties());
   }
-  
+
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props);
+  }
+
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
+      Properties props) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
     return builder
         .withCompactionConfig(
@@ -1668,6 +1751,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String
             HoodieStorageConfig.newBuilder()
                 .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
                 .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+        .withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
        .withProps(props)
         .build();
   }
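Putting the pieces together, a hypothetical companion test in the style of the class above (not part of this patch) would exercise the flag end to end: insert a batch, re-insert some of the same keys with insert(), and expect the small file to be concatenated rather than merged. It leans on the same helpers and fields (getSmallInsertWriteConfig, getHoodieWriteClient, dataGen, jsc, basePath, hadoopConf) and assumes the re-inserted keys land in the existing small file.

```java
/**
 * Hypothetical companion test (not part of this patch): with hoodie.merge.allow.duplicate.on.inserts=true,
 * re-inserting existing keys should append to the small file instead of merging with it.
 */
@Test
public void testInsertAppendsDuplicatesSketch() throws Exception {
  HoodieWriteConfig config = getSmallInsertWriteConfig(100, false, true); // small-file handling + duplicates allowed
  SparkRDDWriteClient client = getHoodieWriteClient(config, false);

  // Batch 1: 100 fresh inserts, expected to land in a single small file.
  String commitTime1 = "001";
  client.startCommitWithTime(commitTime1);
  List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
  assertNoWriteErrors(client.insert(jsc.parallelize(inserts1, 1), commitTime1).collect());

  // Batch 2: new versions of 40 existing keys, written with insert() rather than upsert().
  String commitTime2 = "002";
  client.startCommitWithTime(commitTime2);
  List<HoodieRecord> inserts2 = dataGen.generateUniqueUpdates(commitTime2, 40);
  List<WriteStatus> statuses = client.insert(jsc.parallelize(inserts2, 1), commitTime2).collect();
  assertNoWriteErrors(statuses);

  // The concat handle should have appended the 40 re-inserted keys, so both versions are present.
  Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
  List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
  assertEquals(140, records.size(), "base file should contain 100 + 40 records, duplicates included");
}
```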