Skip to content

Commit

Permalink
[ISSUE #8428] fix naming subscribe bug when multiple NamingService (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liqipeng authored May 27, 2022
1 parent e909483 commit f7a2810
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

/**
* Nacos Naming Service.
Expand Down Expand Up @@ -71,6 +72,8 @@ public class NacosNamingService implements NamingService {

private NamingClientProxy clientProxy;

private String notifierEventScope;

public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
Expand All @@ -87,11 +90,12 @@ private void init(Properties properties) throws NacosException {
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);

this.changeNotifier = new InstancesChangeNotifier();

this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, properties);
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public class ServiceInfoHolder implements Closeable {

private String cacheDir;

public ServiceInfoHolder(String namespace, Properties properties) {
private String notifierEventScope;

public ServiceInfoHolder(String namespace, String notifierEventScope, Properties properties) {
initCacheDir(namespace, properties);
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
Expand All @@ -75,6 +77,7 @@ public ServiceInfoHolder(String namespace, Properties properties) {
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
this.notifierEventScope = notifierEventScope;
}

private void initCacheDir(String namespace, Properties properties) {
Expand Down Expand Up @@ -165,7 +168,7 @@ public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class InstancesChangeEvent extends Event {

private static final long serialVersionUID = -8823087028212249603L;

private final String eventScope;

private final String serviceName;

private final String groupName;
Expand All @@ -39,7 +41,8 @@ public class InstancesChangeEvent extends Event {

private final List<Instance> hosts;

public InstancesChangeEvent(String serviceName, String groupName, String clusters, List<Instance> hosts) {
public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List<Instance> hosts) {
this.eventScope = eventScope;
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
Expand All @@ -62,4 +65,8 @@ public List<Instance> getHosts() {
return hosts;
}

@Override
public String scope() {
return this.eventScope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
Expand All @@ -29,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -39,10 +41,21 @@
*/
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {

private final String eventScope;

private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();

private final Object lock = new Object();

@JustForTest
public InstancesChangeNotifier() {
this.eventScope = UUID.randomUUID().toString();
}

public InstancesChangeNotifier(String eventScope) {
this.eventScope = eventScope;
}

/**
* register listener.
*
Expand Down Expand Up @@ -137,4 +150,8 @@ public Class<? extends Event> subscribeType() {
return InstancesChangeEvent.class;
}

@Override
public boolean scopeMatches(InstancesChangeEvent event) {
return this.eventScope.equals(event.scope());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
public class ServiceInfoHolderTest {

@Test
public void testGetServiceInfoMap() {
public void testGetServiceInfoMap() throws NoSuchFieldException, IllegalAccessException {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
Assert.assertEquals(0, holder.getServiceInfoMap().size());

Field fieldNotifierEventScope = ServiceInfoHolder.class.getDeclaredField("notifierEventScope");
fieldNotifierEventScope.setAccessible(true);
Assert.assertEquals("scope-001", fieldNotifierEventScope.get(holder));
}

@Test
Expand All @@ -53,7 +55,7 @@ public void testProcessServiceInfo() {
info.setHosts(hosts);

Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);

ServiceInfo actual1 = holder.processServiceInfo(info);
Assert.assertEquals(info, actual1);
Expand Down Expand Up @@ -81,7 +83,7 @@ private Instance createInstance(String ip, int port) {
@Test
public void testProcessServiceInfo2() {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
String json = "{\"groupName\":\"a\",\"name\":\"b\",\"clusters\":\"c\"}";

ServiceInfo actual = holder.processServiceInfo(json);
Expand All @@ -102,7 +104,7 @@ public void testProcessServiceInfoWithPushEmpty() {

Properties prop = new Properties();
prop.setProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION, "true");
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
holder.processServiceInfo(oldInfo);

ServiceInfo newInfo = new ServiceInfo("a@@b@@c");
Expand All @@ -122,7 +124,7 @@ public void testGetServiceInfo() {
info.setHosts(hosts);

Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);

ServiceInfo expect = holder.processServiceInfo(info);
String serviceName = "b";
Expand All @@ -137,7 +139,7 @@ public void testGetServiceInfo() {
@Test
public void testShutdown() throws NacosException, NoSuchFieldException, IllegalAccessException {
Properties prop = new Properties();
ServiceInfoHolder holder = new ServiceInfoHolder("aa", prop);
ServiceInfoHolder holder = new ServiceInfoHolder("aa", "scope-001", prop);
Field field = ServiceInfoHolder.class.getDeclaredField("failoverReactor");
field.setAccessible(true);
FailoverReactor reactor = (FailoverReactor) field.get(holder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public class InstancesChangeEventTest {

@Test
public void testGetServiceName() {
String eventScope = "scope-001";
String serviceName = "a";
String groupName = "b";
String clusters = "c";
List<Instance> hosts = new ArrayList<>();
Instance ins = new Instance();
hosts.add(ins);
InstancesChangeEvent event = new InstancesChangeEvent(serviceName, groupName, clusters, hosts);
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, serviceName, groupName, clusters, hosts);
Assert.assertEquals(eventScope, event.scope());
Assert.assertEquals(serviceName, event.getServiceName());
Assert.assertEquals(clusters, event.getClusters());
Assert.assertEquals(groupName, event.getGroupName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.alibaba.nacos.client.naming.event;

import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -31,26 +33,33 @@ public class InstancesChangeNotifierTest {

@Test
public void testRegisterListener() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
instancesChangeNotifier.registerListener(group, name, clusters, listener);
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
Assert.assertEquals(1, subscribeServices.size());
Assert.assertEquals(group, subscribeServices.get(0).getGroupName());
Assert.assertEquals(name, subscribeServices.get(0).getName());
Assert.assertEquals(clusters, subscribeServices.get(0).getClusters());


List<Instance> hosts = new ArrayList<>();
Instance ins = new Instance();
hosts.add(ins);
InstancesChangeEvent event = new InstancesChangeEvent(eventScope, name, group, clusters, hosts);
Assert.assertEquals(true, instancesChangeNotifier.scopeMatches(event));
}

@Test
public void testDeregisterListener() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
instancesChangeNotifier.registerListener(group, name, clusters, listener);
List<ServiceInfo> subscribeServices = instancesChangeNotifier.getSubscribeServices();
Expand All @@ -64,10 +73,11 @@ public void testDeregisterListener() {

@Test
public void testIsSubscribed() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);
Assert.assertFalse(instancesChangeNotifier.isSubscribed(group, name, clusters));

Expand All @@ -77,10 +87,11 @@ public void testIsSubscribed() {

@Test
public void testOnEvent() {
String eventScope = "scope-001";
String group = "a";
String name = "b";
String clusters = "c";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
EventListener listener = Mockito.mock(EventListener.class);

instancesChangeNotifier.registerListener(group, name, clusters, listener);
Expand All @@ -95,7 +106,8 @@ public void testOnEvent() {

@Test
public void testSubscribeType() {
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier();
String eventScope = "scope-001";
InstancesChangeNotifier instancesChangeNotifier = new InstancesChangeNotifier(eventScope);
Assert.assertEquals(InstancesChangeEvent.class, instancesChangeNotifier.subscribeType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ void receiveEvent(Event event) {

// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}

// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,13 @@ public long sequence() {
return sequence;
}

/**
* Event scope.
*
* @return event scope, return null if for all scope
*/
public String scope() {
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,15 @@ public Executor executor() {
public boolean ignoreExpireEvent() {
return false;
}

/**
* Whether the event's scope matches current subscriber. Default implementation is all scopes matched.
* If you override this method, it better to override related {@link com.alibaba.nacos.common.notify.Event#scope()}.
*
* @param event {@link Event}
* @return Whether the event's scope matches current subscriber
*/
public boolean scopeMatches(T event) {
return event.scope() == null;
}
}
Loading

0 comments on commit f7a2810

Please sign in to comment.