Skip to content

Commit

Permalink
[ISSUE alibaba#8428] fix naming subscribe bug when multiple NamingSer…
Browse files Browse the repository at this point in the history
…vice
  • Loading branch information
liqipeng committed May 23, 2022
1 parent b05f471 commit dcad25f
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class HostReactor implements Closeable {

private final InstancesChangeNotifier notifier;

private String notifierEventScope;

public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
Expand Down Expand Up @@ -113,7 +116,8 @@ public Thread newThread(Runnable r) {
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier();
this.notifierEventScope = UUID.randomUUID().toString();
this.notifier = new InstancesChangeNotifier(this.notifierEventScope);

NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(notifier);
Expand Down Expand Up @@ -255,7 +259,7 @@ public ServiceInfo processServiceJson(String json) {
serviceInfo.setJsonFromServer(json);

if (changed) {
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
Expand All @@ -265,7 +269,7 @@ public ServiceInfo processServiceJson(String json) {
NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class InstancesChangeEvent extends Event {

private static final long serialVersionUID = -8823087028212249603L;

private final String eventScope;

private final String serviceName;

private final String groupName;
Expand All @@ -39,7 +41,8 @@ public class InstancesChangeEvent extends Event {

private final List<Instance> hosts;

public InstancesChangeEvent(String serviceName, String groupName, String clusters, List<Instance> hosts) {
public InstancesChangeEvent(String eventScope, String serviceName, String groupName, String clusters, List<Instance> hosts) {
this.eventScope = eventScope;
this.serviceName = serviceName;
this.groupName = groupName;
this.clusters = clusters;
Expand All @@ -62,4 +65,8 @@ public List<Instance> getHosts() {
return hosts;
}

@Override
public String scope() {
return this.eventScope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@
*/
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {

private final String eventScope;

private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();

private final Object lock = new Object();

public InstancesChangeNotifier(String eventScope) {
this.eventScope = eventScope;
}

/**
* register listener.
*
Expand Down Expand Up @@ -137,4 +143,8 @@ public Class<? extends Event> subscribeType() {
return InstancesChangeEvent.class;
}

@Override
public boolean scopeMatches(InstancesChangeEvent event) {
return this.eventScope.equals(event.scope());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ void receiveEvent(Event event) {

// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}

// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public long sequence() {
return sequence;
}

/**
* Event scope.
*
* @return event scope, return null if for all scope
*/
public String scope() {
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,15 @@ public Executor executor() {
public boolean ignoreExpireEvent() {
return false;
}

/**
* Whether the event's scope matches current subscriber. Default implementation is all scopes matched.
* If you override this method, it better to override related {@link com.alibaba.nacos.common.notify.Event#scope()}.
*
* @param event {@link Event}
* @return Whether the event's scope matches current subscriber
*/
public boolean scopeMatches(T event) {
return event.scope() == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by wangtong.wt on 2018/6/20.
Expand Down Expand Up @@ -258,5 +261,51 @@ public void onEvent(Event event) {

Assert.assertEquals(2, body.get("subscribers").size());
}

@Test
public void subscribeSameServiceForTwoNamingService() throws Exception {
Properties properties1 = new Properties();
properties1.setProperty("serverAddr", "127.0.0.1" + ":" + port);
properties1.setProperty("namespace", "ns-001");
final NamingService naming1 = NamingFactory.createNamingService(properties1);
Properties properties2 = new Properties();
properties2.setProperty("serverAddr", "127.0.0.1" + ":" + port);
properties2.setProperty("namespace", "ns-002");
final NamingService naming2 = NamingFactory.createNamingService(properties2);

final AtomicInteger atomicInteger = new AtomicInteger(0);
final String serviceName = randomDomainName();

naming1.subscribe(serviceName, new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println("Event from naming1: " + ((NamingEvent) event).getServiceName());
System.out.println("Event from naming1: " + ((NamingEvent) event).getInstances());
instances = ((NamingEvent) event).getInstances();
}
});
naming2.subscribe(serviceName, new EventListener() {
@Override
public void onEvent(Event event) {
System.out.println("Event from naming2: " + ((NamingEvent) event).getServiceName());
System.out.println("Event from naming2: " + ((NamingEvent) event).getInstances());
atomicInteger.incrementAndGet();
}
});

naming1.registerInstance(serviceName, "1.1.1.1", TEST_PORT, "c1");

while (instances.isEmpty()) {
Thread.sleep(1000L);
}

try {
Assert.assertTrue(verifyInstanceList(instances, naming1.getAllInstances(serviceName)));
Assert.assertEquals(0, atomicInteger.get());
} finally {
naming1.shutDown();
naming2.shutDown();
}
}

}

0 comments on commit dcad25f

Please sign in to comment.