Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1453 #1454

Merged
merged 2 commits into from
Jul 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.ServerListService;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.*;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.RunningConfigUtils;
import com.alibaba.nacos.config.server.utils.StringUtils;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -68,7 +71,7 @@ public void onEvent(Event event) {

// 并发产生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
Expand All @@ -79,9 +82,9 @@ public void onEvent(Event event) {
// 其实这里任何类型队列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (int i = 0; i < ipList.size(); i++) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
EXCUTOR.execute(new AsyncTask(httpclient, queue));
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}

Expand All @@ -92,11 +95,11 @@ public AsyncNotifyService(ServerListService serverListService) {
}

public Executor getExecutor() {
return EXCUTOR;
return EXECUTOR;
}

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());

private RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
Expand All @@ -105,7 +108,7 @@ public Executor getExecutor() {
private CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig).build();

static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);
private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);

private ServerListService serverListService;

Expand All @@ -118,15 +121,11 @@ public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> qu

@Override
public void run() {

executeAsyncInvoke();

}

private void executeAsyncInvoke() {

while (!queue.isEmpty()) {

NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
Expand All @@ -139,11 +138,7 @@ private void executeAsyncInvoke() {
task.getLastModified(),
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
asyncTaskExecute(task);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
Expand All @@ -152,7 +147,7 @@ private void executeAsyncInvoke() {
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
Expand All @@ -163,12 +158,20 @@ private void executeAsyncInvoke() {

}

class AyscNotifyCallBack implements FutureCallback<HttpResponse> {
private void asyncTaskExecute(NotifySingleTask task) {
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor) EXECUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
}


class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {

public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task
) {
public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
this.task = task;
this.httpclient = httpclient;
this.httpClient = httpClient;
}

@Override
Expand All @@ -183,31 +186,19 @@ public void completed(HttpResponse response) {
ConfigTraceService.NOTIFY_EVENT_OK, delayed,
task.target);
} else {
log.error("[notify-error] {}, {}, to {}, result {}",
new Object[] {task.getDataId(), task.getGroup(),
task.target,
response.getStatusLine().getStatusCode()});
log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode());
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);


//get delay time and set fail count to the task
int delay = getDelayTime(task);
asyncTaskExecute(task);

Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);

LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}
Expand All @@ -218,59 +209,38 @@ public void completed(HttpResponse response) {
public void failed(Exception ex) {

long delayed = System.currentTimeMillis() - task.getLastModified();
log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString());
log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString(), ex);
log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString());
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed,
task.target);

//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
asyncTaskExecute(task);
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}

@Override
public void cancelled() {

LogUtil.notifyLog.error(
"[notify-exception] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getGroup(),
task.getGroup(), task.getLastModified()},
"CANCELED");
LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED");

//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error(
"[notify-retry] target:{} dataid:{} group:{} ts:{}",
new Object[] {task.target, task.getDataId(),
task.getGroup(), task.getLastModified()});
asyncTaskExecute(task);
LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}",
task.target, task.getDataId(), task.getGroup(), task.getLastModified());

MetricsMonitor.getConfigNotifyException().increment();
}

private NotifySingleTask task;
private CloseableHttpAsyncClient httpclient;
private CloseableHttpAsyncClient httpClient;
}

static class NotifySingleTask extends NotifyTask {
Expand Down Expand Up @@ -339,8 +309,7 @@ static class NotifyThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,
"com.alibaba.nacos.AsyncNotifyServiceThread");
Thread thread = new Thread(r, "com.alibaba.nacos.AsyncNotifyServiceThread");
thread.setDaemon(true);
return thread;
}
Expand All @@ -354,15 +323,15 @@ public Thread newThread(Runnable r) {
*/
private static int getDelayTime(NotifySingleTask task) {
int failCount = task.getFailCount();
int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS;
if (failCount <= MAXCOUNT) {
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
task.setFailCount(failCount + 1);
}
return delay;
}

private static int MINRETRYINTERVAL = 500;
private static int INCREASESTEPS = 1000;
private static int MAXCOUNT = 6;
private static int MIN_RETRY_INTERVAL = 500;
private static int INCREASE_STEPS = 1000;
private static int MAX_COUNT = 6;

}