support shrink in ConcurrentLongHashMap #14497

Merged
ManagedLedgerImpl.java
@@ -155,8 +155,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected Map<String, String> propertiesMap;
protected final MetaStore store;

final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
16 /* initial capacity */, 1 /* number of sections */);
final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
.expectedItems(16) // initial capacity
.concurrencyLevel(1) // number of sections
.build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;

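Note (illustration, not part of the diff): every call site touched by this PR follows the same migration from the deprecated size/sections constructor to the builder. A minimal standalone sketch, with an arbitrary String value type:

    // old style: positional arguments for capacity and section count
    ConcurrentLongHashMap<String> oldStyle = new ConcurrentLongHashMap<>(16, 1);

    // new style: the builder names each parameter and exposes the new shrink options
    ConcurrentLongHashMap<String> newStyle = ConcurrentLongHashMap.<String>newBuilder()
            .expectedItems(16)      // initial capacity hint
            .concurrencyLevel(1)    // number of sections
            .build();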
TransactionMetadataStoreService.java
@@ -105,8 +105,9 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tbClient = tbClient;
this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
this.pendingConnectRequests = new ConcurrentLongHashMap<>();
this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
this.pendingConnectRequests =
ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}

ServerCnx.java
@@ -243,8 +243,14 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
ServiceConfiguration conf = pulsar.getConfiguration();

// This maps are not heavily contended since most accesses are within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
ClientCnx.java
@@ -106,14 +106,28 @@ public class ClientCnx extends PulsarHandler {
private State state;

private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
// LookupRequests that waiting in client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;

private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
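Note on the syntax above (standard Java inference behaviour, not something introduced by this PR): the explicit type witness, e.g. ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder(), is needed because the target type of an assignment only drives inference for the outermost call of a chain, so a bare newBuilder() is inferred as Builder<Object> and the result of build() is not assignable. Illustrative comparison using the types of this file:

    // does not compile: newBuilder() is inferred as Builder<Object>
    // ConcurrentLongHashMap<ProducerImpl<?>> producers =
    //         ConcurrentLongHashMap.newBuilder().expectedItems(16).build();

    // compiles: the witness fixes the value type for the whole chain
    ConcurrentLongHashMap<ProducerImpl<?>> producers =
            ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
                    .expectedItems(16)
                    .concurrencyLevel(1)
                    .build();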
TransactionMetaStoreHandler.java
@@ -61,7 +61,10 @@ public class TransactionMetaStoreHandler extends HandlerState
private final long transactionCoordinatorId;
private final ConnectionHandler connectionHandler;
private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
new ConcurrentLongHashMap<>(16, 1);
ConcurrentLongHashMap.<OpBase<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;

protected final Timer timer;
TransactionCoordinatorClientImpl.java
@@ -51,7 +51,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC

private final PulsarClientImpl pulsarClient;
private TransactionMetaStoreHandler[] handlers;
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
private final AtomicLong epoch = new AtomicLong(0);

private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
ConcurrentLongHashMap.java
@@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;

private static final float DefaultMapFillFactor = 0.66f;
private static final float DefaultMapIdleFactor = 0.15f;

private static final float DefaultExpandFactor = 2;
private static final float DefaultShrinkFactor = 2;

private static final boolean DefaultAutoShrink = false;

public static <V> Builder<V> newBuilder() {
return new Builder<>();
}

/**
* Builder of ConcurrentLongHashMap.
*/
public static class Builder<T> {
int expectedItems = DefaultExpectedItems;
int concurrencyLevel = DefaultConcurrencyLevel;
float mapFillFactor = DefaultMapFillFactor;
float mapIdleFactor = DefaultMapIdleFactor;
float expandFactor = DefaultExpandFactor;
float shrinkFactor = DefaultShrinkFactor;
boolean autoShrink = DefaultAutoShrink;

public Builder<T> expectedItems(int expectedItems) {
this.expectedItems = expectedItems;
return this;
}

public Builder<T> concurrencyLevel(int concurrencyLevel) {
this.concurrencyLevel = concurrencyLevel;
return this;
}

public Builder<T> mapFillFactor(float mapFillFactor) {
this.mapFillFactor = mapFillFactor;
return this;
}

public Builder<T> mapIdleFactor(float mapIdleFactor) {
this.mapIdleFactor = mapIdleFactor;
return this;
}

public Builder<T> expandFactor(float expandFactor) {
this.expandFactor = expandFactor;
return this;
}

public Builder<T> shrinkFactor(float shrinkFactor) {
this.shrinkFactor = shrinkFactor;
return this;
}

public Builder<T> autoShrink(boolean autoShrink) {
this.autoShrink = autoShrink;
return this;
}

public ConcurrentLongHashMap<T> build() {
return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
}
}

private final Section<V>[] sections;

@Deprecated
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}

@Deprecated
public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
}

public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
float mapFillFactor, float mapIdleFactor,
boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
checkArgument(mapFillFactor > mapIdleFactor);
checkArgument(expandFactor > 1);
checkArgument(shrinkFactor > 1);

int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];

for (int i = 0; i < numSections; i++) {
sections[i] = new Section<>(perSectionCapacity);
sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
autoShrink, expandFactor, shrinkFactor);
}
}
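Note (usage sketch, values illustrative): the new seven-argument constructor checks that both factors lie strictly between 0 and 1 with mapFillFactor > mapIdleFactor, and that expandFactor and shrinkFactor are greater than 1. An auto-shrinking map would be built through the builder roughly like this:

    ConcurrentLongHashMap<Object> map = ConcurrentLongHashMap.<Object>newBuilder()
            .expectedItems(256)
            .concurrencyLevel(16)
            .autoShrink(true)        // enable shrinking on remove() and clear()
            .mapFillFactor(0.66f)    // expand a section once usedBuckets exceeds capacity * 0.66
            .mapIdleFactor(0.15f)    // consider shrinking once size drops below capacity * 0.15
            .expandFactor(2)         // grow capacity by 2x on expand
            .shrinkFactor(2)         // halve capacity on shrink
            .build();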

@@ -195,20 +274,35 @@ private static final class Section<V> extends StampedLock {
private volatile V[] values;

private volatile int capacity;
private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");

private volatile int size;
private int usedBuckets;
private int resizeThreshold;

Section(int capacity) {
private int resizeThresholdUp;
private int resizeThresholdBelow;
private final float mapFillFactor;
private final float mapIdleFactor;
private final float expandFactor;
private final float shrinkFactor;
private final boolean autoShrink;

Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * MapFillFactor);
this.autoShrink = autoShrink;
this.mapFillFactor = mapFillFactor;
this.mapIdleFactor = mapIdleFactor;
this.expandFactor = expandFactor;
this.shrinkFactor = shrinkFactor;
this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}

V get(long key, int keyHash) {
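Note (worked example, capacity value assumed): each section now keeps two thresholds instead of one. With the default factors and a per-section capacity of 256:

    int capacity = 256;                                   // per-section capacity
    int resizeThresholdUp = (int) (capacity * 0.66f);     // 168: expand when usedBuckets goes above this
    int resizeThresholdBelow = (int) (capacity * 0.15f);  // 38: a remove() below this size may trigger a shrink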
@@ -322,9 +416,10 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valu
++bucket;
}
} finally {
if (usedBuckets >= resizeThreshold) {
if (usedBuckets > resizeThresholdUp) {
try {
rehash();
int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
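Note (illustrative numbers): expansion now scales the capacity by expandFactor instead of the previous hard-coded doubling, so with the default factor of 2 the behaviour is unchanged:

    int capacity = 256;
    float expandFactor = 2f;
    int newCapacity = (int) (capacity * expandFactor);    // 512, same result as the old capacity * 2
    // the real code additionally aligns newCapacity to a power of two before rehashing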
@@ -373,7 +468,20 @@ private V remove(long key, Object value, int keyHash) {
}

} finally {
unlockWrite(stamp);
if (autoShrink && size < resizeThresholdBelow) {
try {
int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
if (newCapacity < capacity && newResizeThresholdUp > size) {
// shrink the hashmap
rehash(newCapacity);
}
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}
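Note (standalone sketch of the guard above, values illustrative): the shrink path runs only when autoShrink is enabled and size has fallen below resizeThresholdBelow, and it additionally requires that the smaller table would not immediately have to expand again:

    int capacity = 512;
    int size = 60;                                                    // entries left in the section
    float mapFillFactor = 0.66f;
    float shrinkFactor = 2f;

    int newCapacity = (int) (capacity / shrinkFactor);                // 256 (the real code also aligns this to a power of two)
    int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);   // 168
    boolean shrink = newCapacity < capacity && newResizeThresholdUp > size;  // true -> rehash(256)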

@@ -385,6 +493,9 @@ void clear() {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
if (autoShrink) {
rehash(initCapacity);
}
} finally {
unlockWrite(stamp);
}
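Note (usage sketch, type and counts illustrative): with autoShrink enabled, clear() now rehashes each section back to its initial capacity, so a map that grew under a burst of inserts gives the memory back once it is cleared:

    ConcurrentLongHashMap<Object> cache = ConcurrentLongHashMap.<Object>newBuilder()
            .expectedItems(16)
            .autoShrink(true)
            .build();
    for (long i = 0; i < 100_000; i++) {
        cache.put(i, new Object());   // sections expand as entries accumulate
    }
    cache.clear();                    // sections rehash back to their initial capacity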
@@ -439,9 +550,8 @@ public void forEach(EntryProcessor<V> processor) {
}
}

private void rehash() {
private void rehash(int newCapacity) {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];

@@ -458,7 +568,8 @@ private void rehash() {
values = newValues;
capacity = newCapacity;
usedBuckets = size;
resizeThreshold = (int) (capacity * MapFillFactor);
resizeThresholdUp = (int) (capacity * mapFillFactor);
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}

private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {