From f7d52e77c6cd6972db9e85dabbc2143591b774a1 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Fri, 2 Oct 2020 22:51:33 +0800 Subject: [PATCH 01/10] [HUDI-1234] Insert new records regardless of small file when using insert operation --- .../apache/hudi/table/WorkloadProfile.java | 18 ++++- .../commit/BaseSparkCommitActionExecutor.java | 2 +- .../action/commit/UpsertPartitioner.java | 16 +++-- .../TestHoodieClientOnCopyOnWriteStorage.java | 32 +++------ .../action/commit/TestUpsertPartitioner.java | 71 +++++++++++++++++-- .../hudi/common/model/WriteOperationType.java | 3 +- .../command/ITTestHoodieSyncCommand.java | 2 +- 7 files changed, 109 insertions(+), 35 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java index a56710bfb736d..7700e95d1d707 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.collection.Pair; import java.io.Serializable; @@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable { */ protected final WorkloadStat globalStat; + /** + * Write operation type. + */ + private WriteOperationType operationType; + public WorkloadProfile(Pair, WorkloadStat> profile) { this.partitionPathStatMap = profile.getLeft(); this.globalStat = profile.getRight(); } + public WorkloadProfile(Pair, WorkloadStat> profile, WriteOperationType operationType) { + this(profile); + this.operationType = operationType; + } + public WorkloadStat getGlobalStat() { return globalStat; } @@ -62,11 +73,16 @@ public WorkloadStat getWorkloadStat(String partitionPath) { return partitionPathStatMap.get(partitionPath); } + public WriteOperationType getOperationType() { + return operationType; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("WorkloadProfile {"); sb.append("globalStat=").append(globalStat).append(", "); - sb.append("partitionStat=").append(partitionPathStatMap); + sb.append("partitionStat=").append(partitionPathStatMap).append(", "); + sb.append("operationType=").append(operationType); sb.append('}'); return sb.toString(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 2fabbbfbc9be6..3e4cf7f208ad1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -119,7 +119,7 @@ public HoodieWriteMetadata> execute(JavaRDD WorkloadProfile profile = null; if (isWorkloadProfileNeeded()) { context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile"); - profile = new WorkloadProfile(buildProfile(inputRecordsRDD)); + profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType); LOG.info("Workload profile :" + profile); saveWorkloadProfileMetadataToInflight(profile, instantTime); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 000cfc7071c0d..d5d2ed2dc7f64 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -54,6 +55,8 @@ import scala.Tuple2; +import static org.apache.hudi.common.model.WriteOperationType.isChangingRecords; + /** * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition). */ @@ -112,16 +115,17 @@ private void assignUpdates(WorkloadProfile profile) { for (Map.Entry partitionStat : partitionStatEntries) { for (Map.Entry> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) { - addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); + addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(), profile.getOperationType()); } } } - private int addUpdateBucket(String partitionPath, String fileIdHint) { + private int addUpdateBucket(String partitionPath, String fileIdHint, WriteOperationType operationType) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.UPDATE; + bucketInfo.bucketType = operationType == null || isChangingRecords(operationType) + ? 
BucketType.UPDATE : BucketType.INSERT; bucketInfo.fileIdPrefix = fileIdHint; bucketInfo.partitionPath = partitionPath; bucketInfoMap.put(totalBuckets, bucketInfo); @@ -193,11 +197,13 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) if (recordsToAppend > 0) { // create a new bucket or re-use an existing bucket int bucket; - if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { + // insert new records regardless of small file when using insert operation + if (isChangingRecords(profile.getOperationType()) + && updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId(), profile.getOperationType()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 48a4d12c08df0..b321890206c3d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -879,7 +879,6 @@ public void testSmallInsertHandlingForUpserts() throws Exception { */ @Test public void testSmallInsertHandlingForInserts() throws Exception { - final String testPartitionPath = "2016/09/26"; final int insertSplitLimit = 100; // setup the small file handling params @@ -887,24 +886,19 @@ public void testSmallInsertHandlingForInserts() throws Exception { dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config, false); - // Inserts => will write file1 String commitTime1 = "001"; client.startCommitWithTime(commitTime1); List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb Set keys1 = recordsToRecordKeySet(inserts1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); - assertNoWriteErrors(statuses); - assertPartitionMetadata(new String[] {testPartitionPath}, fs); - + assertPartitionMetadata(new String[]{testPartitionPath}, fs); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); - String file1 = statuses.get(0).getFileId(); assertEquals(100, readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); - // Second, set of Inserts should just expand file1 String commitTime2 = "002"; client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); @@ -912,14 +906,10 @@ public void testSmallInsertHandlingForInserts() throws Exception { JavaRDD insertRecordsRDD2 = jsc.parallelize(inserts2, 1); statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); - - assertEquals(1, statuses.size(), "Just 1 file needs to be updated."); - assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); - assertEquals(commitTime1, 
statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); + assertEquals(1, statuses.size(), "Just 1 file needs to be added."); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), - "file should contain 140 records"); - + assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), + "file should contain 40 records"); List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); @@ -930,7 +920,6 @@ public void testSmallInsertHandlingForInserts() throws Exception { "key expected to be part of commit 1 or commit2"); } - // Lots of inserts such that file1 is updated and expanded, a new file2 is created. String commitTime3 = "003"; client.startCommitWithTime(commitTime3); List insert3 = dataGen.generateInserts(commitTime3, 200); @@ -938,21 +927,22 @@ public void testSmallInsertHandlingForInserts() throws Exception { statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); + assertEquals(200, + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() + + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), + "file should contain 200 records"); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = getHoodieTable(metaClient, config); List files = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); - assertEquals(2, files.size(), "Total of 2 valid data files"); + assertEquals(3, files.size(), "Total of 3 valid data files"); int totalInserts = 0; for (HoodieBaseFile file : files) { - assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); - records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); - totalInserts += records.size(); + totalInserts += readRowKeysFromParquet(hadoopConf, new Path(file.getPath())).size(); } - assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(), - "Total number of records must add up"); + assertEquals(totalInserts, inserts1.size() + insert3.size(), "Total number of records must add up"); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index f40a97c0bbadc..a1abcb97b1df9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,8 +38,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.avro.Schema; -import org.apache.log4j.LogManager; -import 
org.apache.log4j.Logger; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -90,7 +89,7 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts List records = new ArrayList<>(); records.addAll(insertRecords); records.addAll(updateRecords); - WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(records))); + WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(records)), WriteOperationType.UPSERT); UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config); assertEquals(0, partitioner.getPartition( new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))), @@ -98,6 +97,25 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts return partitioner; } + private UpsertPartitioner getInsertPartitioner(int smallFileSize, int numInserts, int fileSize, String testPartitionPath, + boolean autoSplitInserts) throws Exception { + HoodieWriteConfig config = makeHoodieClientConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) + .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) + .build(); + + FileCreateUtils.createCommit(basePath, "001"); + FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", "file1", fileSize); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + List insertRecords = dataGenerator.generateInserts("001", numInserts); + WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)), WriteOperationType.INSERT); + return new UpsertPartitioner(profile, context, table, config); + } + private static List setupHoodieInstants() { List instants = new ArrayList<>(); instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1")); @@ -282,8 +300,8 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { "Bucket 2 is INSERT"); assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); - Double[] weights = { 0.5, 0.25, 0.25}; - Double[] cumulativeWeights = { 0.5, 0.75, 1.0}; + Double[] weights = {0.5, 0.25, 0.25}; + Double[] cumulativeWeights = {0.5, 0.75, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); // Now with insert split size auto tuned @@ -303,6 +321,49 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { weights = new Double[] { 0.08, 0.42, 0.42, 0.08}; cumulativeWeights = new Double[] { 0.08, 0.5, 0.92, 1.0}; + /*======= + weights = new Double[] {0.08, 0.31, 0.31, 0.31}; + cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0}; */ + assertInsertBuckets(weights, cumulativeWeights, insertBuckets); + } + + @Test + public void testInsertPartitionerWithSmallInsertHandling() throws Exception { + final String testPartitionPath = "2016/09/26"; + // Inserts .. 
Check updates go together & inserts subsplit, after expanding smallest file + UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, testPartitionPath, false); + List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + + assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, + "Bucket 0 is INSERT"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType, + "Bucket 1 is INSERT"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType, + "Bucket 2 is INSERT"); + assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); + + Double[] weights = {0.5, 0.25, 0.25}; + Double[] cumulativeWeights = {0.5, 0.75, 1.0}; + assertInsertBuckets(weights, cumulativeWeights, insertBuckets); + + // Now with insert split size auto tuned + partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, testPartitionPath, true); + insertBuckets = partitioner.getInsertBuckets(testPartitionPath); + + assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, + "Bucket 0 is INSERT"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType, + "Bucket 1 is INSERT"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType, + "Bucket 2 is INSERT"); + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType, + "Bucket 3 is INSERT"); + assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); + + weights = new Double[] {0.08, 0.31, 0.31, 0.31}; + cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index f237156360847..e7169d301a01c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import java.io.Serializable; import org.apache.hudi.exception.HoodieException; import java.util.Locale; @@ -25,7 +26,7 @@ /** * The supported write operation types, used by commitMetadata. */ -public enum WriteOperationType { +public enum WriteOperationType implements Serializable { // directly insert INSERT("insert"), INSERT_PREPPED("insert_prepped"), diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java index a6a4c3ec4201e..411172a251b1e 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java @@ -52,7 +52,7 @@ public void testValidateSync() throws Exception { executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true); String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. 
Catch up count is %d", - hiveTableName, hiveTableName2, 100, 200); + hiveTableName, hiveTableName2, 100, 100); assertTrue(result.getStderr().toString().contains(expected)); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); From 7f742ee0bfdbbbd6bfb6a5e1bb52bfec11f6a753 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 15 Oct 2020 14:14:49 +0800 Subject: [PATCH 02/10] [HUDI-1234] Insert new records regardless of small file when using insert operation --- .../action/commit/UpsertPartitioner.java | 24 ++++++++--- .../TestHoodieClientOnCopyOnWriteStorage.java | 3 ++ .../action/commit/TestUpsertPartitioner.java | 40 ++++++++----------- .../command/ITTestHoodieSyncCommand.java | 1 + 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index d5d2ed2dc7f64..33e1c04335e24 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -115,17 +114,28 @@ private void assignUpdates(WorkloadProfile profile) { for (Map.Entry partitionStat : partitionStatEntries) { for (Map.Entry> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) { - addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey(), profile.getOperationType()); + addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); } } } - private int addUpdateBucket(String partitionPath, String fileIdHint, WriteOperationType operationType) { + private int addUpdateBucket(String partitionPath, String fileIdHint) { int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = operationType == null || isChangingRecords(operationType) - ? BucketType.UPDATE : BucketType.INSERT; + bucketInfo.bucketType = BucketType.UPDATE; + bucketInfo.fileIdPrefix = fileIdHint; + bucketInfo.partitionPath = partitionPath; + bucketInfoMap.put(totalBuckets, bucketInfo); + totalBuckets++; + return bucket; + } + + private int addInsertBucket(String partitionPath, String fileIdHint) { + int bucket = totalBuckets; + updateLocationToBucket.put(fileIdHint, bucket); + BucketInfo bucketInfo = new BucketInfo(); + bucketInfo.bucketType = BucketType.INSERT; bucketInfo.fileIdPrefix = fileIdHint; bucketInfo.partitionPath = partitionPath; bucketInfoMap.put(totalBuckets, bucketInfo); @@ -203,7 +213,9 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId(), profile.getOperationType()); + bucket = profile.getOperationType() == null || isChangingRecords(profile.getOperationType()) + ? 
addUpdateBucket(partitionPath, smallFile.location.getFileId()) + : addInsertBucket(partitionPath, smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index b321890206c3d..b3b8d1889ae14 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -895,6 +895,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { assertNoWriteErrors(statuses); assertPartitionMetadata(new String[]{testPartitionPath}, fs); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); + String file1 = statuses.get(0).getFileId(); assertEquals(100, readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); @@ -907,6 +908,8 @@ public void testSmallInsertHandlingForInserts() throws Exception { statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); + assertEquals(file1 + "-0", statuses.get(0).getFileId(), "Small file should be added"); + assertEquals("null", statuses.get(0).getStat().getPrevCommit(), "Small file should be added"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), "file should contain 40 records"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index a1abcb97b1df9..18cc2db89ac4a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -65,11 +65,11 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class); private static final Schema SCHEMA = getSchemaFromResource(TestUpsertPartitioner.class, "/exampleSchema.avsc"); - private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, + private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, int splitSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) - .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) + .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) .build(); @@ -97,11 +97,11 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts return partitioner; } - private UpsertPartitioner getInsertPartitioner(int smallFileSize, int 
numInserts, int fileSize, String testPartitionPath, + private UpsertPartitioner getInsertPartitioner(int smallFileSize, int numInserts, int fileSize, int splitSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) - .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) + .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) .build(); @@ -198,7 +198,7 @@ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception { public void testUpsertPartitioner() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates... Check all updates go together & inserts subsplit - UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, 100, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets"); } @@ -288,7 +288,7 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding // smallest file - UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, 100, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); @@ -305,7 +305,7 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { assertInsertBuckets(weights, cumulativeWeights, insertBuckets); // Now with insert split size auto tuned - partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true); + partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, 100, testPartitionPath, true); insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); @@ -331,16 +331,14 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { public void testInsertPartitionerWithSmallInsertHandling() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts .. 
Check updates go together & inserts subsplit, after expanding smallest file - UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, testPartitionPath, false); + UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, 100, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, - "Bucket 0 is INSERT"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType, - "Bucket 1 is INSERT"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType, - "Bucket 2 is INSERT"); + for (int i = 0; i < partitioner.numPartitions(); i++) { + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, + String.format("Bucket %d is INSERT", i)); + } assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); Double[] weights = {0.5, 0.25, 0.25}; @@ -348,18 +346,14 @@ public void testInsertPartitionerWithSmallInsertHandling() throws Exception { assertInsertBuckets(weights, cumulativeWeights, insertBuckets); // Now with insert split size auto tuned - partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, testPartitionPath, true); + partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 100, testPartitionPath, true); insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, - "Bucket 0 is INSERT"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(1).bucketType, - "Bucket 1 is INSERT"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType, - "Bucket 2 is INSERT"); - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType, - "Bucket 3 is INSERT"); + for (int i = 0; i < partitioner.numPartitions(); i++) { + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, + String.format("Bucket %d is INSERT", i)); + } assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); weights = new Double[] {0.08, 0.31, 0.31, 0.31}; diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java index 411172a251b1e..04c8dcacd7e7a 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java @@ -51,6 +51,7 @@ public void testValidateSync() throws Exception { TestExecStartResultCallback result = executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true); + // When using the insert operation, the new inserts are created by the small file handling, which causes the count of new inserts to be only 100. String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. 
Catch up count is %d", hiveTableName, hiveTableName2, 100, 100); assertTrue(result.getStderr().toString().contains(expected)); From dde11ef3cbd27bda17c2ce1f680a110bfcf6e5c1 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Fri, 16 Oct 2020 00:29:52 +0800 Subject: [PATCH 03/10] [HUDI-1234] Insert new records regardless of small file when using insert operation --- .../action/commit/UpsertPartitioner.java | 30 +++++++------------ .../TestHoodieClientOnCopyOnWriteStorage.java | 11 +++---- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 33e1c04335e24..a6ffd132f5c28 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -131,16 +131,13 @@ private int addUpdateBucket(String partitionPath, String fileIdHint) { return bucket; } - private int addInsertBucket(String partitionPath, String fileIdHint) { - int bucket = totalBuckets; - updateLocationToBucket.put(fileIdHint, bucket); + private void addInsertBucket(String partitionPath) { BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.fileIdPrefix = fileIdHint; + bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); bucketInfo.partitionPath = partitionPath; bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; - return bucket; } /** @@ -212,11 +209,13 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) && updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); - } else { - bucket = profile.getOperationType() == null || isChangingRecords(profile.getOperationType()) - ? 
addUpdateBucket(partitionPath, smallFile.location.getFileId()) - : addInsertBucket(partitionPath, smallFile.location.getFileId()); + } else if (profile.getOperationType() == null || isChangingRecords(profile.getOperationType())) { + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); + } else { + bucket = totalBuckets; + addInsertBucket(partitionPath); + LOG.info("Assigning " + recordsToAppend + " inserts to new insert bucket " + bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); @@ -236,17 +235,8 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - if (b < insertBuckets - 1) { - recordsPerBucket.add(insertRecordsPerBucket); - } else { - recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); - } - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.partitionPath = partitionPath; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); - bucketInfoMap.put(totalBuckets, bucketInfo); - totalBuckets++; + recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + addInsertBucket(partitionPath); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index b3b8d1889ae14..c4f6fd2ced630 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -112,6 +112,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -908,7 +909,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); - assertEquals(file1 + "-0", statuses.get(0).getFileId(), "Small file should be added"); + assertNotNull(statuses.get(0).getFileId(), "Small file should be added"); assertEquals("null", statuses.get(0).getStat().getPrevCommit(), "Small file should be added"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), @@ -925,8 +926,8 @@ public void testSmallInsertHandlingForInserts() throws Exception { String commitTime3 = "003"; client.startCommitWithTime(commitTime3); - List insert3 = dataGen.generateInserts(commitTime3, 200); - JavaRDD insertRecordsRDD3 = jsc.parallelize(insert3, 1); + List inserts3 = dataGen.generateInserts(commitTime3, 200); + JavaRDD insertRecordsRDD3 = jsc.parallelize(inserts3, 1); statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); assertNoWriteErrors(statuses); 
assertEquals(2, statuses.size(), "2 files needs to be committed."); @@ -939,13 +940,13 @@ public void testSmallInsertHandlingForInserts() throws Exception { HoodieTable table = getHoodieTable(metaClient, config); List files = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); - assertEquals(3, files.size(), "Total of 3 valid data files"); + assertEquals(4, files.size(), "Total of 4 valid data files"); int totalInserts = 0; for (HoodieBaseFile file : files) { totalInserts += readRowKeysFromParquet(hadoopConf, new Path(file.getPath())).size(); } - assertEquals(totalInserts, inserts1.size() + insert3.size(), "Total number of records must add up"); + assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up"); } /** From c466a22d69088104c17e30e68a4b8f824d5dfbf5 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 6 Jan 2021 23:35:03 -0500 Subject: [PATCH 04/10] Adding a config flag to route inserts to new files ignoring small file handling --- .../apache/hudi/config/HoodieWriteConfig.java | 15 +++ .../action/commit/UpsertPartitioner.java | 46 ++++--- .../TestHoodieClientOnCopyOnWriteStorage.java | 126 ++++++++++++------ .../action/commit/TestUpsertPartitioner.java | 86 +++++++----- 4 files changed, 185 insertions(+), 88 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d8135d44135b0..28d04ead5b59d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled"; private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false"; + // Routes inserts to new files ignoring small file handling + private static final String ROUTE_INSERTS_TO_NEW_FILES = "hoodie.route.inserts.to.new.files"; + private static final String DEFAULT_ROUTE_INSERTS_TO_NEW_FILES = "false"; + /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow * multiple write operations (upsert/buk-insert/...) to be executed within a single commit. 
@@ -330,6 +334,10 @@ public boolean isMergeDataValidationCheckEnabled() { return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED)); } + public boolean isRouteInsertsToNewFiles() { + return Boolean.parseBoolean(props.getProperty(ROUTE_INSERTS_TO_NEW_FILES)); + } + public EngineType getEngineType() { return engineType; } @@ -1180,6 +1188,11 @@ public Builder withMergeDataValidationCheckEnabled(boolean enabled) { return this; } + public Builder withRouteInsertsToNewFiles(boolean routeInsertsToNewFiles) { + props.setProperty(ROUTE_INSERTS_TO_NEW_FILES, String.valueOf(routeInsertsToNewFiles)); + return this; + } + public Builder withProperties(Properties properties) { this.props.putAll(properties); return this; @@ -1234,6 +1247,8 @@ protected void setDefaults() { BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED), MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED); + setDefaultOnCondition(props, !props.containsKey(ROUTE_INSERTS_TO_NEW_FILES), + ROUTE_INSERTS_TO_NEW_FILES, DEFAULT_ROUTE_INSERTS_TO_NEW_FILES); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index a6ffd132f5c28..b8c2970f502c1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -54,8 +55,6 @@ import scala.Tuple2; -import static org.apache.hudi.common.model.WriteOperationType.isChangingRecords; - /** * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition). 
*/ @@ -201,21 +200,30 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) for (SmallFile smallFile : smallFiles) { long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts); - if (recordsToAppend > 0) { - // create a new bucket or re-use an existing bucket + if (recordsToAppend > 0 && totalUnassignedInserts > 0) { int bucket; - // insert new records regardless of small file when using insert operation - if (isChangingRecords(profile.getOperationType()) - && updateLocationToBucket.containsKey(smallFile.location.getFileId())) { - bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); - } else if (profile.getOperationType() == null || isChangingRecords(profile.getOperationType())) { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); - } else { - bucket = totalBuckets; - addInsertBucket(partitionPath); - LOG.info("Assigning " + recordsToAppend + " inserts to new insert bucket " + bucket); + if (config.isRouteInsertsToNewFiles()) { + // if insert operation, route inserts to new files regardless of small file handling. + if (WriteOperationType.isChangingRecords(profile.getOperationType()) + && updateLocationToBucket.containsKey(smallFile.location.getFileId())) { + bucket = updateLocationToBucket.get(smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); + } else if (profile.getOperationType() == null || WriteOperationType.isChangingRecords(profile.getOperationType())) { + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); + } else { + bucket = totalBuckets; + addInsertBucket(partitionPath); + LOG.info("Assigning " + recordsToAppend + " inserts to new insert bucket " + bucket); + } + } else { // create a new bucket or re-use an existing bucket + if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { + bucket = updateLocationToBucket.get(smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); + } else { + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); + } } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); @@ -235,7 +243,11 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); - recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + if (b < insertBuckets - 1) { + recordsPerBucket.add(insertRecordsPerBucket); + } else { + recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); + } addInsertBucket(partitionPath); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index c4f6fd2ced630..f316d6585b227 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -18,13 +18,11 @@ package org.apache.hudi.client; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroupId; @@ -32,7 +30,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -62,8 +59,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -71,6 +68,10 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -79,7 +80,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; @@ -91,11 +94,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.Properties; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0; import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths; @@ -129,6 +132,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } }; + public static Stream configParams() { + return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); + } + private HoodieTestTable testTable; @BeforeEach @@ -701,7 +708,7 @@ private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set, List> insertBatchRecords(SparkRDDWriteClient client, String commitTime, - Integer recordNum, int expectStatueSize) { + Integer recordNum, int expectStatueSize) { client.startCommitWithTime(commitTime); List inserts1 = 
dataGen.generateInserts(commitTime, recordNum); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); @@ -734,7 +741,7 @@ public void testUpdateRejectForClustering() throws IOException { String commitTime2 = "002"; List> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath) .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList()); - List[] fileSlices = (List[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); + List[] fileSlices = (List[]) firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices); // 3. insert one record with no updating reject exception, and not merge the small file, just generate a new file group @@ -752,7 +759,8 @@ public void testUpdateRejectForClustering() throws IOException { String assertMsg = String.format("Not allowed to update the clustering files in partition: %s " + "For pending clustering operations, we are not going to support update for now.", testPartitionPath); assertThrows(HoodieUpsertException.class, () -> { - writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime3).collect(); }, assertMsg); + writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime3).collect(); + }, assertMsg); // 5. insert one record with no updating reject exception, will merge the small file String commitTime5 = "005"; @@ -878,15 +886,17 @@ public void testSmallInsertHandlingForUpserts() throws Exception { /** * Test scenario of new file-group getting added during insert(). */ - @Test - public void testSmallInsertHandlingForInserts() throws Exception { + @ParameterizedTest + @MethodSource("configParams") + public void testSmallInsertHandlingForInserts(boolean routeInsertsToNewFiles) throws Exception { final String testPartitionPath = "2016/09/26"; final int insertSplitLimit = 100; // setup the small file handling params - HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max + HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, routeInsertsToNewFiles); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config, false); + // Inserts => will write file1 String commitTime1 = "001"; client.startCommitWithTime(commitTime1); List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb @@ -894,13 +904,14 @@ public void testSmallInsertHandlingForInserts() throws Exception { JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); - assertPartitionMetadata(new String[]{testPartitionPath}, fs); + assertPartitionMetadata(new String[] {testPartitionPath}, fs); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); + // Second, set of Inserts should just expand file1 String commitTime2 = "002"; client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); @@ -908,12 +919,20 @@ public void testSmallInsertHandlingForInserts() throws Exception { JavaRDD insertRecordsRDD2 = jsc.parallelize(inserts2, 1); statuses = 
client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); - assertEquals(1, statuses.size(), "Just 1 file needs to be added."); - assertNotNull(statuses.get(0).getFileId(), "Small file should be added"); - assertEquals("null", statuses.get(0).getStat().getPrevCommit(), "Small file should be added"); + assertEquals(1, statuses.size(), "Just 1 file needs to be written."); + if (!routeInsertsToNewFiles) { + assertNotNull(statuses.get(0).getFileId(), "Existing file should be expanded"); + assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); + } else { + // TODO assertEquals(1, statuses.size(), "Just 1 file needs to be added."); + assertNotNull(statuses.get(0).getFileId(), "Small file should be added"); + assertEquals("null", statuses.get(0).getStat().getPrevCommit(), "Small file should be added"); + } Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), - "file should contain 40 records"); + assertEquals(routeInsertsToNewFiles ? 40 : 140, readRowKeysFromParquet(hadoopConf, newFile).size(), + "file should contain " + (routeInsertsToNewFiles ? 40 : 140) + " records"); + // TODO: assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), + // "file should contain 40 records"); List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); @@ -924,6 +943,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { "key expected to be part of commit 1 or commit2"); } + // Lots of inserts such that file1 is updated and expanded, a new file2 is created. String commitTime3 = "003"; client.startCommitWithTime(commitTime3); List inserts3 = dataGen.generateInserts(commitTime3, 200); @@ -931,20 +951,28 @@ public void testSmallInsertHandlingForInserts() throws Exception { statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); - assertEquals(200, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() - + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), - "file should contain 200 records"); + if (routeInsertsToNewFiles) { + assertEquals(200, + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() + + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), + "file should contain 200 records"); + } HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = getHoodieTable(metaClient, config); List files = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); - assertEquals(4, files.size(), "Total of 4 valid data files"); + assertEquals(routeInsertsToNewFiles ? 4 : 2, files.size(), "Total of " + (routeInsertsToNewFiles ? 
4 : 2) + " valid data files"); + // TODO assertEquals(4, files.size(), "Total of 4 valid data files"); int totalInserts = 0; for (HoodieBaseFile file : files) { - totalInserts += readRowKeysFromParquet(hadoopConf, new Path(file.getPath())).size(); + if (!routeInsertsToNewFiles) { + assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); + totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); + } else { + totalInserts += readRowKeysFromParquet(hadoopConf, new Path(file.getPath())).size(); + } } assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up"); } @@ -1034,7 +1062,7 @@ public void testClusteringWithSortColumns() throws Exception { .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); testClustering(clusteringConfig); } - + private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception { // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10); @@ -1102,10 +1130,10 @@ public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() thr } /** - * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records. - * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records. + * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records. + * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records. * - * Verify that all records in step1 are overwritten + * Verify that all records in step1 are overwritten */ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception { final String testPartitionPath = "americas"; @@ -1182,12 +1210,10 @@ private Set deletePartitionWithCommit(SparkRDDWriteClient client, String } /** - * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition. - * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition. - * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition. - * 4) delete first partition and check result. - * 5) delete second and third partition and check result. - * + * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition. + * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition. + * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition. + * 4) delete first partition and check result. 5) delete second and third partition and check result. */ private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception { HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false); @@ -1636,22 +1662,45 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) { * Build Hoodie Write Config for small data file sizes. */ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) { - return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150)); + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false); + } + + /** + * Build Hoodie Write Config for small data file sizes. 
+ */ + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean routeInsertsToNewFiles) { + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), routeInsertsToNewFiles); } /** * Build Hoodie Write Config for specified small file sizes. */ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) { + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false); + } + + /** + * Build Hoodie Write Config for specified small file sizes. + */ + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean routeInsertsToNewFiles) { String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA; - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize); + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, routeInsertsToNewFiles); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) { - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, new Properties()); + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false); } - + + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean routeInsertsToNewFiles) { + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, routeInsertsToNewFiles, new Properties()); + } + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) { + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props); + } + + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean routeInsertsToNewFiles, + Properties props) { HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr); return builder .withCompactionConfig( @@ -1662,6 +1711,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String HoodieStorageConfig.newBuilder() .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)) .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) + .withRouteInsertsToNewFiles(routeInsertsToNewFiles) .withProps(props) .build(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 18cc2db89ac4a..cd47db41938f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -39,15 +39,20 @@ import org.apache.avro.Schema; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import scala.Tuple2; @@ -62,9 +67,14 @@ public class 
TestUpsertPartitioner extends HoodieClientTestBase { - private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class); + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with route enable={0}"; + private static final Schema SCHEMA = getSchemaFromResource(TestUpsertPartitioner.class, "/exampleSchema.avsc"); + public static Stream configParams() { + return Arrays.stream(new Boolean[][] {{true},{false}}).map(Arguments::of); + } + private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, int splitSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder() @@ -98,12 +108,12 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts } private UpsertPartitioner getInsertPartitioner(int smallFileSize, int numInserts, int fileSize, int splitSize, String testPartitionPath, - boolean autoSplitInserts) throws Exception { - HoodieWriteConfig config = makeHoodieClientConfigBuilder() - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) - .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) - .build(); + boolean autoSplitInserts, boolean routeInsertsToNewFiles) throws Exception { + HoodieWriteConfig config = makeHoodieClientConfigBuilder().withRouteInsertsToNewFiles(routeInsertsToNewFiles) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) + .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) + .build(); FileCreateUtils.createCommit(basePath, "001"); FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", "file1", fileSize); @@ -207,7 +217,7 @@ public void testUpsertPartitioner() throws Exception { public void testUpsertPartitionerWithRecordsPerBucket() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates... 
Check all updates go together & inserts subsplit - UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, 100, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); int insertSplitSize = partitioner.config.getCopyOnWriteInsertSplitSize(); int remainedInsertSize = 250 - 2 * insertSplitSize; @@ -249,9 +259,9 @@ public void testPartitionWeight() throws Exception { insertBuckets.add(1, pair1); Map partition2numRecords = new HashMap(); - for (HoodieRecord hoodieRecord: insertRecords) { + for (HoodieRecord hoodieRecord : insertRecords) { int partition = partitioner.getPartition(new Tuple2<>( - hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation()))); + hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation()))); if (!partition2numRecords.containsKey(partition)) { partition2numRecords.put(partition, 0); } @@ -259,20 +269,20 @@ public void testPartitionWeight() throws Exception { } assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1), - "The insert num of bucket1 should more than bucket0"); + "The insert num of bucket1 should more than bucket0"); assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) == totalInsertNum, - "The total insert records should be " + totalInsertNum); + "The total insert records should be " + totalInsertNum); assertEquals(String.valueOf(bucket0Weight), - String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)), - "The weight of bucket0 should be " + bucket0Weight); + String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)), + "The weight of bucket0 should be " + bucket0Weight); assertEquals(String.valueOf(1 - bucket0Weight), - String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)), - "The weight of bucket1 should be " + (1 - bucket0Weight)); + String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)), + "The weight of bucket1 should be " + (1 - bucket0Weight)); } private void assertInsertBuckets(Double[] weights, - Double[] cumulativeWeights, - List insertBuckets) { + Double[] cumulativeWeights, + List insertBuckets) { for (int i = 0; i < weights.length; i++) { assertEquals(i, insertBuckets.get(i).getKey().bucketNumber, String.format("BucketNumber of insert bucket %d must be same as %d", i, i)); @@ -319,25 +329,29 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { "Bucket 3 is INSERT"); assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); - weights = new Double[] { 0.08, 0.42, 0.42, 0.08}; - cumulativeWeights = new Double[] { 0.08, 0.5, 0.92, 1.0}; - /*======= - weights = new Double[] {0.08, 0.31, 0.31, 0.31}; - cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0}; */ + weights = new Double[] {0.08, 0.42, 0.42, 0.08}; + cumulativeWeights = new Double[] {0.08, 0.5, 0.92, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); } - @Test - public void testInsertPartitionerWithSmallInsertHandling() throws Exception { + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testInsertPartitionerWithSmallInsertHandling(boolean routeInsertsToNewFiles) throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts .. 
Check updates go together & inserts subsplit, after expanding smallest file - UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, 100, testPartitionPath, false); + UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, 100, testPartitionPath, false, + routeInsertsToNewFiles); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); for (int i = 0; i < partitioner.numPartitions(); i++) { - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, - String.format("Bucket %d is INSERT", i)); + if (i == 0 && !routeInsertsToNewFiles) { + assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType, + "Bucket 0 is UPDATE"); + } else { + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, + String.format("Bucket %d is INSERT", i)); + } } assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); @@ -346,18 +360,24 @@ public void testInsertPartitionerWithSmallInsertHandling() throws Exception { assertInsertBuckets(weights, cumulativeWeights, insertBuckets); // Now with insert split size auto tuned - partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 100, testPartitionPath, true); + partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 100, testPartitionPath, true, + routeInsertsToNewFiles); insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); for (int i = 0; i < partitioner.numPartitions(); i++) { - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(0).bucketType, - String.format("Bucket %d is INSERT", i)); + if (i == 0 && !routeInsertsToNewFiles) { + assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType, + "Bucket 0 is UPDATE"); + } else { + assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, + String.format("Bucket %d is INSERT", i)); + } } assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); - weights = new Double[] {0.08, 0.31, 0.31, 0.31}; - cumulativeWeights = new Double[] {0.08, 0.39, 0.69, 1.0}; + weights = new Double[] {0.08, 0.42, 0.42, 0.08}; + cumulativeWeights = new Double[] {0.08, 0.50, 0.92, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); } From 591e97e2837e0a15709edce85df31b09fa2b1586 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 15 Jan 2021 13:12:08 -0500 Subject: [PATCH 05/10] Adding HoodieConcatHandle to insert records w/o merging with "Insert" operation --- .../org/apache/hudi/io/HoodieMergeHandle.java | 6 +- .../hudi/io/storage/HoodieConcatHandle.java | 72 +++++++++++++++++++ .../commit/BaseSparkCommitActionExecutor.java | 3 + .../action/commit/UpsertPartitioner.java | 31 ++------ .../TestHoodieClientOnCopyOnWriteStorage.java | 60 +++++++++++++++- 5 files changed, 143 insertions(+), 29 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 4f5b82a67d46d..e1574bddfb2ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -64,11 +64,11 @@ public class HoodieMergeHandle 
extends H protected Map> keyToNewRecords; protected Set writtenRecordKeys; - private HoodieFileWriter fileWriter; + protected HoodieFileWriter fileWriter; - private Path newFilePath; + protected Path newFilePath; private Path oldFilePath; - private long recordsWritten = 0; + protected long recordsWritten = 0; private long recordsDeleted = 0; private long updatedRecordsWritten = 0; protected long insertRecordsWritten = 0; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java new file mode 100644 index 0000000000000..4cb61e73bb23e --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.client.common.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#isRouteInsertsToNewFiles()}} + * is set, this handle will be used instead of {@link HoodieMergeHandle} + */ +public class HoodieConcatHandle extends HoodieMergeHandle { + + private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class); + + public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + } + + public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier); + } + + /** + * Write old record as is w/o merging with incoming record. 
+ */ + @Override + public void write(GenericRecord oldRecord) { + String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + try { + fileWriter.writeAvro(key, oldRecord); + } catch (IOException | RuntimeException e) { + String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s", + key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true)); + LOG.debug("Old record is " + oldRecord); + throw new HoodieUpsertException(errMsg, e); + } + recordsWritten++; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 3e4cf7f208ad1..41b23d4fdda3f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -43,6 +43,7 @@ import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.io.storage.HoodieConcatHandle; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; @@ -320,6 +321,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { if (table.requireSortedRecords()) { return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier); + } else if (!WriteOperationType.isChangingRecords(operationType) && config.isRouteInsertsToNewFiles()) { + return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); } else { return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index b8c2970f502c1..a78f4e7aa4dc9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; @@ -201,29 +200,13 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts); if (recordsToAppend > 0 && totalUnassignedInserts > 0) { - int bucket; - if (config.isRouteInsertsToNewFiles()) { - // if insert operation, route inserts to new files regardless of small file handling. 
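To make the small-file bin-packing above concrete, a worked example (all numbers are assumed, not taken from this patch):

    long parquetMaxFileSize     = 120L * 1024 * 1024; // config.getParquetMaxFileSize()
    long smallFileSizeBytes     = 90L * 1024 * 1024;  // smallFile.sizeBytes
    long averageRecordSize      = 1024L;              // estimated from earlier commits
    long totalUnassignedInserts = 50_000L;
    long recordsToAppend = Math.min((parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize,
        totalUnassignedInserts);                      // min(30720, 50000) = 30720 inserts packed into this small file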
- if (WriteOperationType.isChangingRecords(profile.getOperationType()) - && updateLocationToBucket.containsKey(smallFile.location.getFileId())) { - bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); - } else if (profile.getOperationType() == null || WriteOperationType.isChangingRecords(profile.getOperationType())) { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); - } else { - bucket = totalBuckets; - addInsertBucket(partitionPath); - LOG.info("Assigning " + recordsToAppend + " inserts to new insert bucket " + bucket); - } - } else { // create a new bucket or re-use an existing bucket - if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { - bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); - } else { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); - LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); - } + int bucket;// create a new bucket or re-use an existing bucket + if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { + bucket = updateLocationToBucket.get(smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); + } else { + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index f316d6585b227..7bedf51b07cdf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -373,7 +373,9 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); // Now simulate an upgrade and perform a restore operation - HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion( + HoodieWriteConfig newConfig = getConfigBuilder() + .withRouteInsertsToNewFiles(true) + .withProps(config.getProps()).withTimelineLayoutVersion( TimelineLayoutVersion.CURR_VERSION).build(); client = getHoodieWriteClient(newConfig, false); client.restoreToInstant("004"); @@ -459,9 +461,63 @@ private void testUpsertsInternal(HoodieWriteConfig config, } /** - * Tesst deletion of records. + * Test Insert API for HoodieConcatHandle */ @Test + public void testInsertsWithHoodieConcatHandle() throws Exception { + testHoodieConcatHandle(getConfig(), false); + } + + /** + * Test UpsertPrepped API. + */ + @Test + public void testInsertsPreppedWithHoodieConcatHandle() throws Exception { + testHoodieConcatHandle(getConfig(), true); + } + + /** + * Test one of HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#index} API. 
+ * + * @param config Write Config + * @throws Exception in case of error + */ + private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) + throws Exception { + // Force using older timeline layout + HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withRouteInsertsToNewFiles(true) + .withProps(config.getProps()).withTimelineLayoutVersion( + VERSION_0).build(); + HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), + metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(), + metaClient.getTableConfig().getPayloadClass(), VERSION_0); + SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); + + // Write 1 (only inserts) + String newCommitTime = "001"; + String initCommitTime = "000"; + int numRecords = 200; + insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert, + isPrepped, true, numRecords); + + // Write 2 (updates) + String prevCommitTime = newCommitTime; + newCommitTime = "004"; + numRecords = 100; + String commitTimeBetweenPrevAndNew = "002"; + + final Function2, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUniqueUpdates); + + writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, + numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300, + 2); + } + + /** + * Tesst deletion of records. + */ + @Test public void testDeletes() throws Exception { SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false); From 39123ef582ab8c8d388d7f3b714188310de68fc1 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 18 Jan 2021 23:51:19 -0500 Subject: [PATCH 06/10] Reverting some of previous changes --- .../apache/hudi/config/HoodieWriteConfig.java | 18 +-- .../org/apache/hudi/io/HoodieMergeHandle.java | 28 ++++ .../hudi/io/storage/HoodieConcatHandle.java | 20 ++- .../commit/BaseSparkCommitActionExecutor.java | 2 +- .../action/commit/UpsertPartitioner.java | 19 ++- .../TestHoodieClientOnCopyOnWriteStorage.java | 19 +-- .../action/commit/TestUpsertPartitioner.java | 123 ++++-------------- .../command/ITTestHoodieSyncCommand.java | 3 +- 8 files changed, 101 insertions(+), 131 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 28d04ead5b59d..2d365a070c825 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -131,9 +131,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled"; private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false"; - // Routes inserts to new files ignoring small file handling - private static final String ROUTE_INSERTS_TO_NEW_FILES = "hoodie.route.inserts.to.new.files"; - private static final String DEFAULT_ROUTE_INSERTS_TO_NEW_FILES = "false"; + // Concats inserts to data files without merging + private static final String MERGE_ALLOW_DUPLICATE_INSERTS = "hoodie.merge.allow.duplicate.inserts"; + private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_INSERTS = "false"; /** * 
HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow @@ -334,8 +334,8 @@ public boolean isMergeDataValidationCheckEnabled() { return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED)); } - public boolean isRouteInsertsToNewFiles() { - return Boolean.parseBoolean(props.getProperty(ROUTE_INSERTS_TO_NEW_FILES)); + public boolean isMergeAllowDuplicateInserts() { + return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_INSERTS)); } public EngineType getEngineType() { @@ -1188,8 +1188,8 @@ public Builder withMergeDataValidationCheckEnabled(boolean enabled) { return this; } - public Builder withRouteInsertsToNewFiles(boolean routeInsertsToNewFiles) { - props.setProperty(ROUTE_INSERTS_TO_NEW_FILES, String.valueOf(routeInsertsToNewFiles)); + public Builder withMergeAllowDuplicateInserts(boolean routeInsertsToNewFiles) { + props.setProperty(MERGE_ALLOW_DUPLICATE_INSERTS, String.valueOf(routeInsertsToNewFiles)); return this; } @@ -1247,8 +1247,8 @@ protected void setDefaults() { BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED), MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED); - setDefaultOnCondition(props, !props.containsKey(ROUTE_INSERTS_TO_NEW_FILES), - ROUTE_INSERTS_TO_NEW_FILES, DEFAULT_ROUTE_INSERTS_TO_NEW_FILES); + setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_INSERTS), + MERGE_ALLOW_DUPLICATE_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_INSERTS); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index e1574bddfb2ac..d5df2cb226665 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -58,6 +58,34 @@ import java.util.Set; @SuppressWarnings("Duplicates") +/** + * Handle to merge incoming records to those in storage. + *
<p>
+ * Simplified Logic: + * For every existing record + * Check if there is a new record coming in. If yes, merge two records and write to file + * else write the record as is + * For all pending records from incoming batch, write to file. + * + * Illustration with simple data. + * Incoming data: + * rec1_2, rec4_2, rec5_1, rec6_1 + * Existing data: + * rec1_1, rec2_1, rec3_1, rec4_1 + * + * For every existing record, merge w/ incoming if requried and write to storage. + * => rec1_1 and rec1_2 is merged to write rec1_2 to storage + * => rec2_1 is written as is + * => rec3_1 is written as is + * => rec4_2 and rec4_1 is merged to write rec4_2 to storage + * Write all pending records from incoming set to storage + * => rec5_1 and rec6_1 + * + * Final snapshot in storage + * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1 + * + *
<p>
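The snapshot above can be reproduced with a toy map-based sketch (illustrative only; the real handle merges HoodieRecord payloads into Parquet files, not in-memory maps; assumes java.util.Map and java.util.LinkedHashMap):

    Map<String, String> existing = new LinkedHashMap<>();
    existing.put("rec1", "_1"); existing.put("rec2", "_1");
    existing.put("rec3", "_1"); existing.put("rec4", "_1");
    Map<String, String> incoming = new LinkedHashMap<>();
    incoming.put("rec1", "_2"); incoming.put("rec4", "_2");
    incoming.put("rec5", "_1"); incoming.put("rec6", "_1");
    Map<String, String> merged = new LinkedHashMap<>(existing);
    merged.putAll(incoming); // incoming version wins on key collision; pending incoming records are appended
    // merged -> {rec1=_2, rec2=_1, rec3=_1, rec4=_2, rec5=_1, rec6=_1}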
+ */ public class HoodieMergeHandle extends HoodieWriteHandle { private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java index 4cb61e73bb23e..150ebda2be559 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java @@ -36,8 +36,26 @@ import java.util.Map; /** - * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#isRouteInsertsToNewFiles()}} + * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#isMergeAllowDuplicateInserts()}} * is set, this handle will be used instead of {@link HoodieMergeHandle} + * Simplified Logic: + * For every existing record + * Write the record as is + * For all incoming records, write to file as is. + * + * Illustration with simple data. + * Incoming data: + * rec1_2, rec4_2, rec5_1, rec6_1 + * Existing data: + * rec1_1, rec2_1, rec3_1, rec4_1 + * + * For every existing record, merge w/ incoming if requried and write to storage. + * => rec1_1, rec2_1, rec3_1 and rec4_1 is written to storage + * Write all records from incoming set to storage + * => rec1_2, rec4_2, rec5_1 and rec6_1 + * + * Final snapshot in storage + * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1 */ public class HoodieConcatHandle extends HoodieMergeHandle { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 41b23d4fdda3f..e3e2613a282f8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -321,7 +321,7 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { if (table.requireSortedRecords()) { return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier); - } else if (!WriteOperationType.isChangingRecords(operationType) && config.isRouteInsertsToNewFiles()) { + } else if (!WriteOperationType.isChangingRecords(operationType) && config.isMergeAllowDuplicateInserts()) { return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); } else { return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index a78f4e7aa4dc9..ee153c8468ecf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -129,15 +129,6 @@ private int addUpdateBucket(String partitionPath, String fileIdHint) 
{ return bucket; } - private void addInsertBucket(String partitionPath) { - BucketInfo bucketInfo = new BucketInfo(); - bucketInfo.bucketType = BucketType.INSERT; - bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); - bucketInfo.partitionPath = partitionPath; - bucketInfoMap.put(totalBuckets, bucketInfo); - totalBuckets++; - } - /** * Get the in pending clustering fileId for each partition path. * @return partition path to pending clustering file groups id @@ -200,7 +191,8 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts); if (recordsToAppend > 0 && totalUnassignedInserts > 0) { - int bucket;// create a new bucket or re-use an existing bucket + // create a new bucket or re-use an existing bucket + int bucket; if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); @@ -231,7 +223,12 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) } else { recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } - addInsertBucket(partitionPath); + BucketInfo bucketInfo = new BucketInfo(); + bucketInfo.bucketType = BucketType.INSERT; + bucketInfo.partitionPath = partitionPath; + bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + bucketInfoMap.put(totalBuckets, bucketInfo); + totalBuckets++; } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 7bedf51b07cdf..b49069e274353 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -374,7 +374,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, // Now simulate an upgrade and perform a restore operation HoodieWriteConfig newConfig = getConfigBuilder() - .withRouteInsertsToNewFiles(true) + .withMergeAllowDuplicateInserts(true) .withProps(config.getProps()).withTimelineLayoutVersion( TimelineLayoutVersion.CURR_VERSION).build(); client = getHoodieWriteClient(newConfig, false); @@ -461,7 +461,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, } /** - * Test Insert API for HoodieConcatHandle + * Test Insert API for HoodieConcatHandle. 
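A minimal usage sketch of the path this test exercises (a sketch only; it assumes getConfigBuilder, getHoodieWriteClient, dataGen, jsc and assertNoWriteErrors from the test base, as used elsewhere in this file):

    HoodieWriteConfig config = getConfigBuilder()
        .withMergeAllowDuplicateInserts(true)   // insert() concatenates via HoodieConcatHandle
        .build();
    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
    String commitTime = "001";
    client.startCommitWithTime(commitTime);
    List<HoodieRecord> inserts = dataGen.generateInserts(commitTime, 100);
    List<WriteStatus> statuses = client.insert(jsc.parallelize(inserts, 1), commitTime).collect();
    assertNoWriteErrors(statuses);              // duplicates of existing keys are kept, not merged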
*/ @Test public void testInsertsWithHoodieConcatHandle() throws Exception { @@ -485,9 +485,12 @@ public void testInsertsPreppedWithHoodieConcatHandle() throws Exception { private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) throws Exception { // Force using older timeline layout - HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withRouteInsertsToNewFiles(true) - .withProps(config.getProps()).withTimelineLayoutVersion( + HoodieWriteConfig hoodieWriteConfig = getConfigBuilder() + .withProps(config.getProps()).withMergeAllowDuplicateInserts(true).withTimelineLayoutVersion( VERSION_0).build(); + + System.out.println(" allow merge" + hoodieWriteConfig.isMergeAllowDuplicateInserts()); + HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0); @@ -514,9 +517,9 @@ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) 2); } - /** - * Tesst deletion of records. - */ + /** + * Tests deletion of records. + */ @Test public void testDeletes() throws Exception { SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false); @@ -1767,7 +1770,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String HoodieStorageConfig.newBuilder() .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)) .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) - .withRouteInsertsToNewFiles(routeInsertsToNewFiles) + .withMergeAllowDuplicateInserts(routeInsertsToNewFiles) .withProps(props) .build(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index cd47db41938f7..f40a97c0bbadc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -38,21 +37,18 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.avro.Schema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import scala.Tuple2; @@ -67,19 +63,14 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { - private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with route enable={0}"; - + private static final Logger LOG = 
LogManager.getLogger(TestUpsertPartitioner.class); private static final Schema SCHEMA = getSchemaFromResource(TestUpsertPartitioner.class, "/exampleSchema.avsc"); - public static Stream configParams() { - return Arrays.stream(new Boolean[][] {{true},{false}}).map(Arguments::of); - } - - private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, int splitSize, + private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) - .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) + .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) .build(); @@ -99,7 +90,7 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts List records = new ArrayList<>(); records.addAll(insertRecords); records.addAll(updateRecords); - WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(records)), WriteOperationType.UPSERT); + WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(records))); UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config); assertEquals(0, partitioner.getPartition( new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))), @@ -107,25 +98,6 @@ private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts return partitioner; } - private UpsertPartitioner getInsertPartitioner(int smallFileSize, int numInserts, int fileSize, int splitSize, String testPartitionPath, - boolean autoSplitInserts, boolean routeInsertsToNewFiles) throws Exception { - HoodieWriteConfig config = makeHoodieClientConfigBuilder().withRouteInsertsToNewFiles(routeInsertsToNewFiles) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(smallFileSize) - .insertSplitSize(splitSize).autoTuneInsertSplits(autoSplitInserts).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) - .build(); - - FileCreateUtils.createCommit(basePath, "001"); - FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", "file1", fileSize); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); - - HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); - List insertRecords = dataGenerator.generateInserts("001", numInserts); - WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)), WriteOperationType.INSERT); - return new UpsertPartitioner(profile, context, table, config); - } - private static List setupHoodieInstants() { List instants = new ArrayList<>(); instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1")); @@ -208,7 +180,7 @@ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception { public void testUpsertPartitioner() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates... 
Check all updates go together & inserts subsplit - UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, 100, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets"); } @@ -217,7 +189,7 @@ public void testUpsertPartitioner() throws Exception { public void testUpsertPartitionerWithRecordsPerBucket() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates... Check all updates go together & inserts subsplit - UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, 100, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(0, 250, 100, 1024, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); int insertSplitSize = partitioner.config.getCopyOnWriteInsertSplitSize(); int remainedInsertSize = 250 - 2 * insertSplitSize; @@ -259,9 +231,9 @@ public void testPartitionWeight() throws Exception { insertBuckets.add(1, pair1); Map partition2numRecords = new HashMap(); - for (HoodieRecord hoodieRecord : insertRecords) { + for (HoodieRecord hoodieRecord: insertRecords) { int partition = partitioner.getPartition(new Tuple2<>( - hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation()))); + hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation()))); if (!partition2numRecords.containsKey(partition)) { partition2numRecords.put(partition, 0); } @@ -269,20 +241,20 @@ public void testPartitionWeight() throws Exception { } assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1), - "The insert num of bucket1 should more than bucket0"); + "The insert num of bucket1 should more than bucket0"); assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) == totalInsertNum, - "The total insert records should be " + totalInsertNum); + "The total insert records should be " + totalInsertNum); assertEquals(String.valueOf(bucket0Weight), - String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)), - "The weight of bucket0 should be " + bucket0Weight); + String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)), + "The weight of bucket0 should be " + bucket0Weight); assertEquals(String.valueOf(1 - bucket0Weight), - String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)), - "The weight of bucket1 should be " + (1 - bucket0Weight)); + String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)), + "The weight of bucket1 should be " + (1 - bucket0Weight)); } private void assertInsertBuckets(Double[] weights, - Double[] cumulativeWeights, - List insertBuckets) { + Double[] cumulativeWeights, + List insertBuckets) { for (int i = 0; i < weights.length; i++) { assertEquals(i, insertBuckets.get(i).getKey().bucketNumber, String.format("BucketNumber of insert bucket %d must be same as %d", i, i)); @@ -298,7 +270,7 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { final String testPartitionPath = "2016/09/26"; // Inserts + Updates .. 
Check updates go together & inserts subsplit, after expanding // smallest file - UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, 100, testPartitionPath, false); + UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); @@ -310,12 +282,12 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { "Bucket 2 is INSERT"); assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); - Double[] weights = {0.5, 0.25, 0.25}; - Double[] cumulativeWeights = {0.5, 0.75, 1.0}; + Double[] weights = { 0.5, 0.25, 0.25}; + Double[] cumulativeWeights = { 0.5, 0.75, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); // Now with insert split size auto tuned - partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, 100, testPartitionPath, true); + partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true); insertBuckets = partitioner.getInsertBuckets(testPartitionPath); assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); @@ -329,55 +301,8 @@ public void testUpsertPartitionerWithSmallInsertHandling() throws Exception { "Bucket 3 is INSERT"); assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); - weights = new Double[] {0.08, 0.42, 0.42, 0.08}; - cumulativeWeights = new Double[] {0.08, 0.5, 0.92, 1.0}; - assertInsertBuckets(weights, cumulativeWeights, insertBuckets); - } - - @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) - @MethodSource("configParams") - public void testInsertPartitionerWithSmallInsertHandling(boolean routeInsertsToNewFiles) throws Exception { - final String testPartitionPath = "2016/09/26"; - // Inserts .. 
Check updates go together & inserts subsplit, after expanding smallest file - UpsertPartitioner partitioner = getInsertPartitioner(1000 * 1024, 400, 800 * 1024, 100, testPartitionPath, false, - routeInsertsToNewFiles); - List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); - - assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions"); - for (int i = 0; i < partitioner.numPartitions(); i++) { - if (i == 0 && !routeInsertsToNewFiles) { - assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType, - "Bucket 0 is UPDATE"); - } else { - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, - String.format("Bucket %d is INSERT", i)); - } - } - assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets"); - - Double[] weights = {0.5, 0.25, 0.25}; - Double[] cumulativeWeights = {0.5, 0.75, 1.0}; - assertInsertBuckets(weights, cumulativeWeights, insertBuckets); - - // Now with insert split size auto tuned - partitioner = getInsertPartitioner(1000 * 1024, 2400, 800 * 1024, 100, testPartitionPath, true, - routeInsertsToNewFiles); - insertBuckets = partitioner.getInsertBuckets(testPartitionPath); - - assertEquals(4, partitioner.numPartitions(), "Should have 4 partitions"); - for (int i = 0; i < partitioner.numPartitions(); i++) { - if (i == 0 && !routeInsertsToNewFiles) { - assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType, - "Bucket 0 is UPDATE"); - } else { - assertEquals(BucketType.INSERT, partitioner.getBucketInfo(i).bucketType, - String.format("Bucket %d is INSERT", i)); - } - } - assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets"); - - weights = new Double[] {0.08, 0.42, 0.42, 0.08}; - cumulativeWeights = new Double[] {0.08, 0.50, 0.92, 1.0}; + weights = new Double[] { 0.08, 0.42, 0.42, 0.08}; + cumulativeWeights = new Double[] { 0.08, 0.5, 0.92, 1.0}; assertInsertBuckets(weights, cumulativeWeights, insertBuckets); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java index 04c8dcacd7e7a..a6a4c3ec4201e 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java @@ -51,9 +51,8 @@ public void testValidateSync() throws Exception { TestExecStartResultCallback result = executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + " --cmdfile " + SYNC_VALIDATE_COMMANDS, true); - // When using insert operation, the new inserts are created by the small file, which cause the count of new inserts is only 100. String expected = String.format("Count difference now is (count(%s) - count(%s) == %d. 
Catch up count is %d", - hiveTableName, hiveTableName2, 100, 100); + hiveTableName, hiveTableName2, 100, 200); assertTrue(result.getStderr().toString().contains(expected)); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); From 09fcaa1fd9f21c1573e8450b7e28e21acfd05273 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 19 Jan 2021 00:29:09 -0500 Subject: [PATCH 07/10] Fixing tests in TestHoodieClientOnCopyOnWriteStorage.java --- .../TestHoodieClientOnCopyOnWriteStorage.java | 98 ++++++++----------- 1 file changed, 42 insertions(+), 56 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index b49069e274353..96cb4a62c5499 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -18,11 +18,13 @@ package org.apache.hudi.client; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroupId; @@ -30,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -59,8 +62,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -68,10 +71,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -94,11 +93,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.Properties; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0; import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths; @@ -132,8 +131,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } }; 
- public static Stream configParams() { - return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); + private static Stream configParams() { + return Arrays.stream(new Boolean[][] {{true},{false}}).map(Arguments::of); } private HoodieTestTable testTable; @@ -373,9 +372,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); // Now simulate an upgrade and perform a restore operation - HoodieWriteConfig newConfig = getConfigBuilder() - .withMergeAllowDuplicateInserts(true) - .withProps(config.getProps()).withTimelineLayoutVersion( + HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion( TimelineLayoutVersion.CURR_VERSION).build(); client = getHoodieWriteClient(newConfig, false); client.restoreToInstant("004"); @@ -477,7 +474,7 @@ public void testInsertsPreppedWithHoodieConcatHandle() throws Exception { } /** - * Test one of HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#index} API. + * Test one of HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#insert(Object, String)} API. * * @param config Write Config * @throws Exception in case of error @@ -489,8 +486,6 @@ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) .withProps(config.getProps()).withMergeAllowDuplicateInserts(true).withTimelineLayoutVersion( VERSION_0).build(); - System.out.println(" allow merge" + hoodieWriteConfig.isMergeAllowDuplicateInserts()); - HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0); @@ -767,7 +762,7 @@ private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set, List> insertBatchRecords(SparkRDDWriteClient client, String commitTime, - Integer recordNum, int expectStatueSize) { + Integer recordNum, int expectStatueSize) { client.startCommitWithTime(commitTime); List inserts1 = dataGen.generateInserts(commitTime, recordNum); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); @@ -800,7 +795,7 @@ public void testUpdateRejectForClustering() throws IOException { String commitTime2 = "002"; List> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath) .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList()); - List[] fileSlices = (List[]) firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); + List[] fileSlices = (List[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices); // 3. insert one record with no updating reject exception, and not merge the small file, just generate a new file group @@ -818,8 +813,7 @@ public void testUpdateRejectForClustering() throws IOException { String assertMsg = String.format("Not allowed to update the clustering files in partition: %s " + "For pending clustering operations, we are not going to support update for now.", testPartitionPath); assertThrows(HoodieUpsertException.class, () -> { - writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime3).collect(); - }, assertMsg); + writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime3).collect(); }, assertMsg); // 5. 
insert one record with no updating reject exception, will merge the small file String commitTime5 = "005"; @@ -947,11 +941,11 @@ public void testSmallInsertHandlingForUpserts() throws Exception { */ @ParameterizedTest @MethodSource("configParams") - public void testSmallInsertHandlingForInserts(boolean routeInsertsToNewFiles) throws Exception { + public void testSmallInsertHandlingForInserts(boolean mergeInsertsAllowDuplicates) throws Exception { final String testPartitionPath = "2016/09/26"; final int insertSplitLimit = 100; // setup the small file handling params - HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, routeInsertsToNewFiles); // hold upto 200 records max + HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeInsertsAllowDuplicates); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config, false); @@ -979,19 +973,12 @@ public void testSmallInsertHandlingForInserts(boolean routeInsertsToNewFiles) th statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); assertEquals(1, statuses.size(), "Just 1 file needs to be updated."); - if (!routeInsertsToNewFiles) { - assertNotNull(statuses.get(0).getFileId(), "Existing file should be expanded"); - assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); - } else { - // TODO assertEquals(1, statuses.size(), "Just 1 file needs to be added."); - assertNotNull(statuses.get(0).getFileId(), "Small file should be added"); - assertEquals("null", statuses.get(0).getStat().getPrevCommit(), "Small file should be added"); - } + assertNotNull(statuses.get(0).getFileId(), "Existing file should be expanded"); + assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); + Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(routeInsertsToNewFiles ? 
40 : 140, readRowKeysFromParquet(hadoopConf, newFile).size(), + assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), "file should contain 140 records"); - // TODO: assertEquals(40, readRowKeysFromParquet(hadoopConf, newFile).size(), - // "file should contain 40 records"); List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); @@ -1010,23 +997,20 @@ public void testSmallInsertHandlingForInserts(boolean routeInsertsToNewFiles) th statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); - if (routeInsertsToNewFiles) { - assertEquals(200, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() - + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), - "file should contain 200 records"); - } + assertEquals(340, + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() + + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), + "file should contain 340 records"); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = getHoodieTable(metaClient, config); List files = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); - assertEquals(routeInsertsToNewFiles ? 4 : 2, files.size(), "Total of " + (routeInsertsToNewFiles ? 4 : 2) + " valid data files"); - // TODO assertEquals(4, files.size(), "Total of 4 valid data files"); + assertEquals(2, files.size(), "Total of 2 valid data files"); int totalInserts = 0; for (HoodieBaseFile file : files) { - if (!routeInsertsToNewFiles) { + if (!mergeInsertsAllowDuplicates) { assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); } else { @@ -1189,10 +1173,10 @@ public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() thr } /** - * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records. - * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records. + * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records. + * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records. * - * Verify that all records in step1 are overwritten + * Verify that all records in step1 are overwritten */ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception { final String testPartitionPath = "americas"; @@ -1269,10 +1253,12 @@ private Set deletePartitionWithCommit(SparkRDDWriteClient client, String } /** - * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition. - * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition. - * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition. - * 4) delete first partition and check result. 5) delete second and third partition and check result. + * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition. + * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition. 
+ * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition. + * 4) delete first partition and check result. + * 5) delete second and third partition and check result. + * */ private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception { HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false); @@ -1727,8 +1713,8 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean /** * Build Hoodie Write Config for small data file sizes. */ - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean routeInsertsToNewFiles) { - return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), routeInsertsToNewFiles); + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeInsertsAllowDups) { + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeInsertsAllowDups); } /** @@ -1741,24 +1727,24 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean /** * Build Hoodie Write Config for specified small file sizes. */ - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean routeInsertsToNewFiles) { + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeInsertsAllowDups) { String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA; - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, routeInsertsToNewFiles); + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeInsertsAllowDups); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) { return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false); } - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean routeInsertsToNewFiles) { - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, routeInsertsToNewFiles, new Properties()); + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeInsertsAllowDups) { + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeInsertsAllowDups, new Properties()); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) { return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props); } - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean routeInsertsToNewFiles, + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeInsertsAllowDups, Properties props) { HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr); return builder @@ -1770,7 +1756,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String HoodieStorageConfig.newBuilder() .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)) .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) - .withMergeAllowDuplicateInserts(routeInsertsToNewFiles) + 
.withMergeAllowDuplicateInserts(mergeInsertsAllowDups) .withProps(props) .build(); } From 4c5d84cc5c0472c6d9001502d768ed1f16372296 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 19 Jan 2021 00:36:34 -0500 Subject: [PATCH 08/10] Minor fixes --- .../TestHoodieClientOnCopyOnWriteStorage.java | 33 ++++++++----------- .../hudi/common/model/WriteOperationType.java | 3 +- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 96cb4a62c5499..ed6b0ecd827e7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -114,7 +114,6 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -466,7 +465,7 @@ public void testInsertsWithHoodieConcatHandle() throws Exception { } /** - * Test UpsertPrepped API. + * Test InsertPrepped API for HoodieConcatHandle. */ @Test public void testInsertsPreppedWithHoodieConcatHandle() throws Exception { @@ -941,11 +940,11 @@ public void testSmallInsertHandlingForUpserts() throws Exception { */ @ParameterizedTest @MethodSource("configParams") - public void testSmallInsertHandlingForInserts(boolean mergeInsertsAllowDuplicates) throws Exception { + public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception { final String testPartitionPath = "2016/09/26"; final int insertSplitLimit = 100; // setup the small file handling params - HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeInsertsAllowDuplicates); // hold upto 200 records max + HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config, false); @@ -973,7 +972,7 @@ public void testSmallInsertHandlingForInserts(boolean mergeInsertsAllowDuplicate statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); assertEquals(1, statuses.size(), "Just 1 file needs to be updated."); - assertNotNull(statuses.get(0).getFileId(), "Existing file should be expanded"); + assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); @@ -1010,12 +1009,8 @@ public void testSmallInsertHandlingForInserts(boolean mergeInsertsAllowDuplicate int totalInserts = 0; for (HoodieBaseFile file : files) { - if (!mergeInsertsAllowDuplicates) { - assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); - totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); - } else { - totalInserts += 
readRowKeysFromParquet(hadoopConf, new Path(file.getPath())).size(); - } + assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); + totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); } assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up"); } @@ -1713,8 +1708,8 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean /** * Build Hoodie Write Config for small data file sizes. */ - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeInsertsAllowDups) { - return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeInsertsAllowDups); + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) { + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts); } /** @@ -1727,24 +1722,24 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean /** * Build Hoodie Write Config for specified small file sizes. */ - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeInsertsAllowDups) { + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) { String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA; - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeInsertsAllowDups); + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) { return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false); } - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeInsertsAllowDups) { - return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeInsertsAllowDups, new Properties()); + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) { + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, new Properties()); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) { return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props); } - private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeInsertsAllowDups, + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts, Properties props) { HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr); return builder @@ -1756,7 +1751,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String HoodieStorageConfig.newBuilder() .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)) .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) - .withMergeAllowDuplicateInserts(mergeInsertsAllowDups) + 
.withMergeAllowDuplicateInserts(mergeAllowDuplicateInserts) .withProps(props) .build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index e7169d301a01c..f237156360847 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.model; -import java.io.Serializable; import org.apache.hudi.exception.HoodieException; import java.util.Locale; @@ -26,7 +25,7 @@ /** * The supported write operation types, used by commitMetadata. */ -public enum WriteOperationType implements Serializable { +public enum WriteOperationType { // directly insert INSERT("insert"), INSERT_PREPPED("insert_prepped"), From 1eec9b1cdb71e451fef5f19dc4f2d0692e773443 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 19 Jan 2021 23:44:18 -0500 Subject: [PATCH 09/10] Addressing comments --- .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 2 +- .../java/org/apache/hudi/io/storage/HoodieConcatHandle.java | 2 +- .../hudi/table/action/commit/BaseSparkCommitActionExecutor.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2d365a070c825..44fb0148bbe75 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -334,7 +334,7 @@ public boolean isMergeDataValidationCheckEnabled() { return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED)); } - public boolean isMergeAllowDuplicateInserts() { + public boolean allowDuplicateInserts() { return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_INSERTS)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java index 150ebda2be559..dde3a1ae408f4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java @@ -36,7 +36,7 @@ import java.util.Map; /** - * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#isMergeAllowDuplicateInserts()}} + * Handle to concatenate new records to old records w/o any merging. 
If Operation is set to Inserts, and if {{@link HoodieWriteConfig#allowDuplicateInserts()}} * is set, this handle will be used instead of {@link HoodieMergeHandle} * Simplified Logic: * For every existing record diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index e3e2613a282f8..5a4d79c7e63e8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -321,7 +321,7 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { if (table.requireSortedRecords()) { return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier); - } else if (!WriteOperationType.isChangingRecords(operationType) && config.isMergeAllowDuplicateInserts()) { + } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) { return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); } else { return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); From 287a016ff22b8d555b8a6ea39135003417b93d7c Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 20 Jan 2021 10:17:11 -0500 Subject: [PATCH 10/10] Fetching and rebasing with master --- .../apache/hudi/config/HoodieWriteConfig.java | 16 ++++++++-------- .../hudi/io/storage/HoodieConcatHandle.java | 10 +++++++--- .../TestHoodieClientOnCopyOnWriteStorage.java | 4 ++-- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 44fb0148bbe75..3fc5a2df38985 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -131,9 +131,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled"; private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false"; - // Concats inserts to data files without merging - private static final String MERGE_ALLOW_DUPLICATE_INSERTS = "hoodie.merge.allow.duplicate.inserts"; - private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_INSERTS = "false"; + // Allow duplicates with inserts while merging with existing records + private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts"; + private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false"; /** * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow @@ -335,7 +335,7 @@ public boolean isMergeDataValidationCheckEnabled() { } public boolean allowDuplicateInserts() { - return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_INSERTS)); + return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS)); } public EngineType getEngineType() { @@ -1188,8 +1188,8 @@ public 
Builder withMergeDataValidationCheckEnabled(boolean enabled) { return this; } - public Builder withMergeAllowDuplicateInserts(boolean routeInsertsToNewFiles) { - props.setProperty(MERGE_ALLOW_DUPLICATE_INSERTS, String.valueOf(routeInsertsToNewFiles)); + public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) { + props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, String.valueOf(routeInsertsToNewFiles)); return this; } @@ -1247,8 +1247,8 @@ protected void setDefaults() { BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED), MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED); - setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_INSERTS), - MERGE_ALLOW_DUPLICATE_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_INSERTS); + setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS), + MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java index dde3a1ae408f4..ea56689b5364f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.client.common.TaskContextSupplier; +import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -37,7 +37,8 @@ /** * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#allowDuplicateInserts()}} - * is set, this handle will be used instead of {@link HoodieMergeHandle} + * is set, this handle will be used instead of {@link HoodieMergeHandle}. + * * Simplified Logic: * For every existing record * Write the record as is @@ -49,13 +50,16 @@ * Existing data: * rec1_1, rec2_1, rec3_1, rec4_1 * - * For every existing record, merge w/ incoming if requried and write to storage. + * For every existing record, write to storage as is. * => rec1_1, rec2_1, rec3_1 and rec4_1 is written to storage * Write all records from incoming set to storage * => rec1_2, rec4_2, rec5_1 and rec6_1 * * Final snapshot in storage * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1 + * + * Users should ensure there are no duplicates when the "insert" operation is used with this config enabled. In that case the above scenario should not + * happen, and every batch should contain only new records to be inserted. The above example is for illustration purposes only.
*/ public class HoodieConcatHandle extends HoodieMergeHandle { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index ed6b0ecd827e7..b4a392e6478bf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -482,7 +482,7 @@ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped) throws Exception { // Force using older timeline layout HoodieWriteConfig hoodieWriteConfig = getConfigBuilder() - .withProps(config.getProps()).withMergeAllowDuplicateInserts(true).withTimelineLayoutVersion( + .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion( VERSION_0).build(); HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(), @@ -1751,7 +1751,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String HoodieStorageConfig.newBuilder() .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)) .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build()) - .withMergeAllowDuplicateInserts(mergeAllowDuplicateInserts) + .withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts) .withProps(props) .build(); }
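To see how the flag introduced by this series is consumed end to end, the following is a minimal sketch, not part of the patch itself: it only relies on withMergeAllowDuplicateOnInserts / hoodie.merge.allow.duplicate.on.inserts from this change and on the standard HoodieWriteConfig.Builder methods (withPath, forTable, withSchema); the base path, table name and schema string are placeholder values.

import org.apache.hudi.config.HoodieWriteConfig;

public class AllowDuplicateOnInsertsExample {
  public static void main(String[] args) {
    // Placeholders - substitute the real base path, table name and Avro schema of your table.
    String basePath = "/tmp/hoodie/sample_table";
    String tableName = "sample_table";
    String schemaStr = "<avro-schema-json>";

    // Enabling the builder flag sets hoodie.merge.allow.duplicate.on.inserts=true (default: false).
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable(tableName)
        .withSchema(schemaStr)
        .withMergeAllowDuplicateOnInserts(true)
        .build();

    // writeConfig.allowDuplicateInserts() now returns true. For an "insert" operation whose records
    // are packed into an existing small file, HoodieConcatHandle rewrites the existing records as-is
    // and appends the incoming batch without merging, so avoiding duplicate keys across batches
    // remains the caller's responsibility.
    System.out.println("allowDuplicateInserts = " + writeConfig.allowDuplicateInserts());
  }
}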