diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java index 4884944f9b5..0c934d51d53 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -84,6 +85,8 @@ public class HostReactor implements Closeable { private final InstancesChangeNotifier notifier; + private String notifierEventScope; + public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) { this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); } @@ -113,7 +116,8 @@ public Thread newThread(Runnable r) { this.updatingMap = new ConcurrentHashMap(); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); - this.notifier = new InstancesChangeNotifier(); + this.notifierEventScope = UUID.randomUUID().toString(); + this.notifier = new InstancesChangeNotifier(this.notifierEventScope); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(notifier); @@ -255,7 +259,7 @@ public ServiceInfo processServiceJson(String json) { serviceInfo.setJsonFromServer(json); if (changed) { - NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), + NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } @@ -265,7 +269,7 @@ public ServiceInfo processServiceJson(String json) { NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); - NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), + NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java index 170e07bd8f0..87cd8953882 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeEvent.java @@ -31,6 +31,8 @@ public class InstancesChangeEvent extends Event { private static final long serialVersionUID = -8823087028212249603L; + private final String eventScope; + private final String serviceName; private final String groupName; @@ -39,7 +41,8 @@ public class InstancesChangeEvent extends Event { private final List hosts; - public InstancesChangeEvent(String serviceName, String groupName, String clusters, List hosts) { + public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List hosts) { + this.eventScope = eventScope; this.serviceName = serviceName; this.groupName = groupName; this.clusters = clusters; @@ -62,4 +65,8 @@ public List getHosts() { return hosts; } + @Override + public String scope() { + return this.eventScope; + } } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java index 4ab6f853de5..21fb9be02ac 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/event/InstancesChangeNotifier.java @@ -38,10 +38,16 @@ */ public class InstancesChangeNotifier extends Subscriber { + private final String eventScope; + private final Map> listenerMap = new ConcurrentHashMap>(); private final Object lock = new Object(); + public InstancesChangeNotifier(String eventScope) { + this.eventScope = eventScope; + } + /** * register listener. * @@ -137,4 +143,8 @@ public Class subscribeType() { return InstancesChangeEvent.class; } + @Override + public boolean scopeMatches(InstancesChangeEvent event) { + return this.eventScope.equals(event.scope()); + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java index 0558fff7f53..cf97b8c9cb9 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/DefaultPublisher.java @@ -175,6 +175,10 @@ void receiveEvent(Event event) { // Notification single event listener for (Subscriber subscriber : subscribers) { + if (!subscriber.scopeMatches(event)) { + continue; + } + // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java index a8562884213..d19216072fa 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/Event.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/Event.java @@ -41,5 +41,13 @@ public long sequence() { return sequence; } + /** + * Event scope. + * + * @return event scope, return null if for all scope + */ + public String scope() { + return null; + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java index 65a41d59eee..2e6500b558f 100644 --- a/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java +++ b/common/src/main/java/com/alibaba/nacos/common/notify/listener/Subscriber.java @@ -60,4 +60,15 @@ public Executor executor() { public boolean ignoreExpireEvent() { return false; } + + /** + * Whether the event's scope matches current subscriber. Default implementation is all scopes matched. + * If you override this method, it better to override related {@link com.alibaba.nacos.common.notify.Event#scope()}. + * + * @param event {@link Event} + * @return Whether the event's scope matches current subscriber + */ + public boolean scopeMatches(T event) { + return event.scope() == null; + } } diff --git a/test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java b/test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java index 77b9be11f7f..0d61e9e3778 100644 --- a/test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/naming/Subscribe_ITCase.java @@ -38,7 +38,10 @@ import java.util.Collections; import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by wangtong.wt on 2018/6/20. @@ -258,5 +261,51 @@ public void onEvent(Event event) { Assert.assertEquals(2, body.get("subscribers").size()); } + + @Test + public void subscribeSameServiceForTwoNamingService() throws Exception { + Properties properties1 = new Properties(); + properties1.setProperty("serverAddr", "127.0.0.1" + ":" + port); + properties1.setProperty("namespace", "ns-001"); + final NamingService naming1 = NamingFactory.createNamingService(properties1); + Properties properties2 = new Properties(); + properties2.setProperty("serverAddr", "127.0.0.1" + ":" + port); + properties2.setProperty("namespace", "ns-002"); + final NamingService naming2 = NamingFactory.createNamingService(properties2); + + final AtomicInteger atomicInteger = new AtomicInteger(0); + final String serviceName = randomDomainName(); + + naming1.subscribe(serviceName, new EventListener() { + @Override + public void onEvent(Event event) { + System.out.println("Event from naming1: " + ((NamingEvent) event).getServiceName()); + System.out.println("Event from naming1: " + ((NamingEvent) event).getInstances()); + instances = ((NamingEvent) event).getInstances(); + } + }); + naming2.subscribe(serviceName, new EventListener() { + @Override + public void onEvent(Event event) { + System.out.println("Event from naming2: " + ((NamingEvent) event).getServiceName()); + System.out.println("Event from naming2: " + ((NamingEvent) event).getInstances()); + atomicInteger.incrementAndGet(); + } + }); + + naming1.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1"); + + while (instances.isEmpty()) { + Thread.sleep(1000L); + } + + try { + Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName))); + Assert.assertEquals(0, atomicInteger.get()); + } finally { + naming1.shutDown(); + naming2.shutDown(); + } + } }