Skip to content

Commit

Permalink
Support shrink in ConcurrentLongHashMap (#14497)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2979419)
  • Loading branch information
lordcheng10 authored and codelipenghui committed Apr 29, 2022
1 parent e04c19e commit 025500a
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected Map<String, String> propertiesMap;
protected final MetaStore store;

final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
16 /* initial capacity */, 1 /* number of sections */);
final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
.expectedItems(16) // initial capacity
.concurrencyLevel(1) // number of sections
.build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tbClient = tbClient;
this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
this.pendingConnectRequests = new ConcurrentLongHashMap<>();
this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
this.pendingConnectRequests =
ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,14 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
ServiceConfiguration conf = pulsar.getConfiguration();

// This maps are not heavily contended since most accesses are within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,28 @@ public class ClientCnx extends PulsarHandler {
private State state;

private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
// LookupRequests that waiting in client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;

private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
private final long transactionCoordinatorId;
private final ConnectionHandler connectionHandler;
private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<OpBase<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;

protected final Timer timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC

private final PulsarClientImpl pulsarClient;
private TransactionMetaStoreHandler[] handlers;
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final AtomicLong epoch = new AtomicLong(0);

private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;

private static final float DefaultMapFillFactor = 0.66f;
private static final float DefaultMapIdleFactor = 0.15f;

private static final float DefaultExpandFactor = 2;
private static final float DefaultShrinkFactor = 2;

private static final boolean DefaultAutoShrink = false;

public static <V> Builder<V> newBuilder() {
return new Builder<>();
}

/**
* Builder of ConcurrentLongHashMap.
*/
public static class Builder<T> {
int expectedItems = DefaultExpectedItems;
int concurrencyLevel = DefaultConcurrencyLevel;
float mapFillFactor = DefaultMapFillFactor;
float mapIdleFactor = DefaultMapIdleFactor;
float expandFactor = DefaultExpandFactor;
float shrinkFactor = DefaultShrinkFactor;
boolean autoShrink = DefaultAutoShrink;

public Builder<T> expectedItems(int expectedItems) {
this.expectedItems = expectedItems;
return this;
}

public Builder<T> concurrencyLevel(int concurrencyLevel) {
this.concurrencyLevel = concurrencyLevel;
return this;
}

public Builder<T> mapFillFactor(float mapFillFactor) {
this.mapFillFactor = mapFillFactor;
return this;
}

public Builder<T> mapIdleFactor(float mapIdleFactor) {
this.mapIdleFactor = mapIdleFactor;
return this;
}

public Builder<T> expandFactor(float expandFactor) {
this.expandFactor = expandFactor;
return this;
}

public Builder<T> shrinkFactor(float shrinkFactor) {
this.shrinkFactor = shrinkFactor;
return this;
}

public Builder<T> autoShrink(boolean autoShrink) {
this.autoShrink = autoShrink;
return this;
}

public ConcurrentLongHashMap<T> build() {
return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
}
}

private final Section<V>[] sections;

@Deprecated
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
}

public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
float mapFillFactor, float mapIdleFactor,
boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
checkArgument(mapFillFactor > mapIdleFactor);
checkArgument(expandFactor > 1);
checkArgument(shrinkFactor > 1);

int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];

for (int i = 0; i < numSections; i++) {
sections[i] = new Section<>(perSectionCapacity);
sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
autoShrink, expandFactor, shrinkFactor);
}
}

Expand Down Expand Up @@ -195,20 +274,35 @@ private static final class Section<V> extends StampedLock {
private volatile V[] values;

private volatile int capacity;
private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");

private volatile int size;
private int usedBuckets;
private int resizeThreshold;

Section(int capacity) {
private int resizeThresholdUp;
private int resizeThresholdBelow;
private final float mapFillFactor;
private final float mapIdleFactor;
private final float expandFactor;
private final float shrinkFactor;
private final boolean autoShrink;

Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * MapFillFactor);
this.autoShrink = autoShrink;
this.mapFillFactor = mapFillFactor;
this.mapIdleFactor = mapIdleFactor;
this.expandFactor = expandFactor;
this.shrinkFactor = shrinkFactor;
this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}

V get(long key, int keyHash) {
Expand Down Expand Up @@ -322,9 +416,10 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valu
++bucket;
}
} finally {
if (usedBuckets >= resizeThreshold) {
if (usedBuckets > resizeThresholdUp) {
try {
rehash();
int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
Expand Down Expand Up @@ -373,7 +468,20 @@ private V remove(long key, Object value, int keyHash) {
}

} finally {
unlockWrite(stamp);
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
rehash(newCapacity);
}
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}

Expand All @@ -385,6 +493,9 @@ void clear() {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
}
} finally {
unlockWrite(stamp);
}
Expand Down Expand Up @@ -439,9 +550,8 @@ public void forEach(EntryProcessor<V> processor) {
}
}

private void rehash() {
private void rehash(int newCapacity) {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];

Expand All @@ -458,7 +568,8 @@ private void rehash() {
values = newValues;
capacity = newCapacity;
usedBuckets = size;
resizeThreshold = (int) (capacity * MapFillFactor);
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
Expand Down
Loading

0 comments on commit 025500a

Please sign in to comment.