GEODE-10401: Configurable .drf recovery HashMap overflow threshold (#7828)

Configurable with the JVM parameter:

gemfire.disk.drfHashMapOverflowThreshold

Default value: 805306368

When the configured threshold value is reached, the server will overflow to a
new hashmap during the recovery of .drf files. Warning: if the threshold
parameter is set above 805306368, an unneeded delay will occur due to a bug in
the fastutil dependency.
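For illustration only (not part of the commit): the threshold is read from a plain JVM system property, so it can be lowered on the server's command line, for example with -Dgemfire.disk.drfHashMapOverflowThreshold=536870912 (an arbitrary example value). Inside DiskStoreImpl the property is resolved once, with the default as fallback, roughly like this:

// Hedged sketch mirroring the Long.getLong call added in the diff below;
// the variable name here is illustrative.
long drfHashMapOverflowThreshold =
    Long.getLong("gemfire.disk.drfHashMapOverflowThreshold", 805306368L);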
jvarenina authored and Mario Kevo committed Sep 16, 2022
1 parent bc47b30 commit d1910cf
Showing 2 changed files with 107 additions and 16 deletions.
@@ -180,13 +180,24 @@ public class DiskStoreImpl implements DiskStore {
public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME =
GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverValuesSync";

/**
* When the configured threshold value is reached, the server will overflow to
* a new hashmap during the recovery of .drf files.
*/
public static final String DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME =
GeodeGlossary.GEMFIRE_PREFIX + "disk.drfHashMapOverflowThreshold";

/**
* Allows recovering values for LRU regions. By default values are not recovered for LRU regions
* during recovery.
*/
public static final String RECOVER_LRU_VALUES_PROPERTY_NAME =
GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverLruValues";

static final long DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT = 805306368;
static final long DRF_HASHMAP_OVERFLOW_THRESHOLD =
Long.getLong(DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME, DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT);

boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true);

boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false);
@@ -3525,31 +3536,49 @@ public void add(long id) {
}

try {
if (shouldOverflow(id)) {
overflowToNewHashMap(id);
} else {
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
this.currentInts.get().add((int) id);
} else {
this.currentLongs.get().add(id);
}
}
} catch (IllegalArgumentException illegalArgumentException) {
// See GEODE-8029.
// Too many entries on the accumulated drf files, overflow next [Int|Long]OpenHashSet and
// continue.
overflowToNewHashMap(id);
}
}

boolean shouldOverflow(final long id) {
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
return currentInts.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD;
} else {
return currentLongs.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD;
}
}

void overflowToNewHashMap(final long id) {
if (DRF_HASHMAP_OVERFLOW_THRESHOLD == DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT) {
logger.warn(
"There is a large number of deleted entries within the disk-store, please execute an offline compaction.");
}

// Overflow to the next [Int|Long]OpenHashSet and continue.
if (id > 0 && id <= 0x00000000FFFFFFFFL) {
IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
allInts.add(overflownHashSet);
currentInts.set(overflownHashSet);

currentInts.get().add((int) id);
} else {
LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
allLongs.add(overflownHashSet);
currentLongs.set(overflownHashSet);

currentLongs.get().add(id);
}
}
}

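The two methods above implement a simple overflow chain. As a rough sketch (not Geode code: the class and field names below are invented, only the int-keyed case is shown, and the threshold is passed in directly instead of being read from the system property):

import java.util.ArrayList;
import java.util.List;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;

// Sketch: keep a chain of fastutil sets; once the current set reaches the
// threshold, start a fresh one instead of letting it grow further, and have
// contains() consult every set in the chain.
class OverflowingIntIdSet {
  private final int threshold;
  private final List<IntOpenHashSet> all = new ArrayList<>();
  private IntOpenHashSet current = new IntOpenHashSet();

  OverflowingIntIdSet(int threshold) {
    this.threshold = threshold;
    all.add(current);
  }

  void add(int id) {
    if (current.size() == threshold) {
      // overflow to a new hash set, as overflowToNewHashMap(id) does above
      current = new IntOpenHashSet();
      all.add(current);
    }
    current.add(id);
  }

  boolean contains(int id) {
    for (IntOpenHashSet set : all) {
      if (set.contains(id)) {
        return true;
      }
    }
    return false;
  }
}

With a threshold of 10, adding 40 ids this way produces four sets of ten entries each, which matches what the new OplogEntryIdSetDrfHashSetThresholdTest below asserts for both the int and long ranges.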
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetSystemProperty;

import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;

/**
* Tests DiskStoreImpl.OplogEntryIdSet
*/
public class OplogEntryIdSetDrfHashSetThresholdTest {
@Test
@SetSystemProperty(key = "gemfire.disk.drfHashMapOverflowThreshold", value = "10")
public void addMethodOverflowBasedOnDrfOverflowThresholdParameters() {

int testEntries = 41;
IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
LongOpenHashSet longOpenHashSet = new LongOpenHashSet();

List<IntOpenHashSet> intOpenHashSets =
new ArrayList<>(Collections.singletonList(intOpenHashSet));
List<LongOpenHashSet> longOpenHashSets =
new ArrayList<>(Collections.singletonList(longOpenHashSet));

OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);
IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(oplogEntryIdSet::add);

assertThat(intOpenHashSets).hasSize(4);
assertThat(longOpenHashSets).hasSize(4);

IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
.forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());

}
}
