diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index 774212b27b2c7..bbda80ea0a3c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -202,28 +202,20 @@ public R get(Object key) { @Override public R put(T key, R value) { - if (this.currentInMemoryMapSize >= maxInMemorySizeInBytes || inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { - long tmpEstimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 - + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1); - if (this.estimatedPayloadSize != tmpEstimatedPayloadSize) { - LOG.info("Update Estimated Payload size to => " + this.estimatedPayloadSize); - } - this.estimatedPayloadSize = tmpEstimatedPayloadSize; + if (this.estimatedPayloadSize == 0) { + // At first, use the sizeEstimate of a record being inserted into the spillable map. + // Note, the converter may over-estimate the size of a record in the JVM + this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value); + } else if (this.inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) { + this.estimatedPayloadSize = (long) (this.estimatedPayloadSize * 0.9 + (keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value)) * 0.1); this.currentInMemoryMapSize = this.inMemoryMap.size() * this.estimatedPayloadSize; } - if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) { - if (estimatedPayloadSize == 0) { - // At first, use the sizeEstimate of a record being inserted into the spillable map. - // Note, the converter may over estimate the size of a record in the JVM - this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value); - LOG.info("Estimated Payload size => " + estimatedPayloadSize); - } - if (!inMemoryMap.containsKey(key)) { - // TODO : Add support for adjusting payloadSize for updates to the same key - currentInMemoryMapSize += this.estimatedPayloadSize; - } - inMemoryMap.put(key, value); + if (this.inMemoryMap.containsKey(key)) { + this.inMemoryMap.put(key, value); + } else if (this.currentInMemoryMapSize < this.maxInMemorySizeInBytes) { + this.currentInMemoryMapSize += this.estimatedPayloadSize; + this.inMemoryMap.put(key, value); } else { getDiskBasedMap().put(key, value); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index fada3256fed32..9920ff2d5ed5e 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -692,7 +692,7 @@ public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType d .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) .withReaderSchema(schema) .withLatestInstantTime("100") - .withMaxMemorySizeInBytes(1024L) + .withMaxMemorySizeInBytes(10240L) .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) @@ -783,7 +783,7 @@ public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap. .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) .withReaderSchema(schema) .withLatestInstantTime("100") - .withMaxMemorySizeInBytes(1024L) + .withMaxMemorySizeInBytes(10240L) .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE)