From 241320fd891945fee88470935ee8ea8e616f7ab1 Mon Sep 17 00:00:00 2001 From: lin chen <1572139390@qq.com> Date: Tue, 1 Mar 2022 21:16:52 +0800 Subject: [PATCH] Support shrink in ConcurrentLongHashMap (#14497) (cherry picked from commit 297941964ed739e35ca68aa46d74410cf112b7bc) --- .../mledger/impl/ManagedLedgerImpl.java | 7 +- .../pulsar/broker/service/ServerCnx.java | 10 +- .../apache/pulsar/client/impl/ClientCnx.java | 23 ++- .../impl/TransactionMetaStoreHandler.java | 5 +- .../TransactionCoordinatorClientImpl.java | 6 +- .../collections/ConcurrentLongHashMap.java | 139 ++++++++++++++++-- .../ConcurrentLongHashMapTest.java | 122 +++++++++++++-- 7 files changed, 272 insertions(+), 40 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index dc448ca4f453a..deed473d41e1a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected Map propertiesMap; protected final MetaStore store; - final ConcurrentLongHashMap> ledgerCache = new ConcurrentLongHashMap<>( - 16 /* initial capacity */, 1 /* number of sections */); + final ConcurrentLongHashMap> ledgerCache = + ConcurrentLongHashMap.>newBuilder() + .expectedItems(16) // initial capacity + .concurrencyLevel(1) // number of sections + .build(); protected final NavigableMap ledgers = new ConcurrentSkipListMap<>(); private volatile Stat ledgersStat; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1a0be5ea9ce07..a81652ec3c9fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -229,8 +229,14 @@ public ServerCnx(PulsarService pulsar) { ServiceConfiguration conf = pulsar.getConfiguration(); // This maps are not heavily contended since most accesses are within the cnx thread - this.producers = new ConcurrentLongHashMap<>(8, 1); - this.consumers = new ConcurrentLongHashMap<>(8, 1); + this.producers = ConcurrentLongHashMap.>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); + this.consumers = ConcurrentLongHashMap.>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); this.replicatorPrefix = conf.getReplicatorPrefix(); this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection(); this.proxyRoles = conf.getProxyRoles(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index fa24286b1755a..e8caecf188964 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -105,13 +105,28 @@ public class ClientCnx extends PulsarHandler { private State state; private final ConcurrentLongHashMap> pendingRequests = - new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap.>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); // LookupRequests that waiting in client side. private final Queue>>> waitingLookupRequests; - private final ConcurrentLongHashMap> producers = new ConcurrentLongHashMap<>(16, 1); - private final ConcurrentLongHashMap> consumers = new ConcurrentLongHashMap<>(16, 1); - private final ConcurrentLongHashMap transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap> producers = + ConcurrentLongHashMap.>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + private final ConcurrentLongHashMap> consumers = + ConcurrentLongHashMap.>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + private final ConcurrentLongHashMap transactionMetaStoreHandlers = + ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final CompletableFuture connectionFuture = new CompletableFuture(); private final ConcurrentLinkedQueue requestTimeoutQueue = new ConcurrentLinkedQueue<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 05a5cae04248c..4cb89e53b7aa1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -58,7 +58,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect private final long transactionCoordinatorId; private final ConnectionHandler connectionHandler; private final ConcurrentLongHashMap> pendingRequests = - new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap.>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final ConcurrentLinkedQueue timeoutQueue; private static class RequestTime { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 8db80545ad257..e8baec784a7b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC private final PulsarClientImpl pulsarClient; private TransactionMetaStoreHandler[] handlers; - private ConcurrentLongHashMap handlerMap = new ConcurrentLongHashMap<>(16, 1); + private ConcurrentLongHashMap handlerMap = + ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final AtomicLong epoch = new AtomicLong(0); private static final AtomicReferenceFieldUpdater STATE_UPDATER = diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java index cd285221bc862..a4779357a440b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java @@ -44,33 +44,112 @@ public class ConcurrentLongHashMap { private static final Object EmptyValue = null; private static final Object DeletedValue = new Object(); - private static final float MapFillFactor = 0.66f; - private static final int DefaultExpectedItems = 256; private static final int DefaultConcurrencyLevel = 16; + private static final float DefaultMapFillFactor = 0.66f; + private static final float DefaultMapIdleFactor = 0.15f; + + private static final float DefaultExpandFactor = 2; + private static final float DefaultShrinkFactor = 2; + + private static final boolean DefaultAutoShrink = false; + + public static Builder newBuilder() { + return new Builder<>(); + } + + /** + * Builder of ConcurrentLongHashMap. + */ + public static class Builder { + int expectedItems = DefaultExpectedItems; + int concurrencyLevel = DefaultConcurrencyLevel; + float mapFillFactor = DefaultMapFillFactor; + float mapIdleFactor = DefaultMapIdleFactor; + float expandFactor = DefaultExpandFactor; + float shrinkFactor = DefaultShrinkFactor; + boolean autoShrink = DefaultAutoShrink; + + public Builder expectedItems(int expectedItems) { + this.expectedItems = expectedItems; + return this; + } + + public Builder concurrencyLevel(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + return this; + } + + public Builder mapFillFactor(float mapFillFactor) { + this.mapFillFactor = mapFillFactor; + return this; + } + + public Builder mapIdleFactor(float mapIdleFactor) { + this.mapIdleFactor = mapIdleFactor; + return this; + } + + public Builder expandFactor(float expandFactor) { + this.expandFactor = expandFactor; + return this; + } + + public Builder shrinkFactor(float shrinkFactor) { + this.shrinkFactor = shrinkFactor; + return this; + } + + public Builder autoShrink(boolean autoShrink) { + this.autoShrink = autoShrink; + return this; + } + + public ConcurrentLongHashMap build() { + return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel, + mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor); + } + } + private final Section[] sections; + @Deprecated public ConcurrentLongHashMap() { this(DefaultExpectedItems); } + @Deprecated public ConcurrentLongHashMap(int expectedItems) { this(expectedItems, DefaultConcurrencyLevel); } + @Deprecated public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) { + this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor, + DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor); + } + + public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel, + float mapFillFactor, float mapIdleFactor, + boolean autoShrink, float expandFactor, float shrinkFactor) { checkArgument(expectedItems > 0); checkArgument(concurrencyLevel > 0); checkArgument(expectedItems >= concurrencyLevel); + checkArgument(mapFillFactor > 0 && mapFillFactor < 1); + checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1); + checkArgument(mapFillFactor > mapIdleFactor); + checkArgument(expandFactor > 1); + checkArgument(shrinkFactor > 1); int numSections = concurrencyLevel; int perSectionExpectedItems = expectedItems / numSections; - int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor); this.sections = (Section[]) new Section[numSections]; for (int i = 0; i < numSections; i++) { - sections[i] = new Section<>(perSectionCapacity); + sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor, + autoShrink, expandFactor, shrinkFactor); } } @@ -195,20 +274,35 @@ private static final class Section extends StampedLock { private volatile V[] values; private volatile int capacity; + private final int initCapacity; private static final AtomicIntegerFieldUpdater
SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "size"); private volatile int size; private int usedBuckets; - private int resizeThreshold; - - Section(int capacity) { + private int resizeThresholdUp; + private int resizeThresholdBelow; + private final float mapFillFactor; + private final float mapIdleFactor; + private final float expandFactor; + private final float shrinkFactor; + private final boolean autoShrink; + + Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink, + float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); + this.initCapacity = this.capacity; this.keys = new long[this.capacity]; this.values = (V[]) new Object[this.capacity]; this.size = 0; this.usedBuckets = 0; - this.resizeThreshold = (int) (this.capacity * MapFillFactor); + this.autoShrink = autoShrink; + this.mapFillFactor = mapFillFactor; + this.mapIdleFactor = mapIdleFactor; + this.expandFactor = expandFactor; + this.shrinkFactor = shrinkFactor; + this.resizeThresholdUp = (int) (this.capacity * mapFillFactor); + this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor); } V get(long key, int keyHash) { @@ -322,9 +416,10 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction valu ++bucket; } } finally { - if (usedBuckets >= resizeThreshold) { + if (usedBuckets > resizeThresholdUp) { try { - rehash(); + int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor)); + rehash(newCapacity); } finally { unlockWrite(stamp); } @@ -373,7 +468,20 @@ private V remove(long key, Object value, int keyHash) { } } finally { - unlockWrite(stamp); + if (autoShrink && size < resizeThresholdBelow) { + try { + int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor)); + int newResizeThresholdUp = (int) (newCapacity * mapFillFactor); + if (newCapacity < capacity && newResizeThresholdUp > size) { + // shrink the hashmap + rehash(newCapacity); + } + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } } } @@ -385,6 +493,9 @@ void clear() { Arrays.fill(values, EmptyValue); this.size = 0; this.usedBuckets = 0; + if (autoShrink) { + rehash(initCapacity); + } } finally { unlockWrite(stamp); } @@ -439,9 +550,8 @@ public void forEach(EntryProcessor processor) { } } - private void rehash() { + private void rehash(int newCapacity) { // Expand the hashmap - int newCapacity = capacity * 2; long[] newKeys = new long[newCapacity]; V[] newValues = (V[]) new Object[newCapacity]; @@ -458,7 +568,8 @@ private void rehash() { values = newValues; capacity = newCapacity; usedBuckets = size; - resizeThreshold = (int) (capacity * MapFillFactor); + resizeThresholdUp = (int) (capacity * mapFillFactor); + resizeThresholdBelow = (int) (capacity * mapIdleFactor); } private static void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java index 14d8395ae8c8a..6cf126cf2ff15 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java @@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest { @Test public void testConstructor() { try { - new ConcurrentLongHashMap(0); + ConcurrentLongHashMap.newBuilder() + .expectedItems(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongHashMap(16, 0); + ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(0) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok } try { - new ConcurrentLongHashMap(4, 8); + ConcurrentLongHashMap.newBuilder() + .expectedItems(4) + .concurrencyLevel(8) + .build(); fail("should have thrown exception"); } catch (IllegalArgumentException e) { // ok @@ -71,7 +79,9 @@ public void testConstructor() { @Test public void simpleInsertions() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .build(); assertTrue(map.isEmpty()); assertNull(map.put(1, "one")); @@ -97,9 +107,64 @@ public void simpleInsertions() { assertEquals(map.size(), 3); } + @Test + public void testClear() { + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put(1, "v1")); + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + + assertTrue(map.capacity() == 8); + map.clear(); + assertTrue(map.capacity() == 4); + } + + @Test + public void testExpandAndShrink() { + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertTrue(map.capacity() == 4); + + assertNull(map.put(1, "v1")); + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + + // expand hashmap + assertTrue(map.capacity() == 8); + + assertTrue(map.remove(1, "v1")); + // not shrink + assertTrue(map.capacity() == 8); + assertTrue(map.remove(2, "v2")); + // shrink hashmap + assertTrue(map.capacity() == 4); + + // expand hashmap + assertNull(map.put(4, "v4")); + assertNull(map.put(5, "v5")); + assertTrue(map.capacity() == 8); + + //verify that the map does not keep shrinking at every remove() operation + assertNull(map.put(6, "v6")); + assertTrue(map.remove(6, "v6")); + assertTrue(map.capacity() == 8); + } + @Test public void testRemove() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .build(); assertTrue(map.isEmpty()); assertNull(map.put(1, "one")); @@ -115,7 +180,10 @@ public void testRemove() { @Test public void testNegativeUsedBucketCount() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); map.put(0, "zero"); assertEquals(1, map.getUsedBucketCount()); @@ -130,7 +198,10 @@ public void testNegativeUsedBucketCount() { @Test public void testRehashing() { int n = 16; - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -145,7 +216,10 @@ public void testRehashing() { @Test public void testRehashingWithDeletes() { int n = 16; - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(n / 2) + .concurrencyLevel(1) + .build(); assertEquals(map.capacity(), n); assertEquals(map.size(), 0); @@ -167,7 +241,8 @@ public void testRehashingWithDeletes() { @Test public void concurrentInsertions() throws Throwable { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -201,7 +276,8 @@ public void concurrentInsertions() throws Throwable { @Test public void concurrentInsertionsAndReads() throws Throwable { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); @@ -235,7 +311,10 @@ public void concurrentInsertionsAndReads() throws Throwable { @Test public void stressConcurrentInsertionsAndReads() throws Throwable { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(4, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(4) + .concurrencyLevel(1) + .build(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int writeThreads = 16; @@ -286,7 +365,8 @@ public void stressConcurrentInsertionsAndReads() throws Throwable { @Test public void testIteration() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .build(); assertEquals(map.keys(), Collections.emptyList()); assertEquals(map.values(), Collections.emptyList()); @@ -330,7 +410,10 @@ public void testIteration() { @Test public void testHashConflictWithDeletion() { final int Buckets = 16; - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(Buckets, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(Buckets) + .concurrencyLevel(1) + .build(); // Pick 2 keys that fall into the same bucket long key1 = 1; @@ -363,7 +446,8 @@ public void testHashConflictWithDeletion() { @Test public void testPutIfAbsent() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .build(); assertNull(map.putIfAbsent(1, "one")); assertEquals(map.get(1), "one"); @@ -373,7 +457,10 @@ public void testPutIfAbsent() { @Test public void testComputeIfAbsent() { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); AtomicInteger counter = new AtomicInteger(); LongFunction provider = key -> counter.getAndIncrement(); @@ -395,7 +482,10 @@ public void testComputeIfAbsent() { static final int N = 100_000; public void benchConcurrentLongHashMap() throws Exception { - ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(N, 1); + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(N) + .concurrencyLevel(1) + .build(); for (long i = 0; i < Iterations; i++) { for (int j = 0; j < N; j++) {