diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index d31b0aaa6c308..a6e8d5cfb35c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -208,7 +208,8 @@ public R put(T key, R value) { // Note, the converter may over estimate the size of a record in the JVM this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value); LOG.info("Estimated Payload size => " + estimatedPayloadSize); - } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { + } else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty() + && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) { // Re-estimate the size of a record by calculating the size of the entire map containing // N entries and then dividing by the number of entries present (N). This helps to get a // correct estimation of the size of each record in the JVM. 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java index 4fed5a80eb1f2..f7b45e9d839b6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java @@ -51,6 +51,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -83,7 +84,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole List iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100); List recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records); assert (recordKeys.size() == 100); - + // Test iterator Iterator> itr = records.iterator(); int cntSize = 0; @@ -93,7 +94,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole assert recordKeys.contains(rec.getRecordKey()); } assertEquals(recordKeys.size(), cntSize); - + // Test value stream List> values = records.valueStream().collect(Collectors.toList()); cntSize = 0; @@ -221,7 +222,9 @@ failureOutputPath, new DefaultSizeEstimator(), @ParameterizedTest @MethodSource("testArguments") - public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException { + public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws IOException, + URISyntaxException { Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); 
@@ -274,7 +277,9 @@ record = records.get(key);
 
   @ParameterizedTest
   @MethodSource("testArguments")
-  public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException {
+  public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType,
+                                                       boolean isCompressionEnabled) throws IOException,
+      URISyntaxException {
 
     Schema schema = SchemaTestUtil.getSimpleSchema();
 
@@ -337,9 +342,42 @@ record = records.get(key);
     assertEquals(gRecord.get(fieldName).toString(), newValue);
   }
 
-  // TODO : come up with a performance eval test for spillableMap
+  /**
+   * Puts records into a map that was emptied after its first put, i.e. after the initial payload
+   * size estimate has already been taken, and checks that none of those puts throws.
+   */
   @Test
-  public void testLargeInsertUpsert() {}
+  public void testEstimationWithEmptyMap() throws IOException, URISyntaxException {
+    final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
+    final boolean isCompressionEnabled = false;
+    final Schema schema = SchemaTestUtil.getSimpleSchema();
+
+    // NOTE(review): generic type arguments were garbled in the reviewed patch text and are
+    // restored here from how the map is used (String keys, HoodieRecord values) - confirm.
+    ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
+        new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
+
+    // Put a single record. Payload size estimation happens as part of this initial put.
+    HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
+    records.put(seedRecord.getRecordKey(), seedRecord);
+
+    // Remove the key immediately to make the map empty again.
+    records.remove(seedRecord.getRecordKey());
+
+    // Verify payload size re-estimation does not throw exception
+    List<String> recordKeys = new ArrayList<>();
+    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
+    for (HoodieRecord hoodieRecord : hoodieRecords) {
+      assertDoesNotThrow(() -> {
+        records.put(hoodieRecord.getRecordKey(), hoodieRecord);
+      }, "ExternalSpillableMap put() should not throw exception!");
+      recordKeys.add(hoodieRecord.getRecordKey());
+    }
+
+    // Every distinct key that was put must be accounted for by the map.
+    assertEquals(recordKeys.stream().distinct().count(), records.size());
+  }
 
   private static Stream testArguments() {
     // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap