@@ -208,7 +208,8 @@ public R put(T key, R value) {
// Note, the converter may over estimate the size of a record in the JVM
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
} else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
} else if (shouldEstimatePayloadSize && !inMemoryMap.isEmpty()
&& (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
// Re-estimate the size of a record by calculating the size of the entire map containing
// N entries and then dividing by the number of entries present (N). This helps to get a
// correct estimation of the size of each record in the JVM.
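
Note: the guard added above matters because the re-estimation path divides the measured size of the in-memory map by its entry count. With an empty map, size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0 is trivially true and the division is by zero. The following is a minimal standalone sketch of that guarded logic, not the actual Hudi implementation; the class name, constant value, and total-size argument are illustrative assumptions.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: mirrors the guarded re-estimation, not the real ExternalSpillableMap.
class PayloadSizeEstimationSketch<T, R> {
  private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100; // assumed value
  private final Map<T, R> inMemoryMap = new HashMap<>();
  private long estimatedPayloadSize = 0;

  void maybeReEstimate(long totalMapSizeInBytes) {
    // Without the !isEmpty() check, an empty map would still satisfy the modulo
    // condition (0 % N == 0) and the division below would throw
    // ArithmeticException: / by zero.
    if (!inMemoryMap.isEmpty()
        && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
      estimatedPayloadSize = totalMapSizeInBytes / inMemoryMap.size();
    }
  }

  public static void main(String[] args) {
    PayloadSizeEstimationSketch<String, String> sketch = new PayloadSizeEstimationSketch<>();
    sketch.maybeReEstimate(1024L); // safe on an empty map thanks to the guard
    System.out.println("estimatedPayloadSize = " + sketch.estimatedPayloadSize);
  }
}
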
@@ -51,6 +51,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -83,7 +84,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);

// Test iterator
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
int cntSize = 0;
@@ -93,7 +94,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole
assert recordKeys.contains(rec.getRecordKey());
}
assertEquals(recordKeys.size(), cntSize);

// Test value stream
List<HoodieRecord<? extends HoodieRecordPayload>> values = records.valueStream().collect(Collectors.toList());
cntSize = 0;
@@ -221,7 +222,9 @@ failureOutputPath, new DefaultSizeEstimator(),

@ParameterizedTest
@MethodSource("testArguments")
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException {
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException,
URISyntaxException {

Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());

@@ -274,7 +277,9 @@ record = records.get(key);

@ParameterizedTest
@MethodSource("testArguments")
public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException, URISyntaxException {
public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException,
URISyntaxException {

Schema schema = SchemaTestUtil.getSimpleSchema();

@@ -337,9 +342,34 @@ record = records.get(key);
assertEquals(gRecord.get(fieldName).toString(), newValue);
}

// TODO : come up with a performance eval test for spillableMap
@Test
public void testLargeInsertUpsert() {}
public void testEstimationWithEmptyMap() throws IOException, URISyntaxException {
final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
final boolean isCompressionEnabled = false;
final Schema schema = SchemaTestUtil.getSimpleSchema();

ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);

List<String> recordKeys = new ArrayList<>();

// Put a single record. Payload size estimation happens as part of this initial put.
HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
records.put(seedRecord.getRecordKey(), seedRecord);

// Remove the key immediately to make the map empty again.
records.remove(seedRecord.getRecordKey());

// Verify payload size re-estimation does not throw exception
List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
hoodieRecords.stream().forEach(hoodieRecord -> {
assertDoesNotThrow(() -> {
records.put(hoodieRecord.getRecordKey(), hoodieRecord);
}, "ExternalSpillableMap put() should not throw exception!");
recordKeys.add(hoodieRecord.getRecordKey());
});
}

private static Stream<Arguments> testArguments() {
// Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
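
Note on testEstimationWithEmptyMap above: the initial put sets estimatedPayloadSize, so later puts skip the first-estimation branch, and removing the only key drives inMemoryMap back to empty. Before the !isEmpty() guard, the next put would then enter the re-estimation branch and divide the map size by zero entries, throwing ArithmeticException. With the guard in place, the 250 follow-up puts are expected to succeed, which is what assertDoesNotThrow checks.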