Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature get subscribers #1297

Merged
merged 9 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.alibaba.nacos.naming.exception.NacosException;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.selector.LabelSelector;
import com.alibaba.nacos.naming.selector.NoneSelector;
import com.alibaba.nacos.naming.selector.Selector;
Expand All @@ -40,7 +41,6 @@
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.*;

Expand All @@ -62,6 +62,9 @@ public class ServiceController {
@Autowired
private ServerListManager serverListManager;

@Autowired
private SubscribeManager subscribeManager;

@RequestMapping(value = "", method = RequestMethod.POST)
public String create(HttpServletRequest request) throws Exception {

Expand Down Expand Up @@ -365,6 +368,30 @@ public JSONObject checksum(HttpServletRequest request) throws Exception {
return result;
}

/**
* get subscriber list
* @param request
* @return
*/
@RequestMapping(value = "/subscribers", method = RequestMethod.GET)
public JSONObject subscribers(HttpServletRequest request){
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
boolean aggregation = Boolean.valueOf(WebUtils.required(request,"aggregation"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成可选,默认为true


JSONObject result = new JSONObject();

try {
List<Subscriber> subscribers = subscribeManager.getSubscribers(serviceName,namespaceId,aggregation);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码没有格式化

result.put("subscribers",subscribers);
return result;
} catch (InterruptedException e) {

}
return result;
}

private List<String> filterInstanceMetadata(String namespaceId, List<String> serviceNames, String key, String value) {

List<String> filteredServiceNames = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.core;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.pojo.Subscribers;
import com.alibaba.nacos.naming.push.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.HttpURLConnection;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* @author Nicholas
*/
Copy link
Collaborator

@nkorange nkorange Jun 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加上 @ since 1.0.1

@Service
public class SubscribeManager {

private static final String DATA_ON_SYNC_URL = "/service/subscribers";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

常量名字换一下


@Autowired
private PushService pushService;

@Autowired
private ServerListManager serverListManager;


private List<Subscriber> getSubscribers(String serviceName, String namespaceId){
return pushService.getClients(serviceName,namespaceId);
}

/**
*
* @param serviceName
* @param namespaceId
* @param aggregation
* @return
* @throws InterruptedException
*/
public List<Subscriber> getSubscribers(String serviceName, String namespaceId, boolean aggregation) throws InterruptedException {
if (aggregation){
// size = 1 means only myself in the list, we need at least one another server alive:
while (serverListManager.getHealthyServers().size() <= 1) {
Thread.sleep(1000L);
Loggers.EPHEMERAL.info("waiting server list init...");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不需要等待,小于等于1直接返回本机的订阅者就行

}

List<Subscriber> subscriberList = new ArrayList<Subscriber>();
// try sync data from remote server:
for (Server server : serverListManager.getHealthyServers()) {

Map<String, String> paramValues = new HashMap<>(128);
paramValues.put("serviceName",serviceName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用常量替代

paramValues.put("namespaceId",namespaceId);
paramValues.put("aggregation",String.valueOf(!aggregation));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不用取反,直接false就行

if (NetUtils.localServer().equals(server.getKey())) {
subscriberList.addAll(getSubscribers(serviceName,namespaceId));
}

HttpClient.HttpResult result = HttpClient.httpGet("http://" + server.getKey() + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, new ArrayList<>(),paramValues);

if (HttpURLConnection.HTTP_OK == result.code) {
Subscribers subscribers = (Subscribers) JSONObject.parseObject(result.content, Subscribers.class);
subscriberList.addAll(subscribers.getSubscribers());
}
return subscriberList.stream().filter(distinctByKey(Subscriber::toString)).collect(Collectors.toList());

}
} else {
// local server
return getSubscribers(serviceName,namespaceId);
}
return Collections.emptyList();
}

public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
Map<Object, Boolean> seen = new ConcurrentHashMap<>(128);
return object -> seen.putIfAbsent(keyExtractor.apply(object), Boolean.TRUE) == null;
}
}
94 changes: 94 additions & 0 deletions naming/src/main/java/com/alibaba/nacos/naming/pojo/Subscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.pojo;

import java.io.Serializable;

/**
* @author nicholas
* @version $Id: Subscriber.java, v 0.1 2019-05-28 下午10:47 nicholas Exp $$
*/
public class Subscriber implements Serializable {

private String addrStr;

private String agent;

private String app;

private String ip;

private String namespaceId;

private String serviceName;

public Subscriber(String addrStr, String agent, String app, String ip, String namespaceId, String serviceName) {
this.addrStr = addrStr;
this.agent = agent;
this.app = app;
this.ip = ip;
this.namespaceId = namespaceId;
this.serviceName = serviceName;
}

public String getAddrStr() {
return addrStr;
}

public void setAddrStr(String addrStr) {
this.addrStr = addrStr;
}

public String getAgent() {
return agent;
}

public void setAgent(String agent) {
this.agent = agent;
}

public String getApp() {
return app;
}

public void setApp(String app) {
this.app = app;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public String getNamespaceId() {
return namespaceId;
}

public void setNamespaceId(String namespaceId) {
this.namespaceId = namespaceId;
}

public String getServiceName() {
return serviceName;
}

public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.pojo;

import java.io.Serializable;
import java.util.List;

/**
* @author nicholas
* @version $Id: Subscribers.java, v 0.1 2019-05-28 下午10:47 nicholas Exp $$
*/
public class Subscribers implements Serializable {

private List<Subscriber> subscribers;

public List<Subscriber> getSubscribers() {
return subscribers;
}

public void setSubscribers(List<Subscriber> subscribers) {
this.subscribers = subscribers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Subscriber;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.util.VersionUtil;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void addClient(String namespaceId,
String tenant,
String app) {

PushClient client = new PushService.PushClient(namespaceId,
PushClient client = new PushClient(namespaceId,
serviceName,
clusters,
agent,
Expand Down Expand Up @@ -170,6 +171,19 @@ public static void addClient(PushClient client) {
}
}

public List<Subscriber> getClients(String serviceName, String namespaceId) {
String serviceKey = UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName);
ConcurrentMap<String, PushClient> clientConcurrentMap = clientMap.get(serviceKey);
if (Objects.isNull(clientConcurrentMap)) {
return null;
}
List<Subscriber> clients = new ArrayList<Subscriber>();
clientConcurrentMap.forEach((key, client) -> {
clients.add(new Subscriber(client.getAddrStr(),client.getAgent(),client.getApp(),client.getIp(),namespaceId,serviceName));
});
return clients;
}

public static void removeClientIfZombie() {

int size = 0;
Expand Down