Support shrink in ConcurrentLongHashMap (apache#14497)
(cherry picked from commit 2979419)
lordcheng10 authored and codelipenghui committed Apr 28, 2022
1 parent 58d074d commit 241320f
Showing 7 changed files with 272 additions and 40 deletions.
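In short, the commit replaces the now-deprecated ConcurrentLongHashMap constructors with a fluent builder and adds an optional auto-shrink mode, so a map can give memory back after bulk removals. A minimal usage sketch of the new API (hedged: the import path and the chosen factor values are illustrative assumptions, not taken from this diff):

    import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; // assumed package

    public class ShrinkSketch {
        public static void main(String[] args) {
            // Old style (still compiles, but deprecated by this commit): new ConcurrentLongHashMap<>(16, 1)
            ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
                    .expectedItems(16)     // sizing hint, formerly the first constructor argument
                    .concurrencyLevel(1)   // number of sections, formerly the second argument
                    .autoShrink(true)      // new in this commit: shrink sections after removals
                    .mapIdleFactor(0.25f)  // shrink once size drops below 25% of capacity
                    .build();

            map.put(1L, "one");
            map.remove(1L, "one");         // with autoShrink enabled, capacity may now decrease
        }
    }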
@@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected Map<String, String> propertiesMap;
protected final MetaStore store;

final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
16 /* initial capacity */, 1 /* number of sections */);
final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
.expectedItems(16) // initial capacity
.concurrencyLevel(1) // number of sections
.build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;

@@ -229,8 +229,14 @@ public ServerCnx(PulsarService pulsar) {
ServiceConfiguration conf = pulsar.getConfiguration();

// These maps are not heavily contended since most accesses happen within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
@@ -105,13 +105,28 @@ public class ClientCnx extends PulsarHandler {
private State state;

private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
// LookupRequests that are waiting on the client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;

private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
@@ -58,7 +58,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
private final long transactionCoordinatorId;
private final ConnectionHandler connectionHandler;
private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<OpBase<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;

private static class RequestTime {
@@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC

private final PulsarClientImpl pulsarClient;
private TransactionMetaStoreHandler[] handlers;
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final AtomicLong epoch = new AtomicLong(0);

private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
@@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;

private static final float DefaultMapFillFactor = 0.66f;
private static final float DefaultMapIdleFactor = 0.15f;

private static final float DefaultExpandFactor = 2;
private static final float DefaultShrinkFactor = 2;

private static final boolean DefaultAutoShrink = false;

public static <V> Builder<V> newBuilder() {
return new Builder<>();
}

/**
* Builder of ConcurrentLongHashMap.
*/
public static class Builder<T> {
int expectedItems = DefaultExpectedItems;
int concurrencyLevel = DefaultConcurrencyLevel;
float mapFillFactor = DefaultMapFillFactor;
float mapIdleFactor = DefaultMapIdleFactor;
float expandFactor = DefaultExpandFactor;
float shrinkFactor = DefaultShrinkFactor;
boolean autoShrink = DefaultAutoShrink;

public Builder<T> expectedItems(int expectedItems) {
this.expectedItems = expectedItems;
return this;
}

public Builder<T> concurrencyLevel(int concurrencyLevel) {
this.concurrencyLevel = concurrencyLevel;
return this;
}

public Builder<T> mapFillFactor(float mapFillFactor) {
this.mapFillFactor = mapFillFactor;
return this;
}

public Builder<T> mapIdleFactor(float mapIdleFactor) {
this.mapIdleFactor = mapIdleFactor;
return this;
}

public Builder<T> expandFactor(float expandFactor) {
this.expandFactor = expandFactor;
return this;
}

public Builder<T> shrinkFactor(float shrinkFactor) {
this.shrinkFactor = shrinkFactor;
return this;
}

public Builder<T> autoShrink(boolean autoShrink) {
this.autoShrink = autoShrink;
return this;
}

public ConcurrentLongHashMap<T> build() {
return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
}
}

private final Section<V>[] sections;

@Deprecated
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
}

public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
float mapFillFactor, float mapIdleFactor,
boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
checkArgument(mapFillFactor > mapIdleFactor);
checkArgument(expandFactor > 1);
checkArgument(shrinkFactor > 1);

int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];

for (int i = 0; i < numSections; i++) {
sections[i] = new Section<>(perSectionCapacity);
sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
autoShrink, expandFactor, shrinkFactor);
}
}

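As a quick sanity check on the sizing math above, here is a hedged worked example using the builder defaults seen at the call sites in this commit (expectedItems 16, concurrencyLevel 1, fill factor 0.66); it mirrors the constructor arithmetic, it is not part of the diff:

    // Illustration only.
    int expectedItems = 16;
    int concurrencyLevel = 1;
    float mapFillFactor = 0.66f;
    int perSectionExpectedItems = expectedItems / concurrencyLevel;            // 16
    int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);  // 24
    // The Section constructor then rounds up to a power of two,
    // so the single section starts with 32 buckets.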
@@ -195,20 +274,35 @@ private static final class Section<V> extends StampedLock {
private volatile V[] values;

private volatile int capacity;
private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");

private volatile int size;
private int usedBuckets;
private int resizeThreshold;

Section(int capacity) {
private int resizeThresholdUp;
private int resizeThresholdBelow;
private final float mapFillFactor;
private final float mapIdleFactor;
private final float expandFactor;
private final float shrinkFactor;
private final boolean autoShrink;

Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * MapFillFactor);
this.autoShrink = autoShrink;
this.mapFillFactor = mapFillFactor;
this.mapIdleFactor = mapIdleFactor;
this.expandFactor = expandFactor;
this.shrinkFactor = shrinkFactor;
this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}

V get(long key, int keyHash) {
@@ -322,9 +416,10 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valu
++bucket;
}
} finally {
if (usedBuckets >= resizeThreshold) {
if (usedBuckets > resizeThresholdUp) {
try {
rehash();
int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -373,7 +468,20 @@ private V remove(long key, Object value, int keyHash) {
}

} finally {
unlockWrite(stamp);
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
rehash(newCapacity);
}
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}

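The shrink path above fires only when three conditions hold: autoShrink is enabled, the live size has fallen below capacity * mapIdleFactor, and the candidate capacity would still keep size under its own fill threshold, so a shrink can never immediately trigger a re-expansion. A hedged worked example using the factors configured in the new test further down (fill 0.66, idle 0.25, shrink factor 2):

    // Illustration only; the numbers match the testExpandAndShrink scenario.
    int capacity = 8;
    int size = 1;                                              // two of three entries removed
    int resizeThresholdBelow = (int) (capacity * 0.25f);       // 2 -> size < 2, shrink candidate
    int newCapacity = 4;                                       // alignToPowerOfTwo((int) (8 / 2))
    int newResizeThresholdUp = (int) (newCapacity * 0.66f);    // 2 -> still above size, so shrink to 4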
@@ -385,6 +493,9 @@ void clear() {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
}
} finally {
unlockWrite(stamp);
}
@@ -439,9 +550,8 @@ public void forEach(EntryProcessor<V> processor) {
}
}

private void rehash() {
private void rehash(int newCapacity) {
// Resize the hashmap to newCapacity (used for both expand and shrink)
int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];

@@ -458,7 +568,8 @@ private void rehash() {
values = newValues;
capacity = newCapacity;
usedBuckets = size;
resizeThreshold = (int) (capacity * MapFillFactor);
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
@@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest {
@Test
public void testConstructor() {
try {
new ConcurrentLongHashMap<String>(0);
ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(0)
.build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}

try {
new ConcurrentLongHashMap<String>(16, 0);
ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(16)
.concurrencyLevel(0)
.build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}

try {
new ConcurrentLongHashMap<String>(4, 8);
ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(4)
.concurrencyLevel(8)
.build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -71,7 +79,9 @@ public void testConstructor() {

@Test
public void simpleInsertions() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(16)
.build();

assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -97,9 +107,64 @@ public void simpleInsertions() {
assertEquals(map.size(), 3);
}

@Test
public void testClear() {
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(2)
.concurrencyLevel(1)
.autoShrink(true)
.mapIdleFactor(0.25f)
.build();
assertTrue(map.capacity() == 4);

assertNull(map.put(1, "v1"));
assertNull(map.put(2, "v2"));
assertNull(map.put(3, "v3"));

assertTrue(map.capacity() == 8);
map.clear();
assertTrue(map.capacity() == 4);
}

@Test
public void testExpandAndShrink() {
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(2)
.concurrencyLevel(1)
.autoShrink(true)
.mapIdleFactor(0.25f)
.build();
assertTrue(map.capacity() == 4);

assertNull(map.put(1, "v1"));
assertNull(map.put(2, "v2"));
assertNull(map.put(3, "v3"));

// expand hashmap
assertTrue(map.capacity() == 8);

assertTrue(map.remove(1, "v1"));
// not shrunk yet
assertTrue(map.capacity() == 8);
assertTrue(map.remove(2, "v2"));
// shrink hashmap
assertTrue(map.capacity() == 4);

// expand hashmap
assertNull(map.put(4, "v4"));
assertNull(map.put(5, "v5"));
assertTrue(map.capacity() == 8);

// verify that the map does not keep shrinking on every remove() operation
assertNull(map.put(6, "v6"));
assertTrue(map.remove(6, "v6"));
assertTrue(map.capacity() == 8);
}

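For readers following the assertions above, a hedged trace of the section counters, derived from the factors the test configures (fill 0.66, idle 0.25, expand and shrink factor 2); the test itself only checks capacity:

    // put v1..v3:        usedBuckets exceeds (int) (4 * 0.66) = 2      -> expand to 8
    // remove v1:         size 2, not below (int) (8 * 0.25) = 2        -> capacity stays 8
    // remove v2:         size 1 < 2, and (int) (4 * 0.66) = 2 > 1      -> shrink to 4
    // put v4, v5:        usedBuckets exceeds 2 again                   -> expand back to 8
    // put v6, remove v6: size 3 is not below 2                         -> capacity stays 8, no thrashing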
@Test
public void testRemove() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.build();

assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -115,7 +180,10 @@ public void testRemove() {

@Test
public void testNegativeUsedBucketCount() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

map.put(0, "zero");
assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +198,10 @@ public void testNegativeUsedBucketCount() {
@Test
public void testRehashing() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
.expectedItems(n / 2)
.concurrencyLevel(1)
.build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);

@@ -145,7 +216,10 @@ public void testRehashing() {
@Test
public void testRehashingWithDeletes() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
.expectedItems(n / 2)
.concurrencyLevel(1)
.build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);

@@ -167,7 +241,8 @@ public void testRehashingWithDeletes() {

@Test
public void concurrentInsertions() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

@@ -201,7 +276,8 @@ public void concurrentInsertions() throws Throwable {

@Test
public void concurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

@@ -235,7 +311,10 @@ public void concurrentInsertionsAndReads() throws Throwable {

@Test
public void stressConcurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(4)
.concurrencyLevel(1)
.build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final int writeThreads = 16;
@@ -286,7 +365,8 @@ public void stressConcurrentInsertionsAndReads() throws Throwable {

@Test
public void testIteration() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.build();

assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -330,7 +410,10 @@ public void testIteration() {
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(Buckets)
.concurrencyLevel(1)
.build();

// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -363,7 +446,8 @@ public void testHashConflictWithDeletion() {

@Test
public void testPutIfAbsent() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.build();
assertNull(map.putIfAbsent(1, "one"));
assertEquals(map.get(1), "one");

@@ -373,7 +457,10 @@ public void testPutIfAbsent() {

@Test
public void testComputeIfAbsent() {
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = key -> counter.getAndIncrement();

@@ -395,7 +482,10 @@ public void testComputeIfAbsent() {
static final int N = 100_000;

public void benchConcurrentLongHashMap() throws Exception {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(N)
.concurrencyLevel(1)
.build();

for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
