Skip to content

Commit

Permalink
[ISSUE #8434] enhance DistroConsistencyServiceImpl listen/unListen me…
Browse files Browse the repository at this point in the history
…thod in concurrent condition (#8435)
  • Loading branch information
liqipeng authored May 23, 2022
1 parent 356aff1 commit b05f471
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,26 +322,36 @@ public boolean processSnapshot(DistroData distroData) {

@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new ConcurrentLinkedQueue<>());
ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(key);
if (recordListeners == null) {
recordListeners = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<RecordListener> recordListenersExist;
if ((recordListenersExist = listeners.putIfAbsent(key, recordListeners)) != null) {
recordListeners = recordListenersExist;
}
}

if (listeners.get(key).contains(listener)) {
return;

if (!recordListeners.contains(listener)) {
synchronized (recordListeners) {
if (!recordListeners.contains(listener)) {
recordListeners.add(listener);
}
}
}

listeners.get(key).add(listener);
}

@Override
public void unListen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(key);
if (recordListeners == null) {
return;
}
for (RecordListener recordListener : listeners.get(key)) {
if (recordListener.equals(listener)) {
listeners.get(key).remove(listener);
break;
synchronized (recordListeners) {
for (RecordListener recordListener : recordListeners) {
if (recordListener.equals(listener)) {
recordListeners.remove(listener);
break;
}
}
}
}
Expand Down

0 comments on commit b05f471

Please sign in to comment.