|
38 | 38 | import java.util.ArrayList;
|
39 | 39 | import java.util.Arrays;
|
40 | 40 | import java.util.Collection;
|
| 41 | +import java.util.Collections; |
41 | 42 | import java.util.HashMap;
|
42 | 43 | import java.util.LinkedHashSet;
|
43 | 44 | import java.util.LinkedList;
|
44 | 45 | import java.util.List;
|
45 | 46 | import java.util.Map;
|
46 | 47 | import java.util.Set;
|
| 48 | +import java.util.concurrent.ConcurrentHashMap; |
| 49 | +import java.util.concurrent.ConcurrentMap; |
47 | 50 | import java.util.concurrent.Executors;
|
48 | 51 | import java.util.concurrent.ScheduledExecutorService;
|
49 | 52 | import java.util.concurrent.TimeUnit;
|
50 |
| -import java.util.concurrent.ConcurrentMap; |
51 |
| -import java.util.concurrent.ConcurrentHashMap; |
52 | 53 | import java.util.stream.Collectors;
|
53 |
| -import java.util.Collections; |
54 | 54 |
|
55 | 55 | import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
|
56 | 56 | import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
@@ -123,7 +123,7 @@ public class NacosRegistry extends FailbackRegistry {
|
123 | 123 |
|
124 | 124 | private final NacosNamingServiceWrapper namingService;
|
125 | 125 |
|
126 |
| - private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>(); |
| 126 | + private final ConcurrentMap<URL, Map<NotifyListener, Map<String, EventListener>>> nacosListeners = new ConcurrentHashMap<>(); |
127 | 127 |
|
128 | 128 | public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
|
129 | 129 | super(url);
|
@@ -242,8 +242,7 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) {
|
242 | 242 | public void doUnsubscribe(URL url, NotifyListener listener) {
|
243 | 243 | if (isAdminProtocol(url)) {
|
244 | 244 | shutdownServiceNamesLookup();
|
245 |
| - } |
246 |
| - else { |
| 245 | + } else { |
247 | 246 | Set<String> serviceNames = getServiceNames(url, listener);
|
248 | 247 |
|
249 | 248 | doUnsubscribe(url, listener, serviceNames);
|
@@ -305,7 +304,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {
|
305 | 304 | Set<String> serviceNames = new LinkedHashSet<>();
|
306 | 305 |
|
307 | 306 | execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE,
|
308 |
| - getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() |
| 307 | + getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() |
309 | 308 | .stream()
|
310 | 309 | .filter(this::isConformRules)
|
311 | 310 | .map(NacosServiceName::new)
|
@@ -511,40 +510,41 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
|
511 | 510 |
|
512 | 511 | private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
|
513 | 512 | throws NacosException {
|
514 |
| - ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); |
515 |
| - EventListener nacosListener = listeners.computeIfAbsent(listener, k -> { |
516 |
| - EventListener eventListener = event -> { |
517 |
| - if (event instanceof NamingEvent) { |
518 |
| - NamingEvent e = (NamingEvent) event; |
519 |
| - List<Instance> instances = e.getInstances(); |
520 |
| - |
521 |
| - |
522 |
| - if (isServiceNamesWithCompatibleMode(url)) { |
| 513 | + Map<NotifyListener, Map<String, EventListener>> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); |
| 514 | + Map<String, EventListener> eventListenerMap = listeners.computeIfAbsent(listener, k -> new ConcurrentHashMap<>()); |
| 515 | + EventListener nacosListener = eventListenerMap.computeIfAbsent(serviceName, |
| 516 | + name -> event -> { |
| 517 | + if (event instanceof NamingEvent) { |
| 518 | + NamingEvent e = (NamingEvent) event; |
| 519 | + List<Instance> instances = e.getInstances(); |
| 520 | + if (isServiceNamesWithCompatibleMode(url)) { |
| 521 | + |
| 522 | + // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned |
| 523 | + // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 |
| 524 | + NacosInstanceManageUtil.initOrRefreshServiceInstanceList(name, instances); |
| 525 | + instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(name); |
| 526 | + } |
523 | 527 |
|
524 |
| - // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned |
525 |
| - // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899 |
526 |
| - NacosInstanceManageUtil.initOrRefreshServiceInstanceList(e.getServiceName(), instances); |
527 |
| - instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(e.getServiceName()); |
| 528 | + notifySubscriber(url, listener, instances); |
528 | 529 | }
|
529 |
| - |
530 |
| - notifySubscriber(url, listener, instances); |
531 |
| - } |
532 |
| - }; |
533 |
| - return eventListener; |
534 |
| - }); |
| 530 | + }); |
535 | 531 | namingService.subscribe(serviceName,
|
536 | 532 | getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
|
537 | 533 | nacosListener);
|
538 | 534 | }
|
539 | 535 |
|
540 | 536 | private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
|
541 | 537 | throws NacosException {
|
542 |
| - ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); |
543 |
| - if(notifyListenerEventListenerConcurrentMap == null){ |
| 538 | + Map<NotifyListener, Map<String, EventListener>> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); |
| 539 | + if (notifyListenerEventListenerConcurrentMap == null) { |
| 540 | + return; |
| 541 | + } |
| 542 | + Map<String, EventListener> listenerMap = notifyListenerEventListenerConcurrentMap.get(listener); |
| 543 | + if (listenerMap == null) { |
544 | 544 | return;
|
545 | 545 | }
|
546 |
| - EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener); |
547 |
| - if(nacosListener == null){ |
| 546 | + EventListener nacosListener = listenerMap.remove(serviceName); |
| 547 | + if (nacosListener == null) { |
548 | 548 | return;
|
549 | 549 | }
|
550 | 550 | namingService.unsubscribe(serviceName,
|
|
0 commit comments