@@ -94,7 +94,8 @@ public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, Si
}

public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException {
this.inMemoryMap = new HashMap<>();
this.baseFilePath = baseFilePath;
this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
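As a quick illustration of the cap computed above, here is a minimal arithmetic sketch with hypothetical numbers (the 0.8 sizing factor is an assumption about the default, not shown in this hunk):

// Hypothetical budget: 100 MB for the spillable map.
long maxInMemorySizeInBytes = 100L * 1024 * 1024;  // 104_857_600 bytes
double sizingFactorForInMemoryMap = 0.8;           // assumed default factor
// In-memory cap: floor(104_857_600 * 0.8) = 83_886_080 bytes (~80 MB);
// the remaining ~20 MB is headroom for payload-size estimation error.
long cap = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);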
@@ -116,7 +117,7 @@ private DiskMap<T, R> getDiskBasedMap() {
break;
case BITCASK:
default:
diskBasedMap = new BitCaskDiskMap<>(baseFilePath, isCompressionEnabled);
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
@@ -204,19 +205,21 @@ public R get(Object key) {
public R put(T key, R value) {
if (this.currentInMemoryMapSize < maxInMemorySizeInBytes || inMemoryMap.containsKey(key)) {
if (shouldEstimatePayloadSize && estimatedPayloadSize == 0) {
// At first, use the size estimate of a record being inserted into the Spillable map.
// Note, the converter may overestimate the size of a record in the JVM.
this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
LOG.info("Estimated Payload size => " + estimatedPayloadSize);
} else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
LOG.debug("Estimated Payload size => " + estimatedPayloadSize);
} else if (shouldEstimatePayloadSize
&& !inMemoryMap.isEmpty()
&& (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
// Re-estimate the size of a record by calculating the size of the entire map containing
// N entries and then dividing by the number of entries present (N). This helps to get a
// correct estimation of the size of each record in the JVM.
long totalMapSize = ObjectSizeCalculator.getObjectSize(inMemoryMap);
this.currentInMemoryMapSize = totalMapSize;
this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
shouldEstimatePayloadSize = false;
LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
LOG.debug("New Estimated Payload size => " + this.estimatedPayloadSize);
}
if (!inMemoryMap.containsKey(key)) {
// TODO : Add support for adjusting payloadSize for updates to the same key
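For context on the new isEmpty() guard in the re-estimation branch above: with an empty map, size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE evaluates to 0 % N == 0, so the old condition entered the branch and divided by zero. A minimal standalone sketch of the failure mode (simplified types and a stand-in size value; not the Hudi implementation):

import java.util.HashMap;
import java.util.Map;

public class ReestimationGuardSketch {
  static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;

  public static void main(String[] args) {
    Map<String, String> inMemoryMap = new HashMap<>(); // empty after a put/remove cycle
    long totalMapSize = 64L; // stand-in for ObjectSizeCalculator.getObjectSize(inMemoryMap)

    // Old condition: (0 % 100 == 0) is true for an empty map, so
    // totalMapSize / inMemoryMap.size() threw ArithmeticException: / by zero.

    // Fixed condition: isEmpty() short-circuits before the division is reached.
    if (!inMemoryMap.isEmpty()
        && (inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0)) {
      long estimatedPayloadSize = totalMapSize / inMemoryMap.size(); // safe: size() >= 1
      System.out.println("New estimated payload size => " + estimatedPayloadSize);
    }
  }
}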
@@ -83,7 +83,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);

// Test iterator
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
int cntSize = 0;
@@ -93,7 +93,7 @@ public void simpleInsertTest(ExternalSpillableMap.DiskMapType diskMapType, boole
assert recordKeys.contains(rec.getRecordKey());
}
assertEquals(recordKeys.size(), cntSize);

// Test value stream
List<HoodieRecord<? extends HoodieRecordPayload>> values = records.valueStream().collect(Collectors.toList());
cntSize = 0;
@@ -221,7 +221,9 @@ failureOutputPath, new DefaultSizeEstimator(),

@ParameterizedTest
@MethodSource("testArguments")
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException,
URISyntaxException {

Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());

@@ -274,7 +276,9 @@ record = records.get(key);

@ParameterizedTest
@MethodSource("testArguments")
public void testDataCorrectnessWithoutHoodieMetadata(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws IOException,
URISyntaxException {

Schema schema = SchemaTestUtil.getSimpleSchema();

@@ -339,7 +343,42 @@ record = records.get(key);

// TODO : come up with a performance eval test for spillableMap
@Test
public void testLargeInsertUpsert() {
}

@Test
public void testPayloadSizeEstimate() throws IOException, URISyntaxException {
final ExternalSpillableMap.DiskMapType diskMapType = ExternalSpillableMap.DiskMapType.BITCASK;
final boolean isCompressionEnabled = false;
final Schema schema = SchemaTestUtil.getSimpleSchema();

ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);

List<String> recordKeys = new ArrayList<>();

// Put a single record. Payload size estimation happens as part of this initial put.
HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
records.put(seedRecord.getRecordKey(), seedRecord);

// Remove the key immediately to make the map empty again.
records.remove(seedRecord.getRecordKey());

// Payload size re-estimation should not happen here, as the number of
// entries has not reached the threshold for recalculation. Without the
// isEmpty() guard, this put on an empty map would divide by zero.
records.put(seedRecord.getRecordKey(), seedRecord);

// Put more records than the threshold to trigger payload size re-estimation
while (records.getDiskBasedMapNumEntries() < 1) {
Member:
What is this test asserting? I don't see any conditions being tested.

Contributor Author:
Without the fix, the map's put operation throws an exception and fails the test. To make the test's expectation clearer, I added assertDoesNotThrow to the line where the exception would be thrown without the fix.

Member:
Do we need lines 371-379? They are just validating that no error is thrown; in general, tests should have clear asserts. My suggestion would be to scope this test down to just the divide-by-zero scenario.

Contributor Author:
Removed the while block. But I still need to put more than 100 entries to verify the payload size re-estimation code path. Updated the test. Please take one more look.

List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
hoodieRecords.stream().forEach(r -> {
records.put(r.getRecordKey(), r);
recordKeys.add(r.getRecordKey());
});
}
}
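Per the review suggestion above, the test could be scoped down to just the divide-by-zero scenario. A hypothetical sketch (the test name is illustrative; it reuses this class's existing fixtures and JUnit 5's assertDoesNotThrow, which the author mentions adding):

@Test
public void testPutOnEmptyMapDoesNotDivideByZero() throws IOException, URISyntaxException {
  ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
      new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
          new HoodieRecordSizeEstimator(SchemaTestUtil.getSimpleSchema()),
          ExternalSpillableMap.DiskMapType.BITCASK, false);

  HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
  records.put(seedRecord.getRecordKey(), seedRecord); // first put sets estimatedPayloadSize
  records.remove(seedRecord.getRecordKey());          // map is empty again

  // Without the isEmpty() guard, this put re-enters the re-estimation branch
  // with size() == 0 and throws ArithmeticException.
  assertDoesNotThrow(() -> records.put(seedRecord.getRecordKey(), seedRecord));
}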

private static Stream<Arguments> testArguments() {
// Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
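The testArguments provider is truncated in this view. A hypothetical completion consistent with its comment, shown in full for readability (the second map type is assumed to be ROCKS_DB, matching the non-BitCask case in the switch earlier in the diff; the PR's exact argument matrix is elided here):

private static Stream<Arguments> testArguments() {
  // Arguments : 1. Disk Map Type 2. isCompressionEnabled for BitCaskMap
  return Stream.of(
      Arguments.of(ExternalSpillableMap.DiskMapType.BITCASK, false),
      Arguments.of(ExternalSpillableMap.DiskMapType.BITCASK, true),
      Arguments.of(ExternalSpillableMap.DiskMapType.ROCKS_DB, false));
}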