From d4f21fc974c63ac0839f5dab6fdd0dca7cc28a6c Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Thu, 15 Oct 2020 10:44:10 +0800 Subject: [PATCH 1/7] Refactor nacos task execute engine --- .../nacos/common/task/NacosTaskProcessor.java | 2 +- .../AbstractNacosTaskExecuteEngine.java | 103 +-------------- .../engine/NacosDelayTaskExecuteEngine.java | 122 +++++++++++++++--- .../config/server/manager/TaskManager.java | 4 - .../dump/processor/DumpAllBetaProcessor.java | 4 +- .../dump/processor/DumpAllProcessor.java | 4 +- .../dump/processor/DumpAllTagProcessor.java | 4 +- .../dump/processor/DumpChangeProcessor.java | 4 +- .../service/dump/processor/DumpProcessor.java | 4 +- .../service/merge/MergeTaskProcessor.java | 4 +- .../service/notify/NotifySingleService.java | 4 +- .../service/notify/NotifyTaskProcessor.java | 4 +- .../DistroHttpDelayTaskProcessor.java | 4 +- 13 files changed, 123 insertions(+), 144 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java index 9dcfad7cdce..22275fd68a6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java @@ -29,5 +29,5 @@ public interface NacosTaskProcessor { * @param task task. * @return process task result. */ - boolean process(AbstractDelayTask task); + boolean process(NacosTask task); } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java index 9b571c6ea2e..b690eb23f6b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java @@ -16,20 +16,13 @@ package com.alibaba.nacos.common.task.engine; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.common.executor.ExecutorFactory; -import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * Abstract nacos task execute engine. @@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine implem private final Logger log; - private final ScheduledExecutorService processingExecutor; - private final ConcurrentHashMap taskProcessors = new ConcurrentHashMap(); - protected final ConcurrentHashMap tasks; - - protected final ReentrantLock lock = new ReentrantLock(); - private NacosTaskProcessor defaultTaskProcessor; - public AbstractNacosTaskExecuteEngine(String name) { - this(name, 32, null, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger) { - this(name, 32, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) { - this(name, 32, logger, processInterval); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) { - this(name, initCapacity, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { + public AbstractNacosTaskExecuteEngine(Logger logger) { this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName()); - tasks = new ConcurrentHashMap(initCapacity); - processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); - processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); - } - - @Override - public int size() { - lock.lock(); - try { - return tasks.size(); - } finally { - lock.unlock(); - } - } - - @Override - public boolean isEmpty() { - lock.lock(); - try { - return tasks.isEmpty(); - } finally { - lock.unlock(); - } } @Override @@ -118,56 +66,7 @@ public void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) { this.defaultTaskProcessor = defaultTaskProcessor; } - @Override - public T removeTask(Object key) { - lock.lock(); - try { - T task = tasks.get(key); - if (null != task && task.shouldProcess()) { - return tasks.remove(key); - } else { - return null; - } - } finally { - lock.unlock(); - } - } - - @Override - public Collection getAllTaskKeys() { - Collection keys = new HashSet(); - lock.lock(); - try { - keys.addAll(tasks.keySet()); - } finally { - lock.unlock(); - } - return keys; - } - - @Override - public void shutdown() throws NacosException { - processingExecutor.shutdown(); - } - protected Logger getEngineLog() { return log; } - - /** - * process tasks in execute engine. - */ - protected abstract void processTasks(); - - private class ProcessRunnable implements Runnable { - - @Override - public void run() { - try { - AbstractNacosTaskExecuteEngine.this.processTasks(); - } catch (Throwable e) { - log.error(e.toString(), e); - } - } - } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java index 192d4518df5..01b4e0de7aa 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java @@ -16,11 +16,19 @@ package com.alibaba.nacos.common.task.engine; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Nacos delay task execute engine. @@ -29,27 +37,105 @@ */ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + private final ScheduledExecutorService processingExecutor; + + protected final ConcurrentHashMap tasks; + + protected final ReentrantLock lock = new ReentrantLock(); + public NacosDelayTaskExecuteEngine(String name) { - super(name); + this(name, null); } public NacosDelayTaskExecuteEngine(String name, Logger logger) { - super(name, logger); + this(name, 32, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) { - super(name, logger, processInterval); + this(name, 32, logger, processInterval); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) { - super(name, initCapacity, logger); + this(name, initCapacity, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { - super(name, initCapacity, logger, processInterval); + super(logger); + tasks = new ConcurrentHashMap(initCapacity); + processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); + processingExecutor + .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); + } + + @Override + public int size() { + lock.lock(); + try { + return tasks.size(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean isEmpty() { + lock.lock(); + try { + return tasks.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override + public AbstractDelayTask removeTask(Object key) { + lock.lock(); + try { + AbstractDelayTask task = tasks.get(key); + if (null != task && task.shouldProcess()) { + return tasks.remove(key); + } else { + return null; + } + } finally { + lock.unlock(); + } + } + + @Override + public Collection getAllTaskKeys() { + Collection keys = new HashSet(); + lock.lock(); + try { + keys.addAll(tasks.keySet()); + } finally { + lock.unlock(); + } + return keys; } @Override + public void shutdown() throws NacosException { + processingExecutor.shutdown(); + } + + @Override + public void addTask(Object key, AbstractDelayTask newTask) { + lock.lock(); + try { + AbstractDelayTask existTask = tasks.get(key); + if (null != existTask) { + newTask.merge(existTask); + } + tasks.put(key, newTask); + } finally { + lock.unlock(); + } + } + + /** + * process tasks in execute engine. + */ protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { @@ -74,22 +160,20 @@ protected void processTasks() { } } - @Override - public void addTask(Object key, AbstractDelayTask newTask) { - lock.lock(); - try { - AbstractDelayTask existTask = tasks.get(key); - if (null != existTask) { - newTask.merge(existTask); - } - tasks.put(key, newTask); - } finally { - lock.unlock(); - } - } - private void retryFailedTask(Object key, AbstractDelayTask task) { task.setLastProcessTime(System.currentTimeMillis()); addTask(key, task); } + + private class ProcessRunnable implements Runnable { + + @Override + public void run() { + try { + processTasks(); + } catch (Throwable e) { + getEngineLog().error(e.toString(), e); + } + } + } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java index 720ce3acbf3..9c00aeb654d 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java @@ -44,10 +44,6 @@ public final class TaskManager extends NacosDelayTaskExecuteEngine implements Ta Condition notEmpty = this.lock.newCondition(); - public TaskManager() { - this(null); - } - public TaskManager(String name) { super(name, LOGGER, 100L); this.name = name; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java index 44452091e70..cea4c8eaef7 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -43,7 +43,7 @@ public DumpAllBetaProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoBetaCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java index 72ba2f07adc..96527deacc6 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -47,7 +47,7 @@ public DumpAllProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { long currentMaxId = persistService.findConfigMaxId(); long lastMaxId = 0; while (lastMaxId < currentMaxId) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java index 2b013f70eab..d0b823134b9 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -42,7 +42,7 @@ public DumpAllTagProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoTagCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java index cabd9a225ba..0444fe3d5b6 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; @@ -47,7 +47,7 @@ public DumpChangeProcessor(DumpService dumpService, Timestamp startTime, Timesta } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime); LogUtil.DEFAULT_LOG.warn("updateMd5 start"); long startUpdateMd5 = System.currentTimeMillis(); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java index f2ccc17edb6..545be5da4d5 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo4Beta; @@ -44,7 +44,7 @@ public DumpProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); DumpTask dumpTask = (DumpTask) task; String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java index f2f14808b8a..17fda431fe2 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.merge; import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoAggr; @@ -52,7 +52,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { MergeDataTask mergeTask = (MergeDataTask) task; final String dataId = mergeTask.dataId; final String group = mergeTask.groupId; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java index bf778db6955..b427f42b8b4 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java @@ -18,7 +18,7 @@ import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; @@ -50,7 +50,7 @@ public NotifyTaskProcessorWrapper() { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifySingleTask notifyTask = (NotifySingleTask) task; return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(), notifyTask.getLastModified(), notifyTask.target); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java index 02282693f8a..aa505c7fb45 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.common.model.RestResult; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; @@ -46,7 +46,7 @@ public NotifyTaskProcessor(ServerMemberManager memberManager) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifyTask notifyTask = (NotifyTask) task; String dataId = notifyTask.getDataId(); String group = notifyTask.getGroup(); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index 43264e0a0b5..fccc2dd6e3d 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; @@ -50,7 +50,7 @@ public DistroHttpDelayTaskProcessor(GlobalConfig globalConfig, DistroTaskEngineH } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, From 6b291294d605fba294f3e381488d4142291afc59 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Thu, 15 Oct 2020 14:43:20 +0800 Subject: [PATCH 2/7] Refactor nacos task execute engine --- .../common/task/AbstractExecuteTask.java | 2 +- .../engine/NacosExecuteTaskExecuteEngine.java | 116 ++++++++++++++++++ .../common/task/engine/TaskExecuteWorker.java | 57 ++++----- .../NacosExecuteTaskExecuteEngineTest.java | 59 +++++++++ .../distro/task/DistroTaskEngineHolder.java | 6 +- .../task/delay/DistroDelayTaskProcessor.java | 6 +- .../execute/AbstractDistroExecuteTask.java | 2 +- .../DistroExecuteTaskExecuteEngine.java | 32 +++++ .../execute/DistroExecuteWorkersManager.java | 75 ----------- .../DistroHttpCombinedKeyExecuteTask.java | 3 +- .../DistroHttpDelayTaskProcessor.java | 2 +- 11 files changed, 244 insertions(+), 116 deletions(-) create mode 100644 common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java rename core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java => common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java (75%) create mode 100644 common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java create mode 100644 core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java delete mode 100644 core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java diff --git a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java index c3bab086362..5e6e93363b6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java @@ -21,7 +21,7 @@ * * @author xiweng.yy */ -public abstract class AbstractExecuteTask implements NacosTask { +public abstract class AbstractExecuteTask implements NacosTask, Runnable { @Override public boolean shouldProcess() { diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java new file mode 100644 index 00000000000..b2b5b4f7c68 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java @@ -0,0 +1,116 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; +import org.slf4j.Logger; + +import java.util.Collection; + +/** + * Nacos execute task execute engine. + * + * @author xiweng.yy + */ +public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + + private final TaskExecuteWorker[] executeWorkers; + + public NacosExecuteTaskExecuteEngine(String name, Logger logger) { + super(logger); + int workerCount = findWorkerCount(); + executeWorkers = new TaskExecuteWorker[workerCount]; + for (int mod = 0; mod < workerCount; ++mod) { + executeWorkers[mod] = new TaskExecuteWorker(name, mod, workerCount); + } + } + + private int findWorkerCount() { + final int coreCount = Runtime.getRuntime().availableProcessors(); + int result = 1; + while (result < coreCount) { + result <<= 1; + } + return result; + } + + @Override + public int size() { + int result = 0; + for (TaskExecuteWorker each : executeWorkers) { + result += each.pendingTaskCount(); + } + return result; + } + + @Override + public boolean isEmpty() { + return 0 == size(); + } + + @Override + public void addTask(Object tag, AbstractExecuteTask task) { + NacosTaskProcessor processor = getProcessor(tag); + if (null != processor) { + processor.process(task); + return; + } + TaskExecuteWorker worker = getWorker(tag); + worker.process(task); + } + + private TaskExecuteWorker getWorker(Object tag) { + int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); + return executeWorkers[idx]; + } + + private int workersCount() { + return executeWorkers.length; + } + + @Override + public AbstractExecuteTask removeTask(Object key) { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task"); + } + + @Override + public Collection getAllTaskKeys() { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys"); + } + + @Override + public void shutdown() throws NacosException { + for (TaskExecuteWorker each : executeWorkers) { + each.shutdown(); + } + } + + /** + * Get workers status. + * + * @return workers status string + */ + public String workersStatus() { + StringBuilder sb = new StringBuilder(); + for (TaskExecuteWorker worker : executeWorkers) { + sb.append(worker.status()).append("\n"); + } + return sb.toString(); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java similarity index 75% rename from core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java rename to common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index c8ab9f2f8e9..2e0d7b50b40 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -14,64 +14,59 @@ * limitations under the License. */ -package com.alibaba.nacos.core.distributed.distro.task.execute; +package com.alibaba.nacos.common.task.engine; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; /** - * Distro execute worker. + * Nacos execute task execute worker. * * @author xiweng.yy */ -public final class DistroExecuteWorker implements Closeable { +public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(DistroExecuteWorker.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecuteWorker.class); - private static final int QUEUE_CAPACITY = 50000; + /** + * Max task queue size 32768. + */ + private static final int QUEUE_CAPACITY = 1 << 15; private final BlockingQueue queue; private final String name; private final AtomicBoolean closed; - - public DistroExecuteWorker(final int mod, final int total) { - name = getClass().getName() + "_" + mod + "%" + total; - queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + + public TaskExecuteWorker(final String name, final int mod, final int total) { + this.name = name + "_" + mod + "%" + total; + queue = new ArrayBlockingQueue(QUEUE_CAPACITY); closed = new AtomicBoolean(false); new InnerWorker(name).start(); } - + public String getName() { return name; } - - /** - * Execute task without result. - */ - public void execute(Runnable task) { - putTask(task); - } - - /** - * Execute task with a result. - */ - public Future execute(Callable task) { - FutureTask future = new FutureTask(task); - putTask(future); - return future; + + @Override + public boolean process(NacosTask task) { + if (task instanceof AbstractExecuteTask) { + putTask((Runnable) task); + } + return true; } - + private void putTask(Runnable task) { try { queue.put(task); @@ -101,12 +96,12 @@ public void shutdown() throws NacosException { * Inner execute worker. */ private class InnerWorker extends Thread { - + InnerWorker(String name) { setDaemon(false); setName(name); } - + @Override public void run() { while (!closed.get()) { diff --git a/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java new file mode 100644 index 00000000000..bf31def2e48 --- /dev/null +++ b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class NacosExecuteTaskExecuteEngineTest { + + private NacosExecuteTaskExecuteEngine executeTaskExecuteEngine; + + @Before + public void setUp() { + executeTaskExecuteEngine = new NacosExecuteTaskExecuteEngine("TEST", null); + } + + @After + public void tearDown() throws NacosException { + executeTaskExecuteEngine.shutdown(); + } + + @Mock + private AbstractExecuteTask task; + + @Test + public void testAddTask() { + executeTaskExecuteEngine.addTask("test", task); + verify(task).run(); + assertTrue(executeTaskExecuteEngine.isEmpty()); + assertEquals(0, executeTaskExecuteEngine.size()); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java index 3724a2727d3..8cfe9ea03d5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java @@ -20,7 +20,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor; -import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager; +import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine; import org.springframework.stereotype.Component; /** @@ -33,7 +33,7 @@ public class DistroTaskEngineHolder { private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); - private final DistroExecuteWorkersManager executeWorkersManager = new DistroExecuteWorkersManager(); + private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine(); public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); @@ -44,7 +44,7 @@ public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() { return delayTaskExecuteEngine; } - public DistroExecuteWorkersManager getExecuteWorkersManager() { + public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() { return executeWorkersManager; } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java index 05ac026db13..2566c61d6c0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.core.distributed.distro.task.delay; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; @@ -42,7 +42,7 @@ public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder, } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } @@ -50,7 +50,7 @@ public boolean process(AbstractDelayTask task) { DistroKey distroKey = distroDelayTask.getDistroKey(); if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; } return false; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java index c1fed5b56bf..25d23bdc534 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java @@ -24,7 +24,7 @@ * * @author xiweng.yy */ -public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask implements Runnable { +public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask { private final DistroKey distroKey; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java new file mode 100644 index 00000000000..1883dac2a97 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.distributed.distro.task.execute; + +import com.alibaba.nacos.common.task.engine.NacosExecuteTaskExecuteEngine; +import com.alibaba.nacos.core.utils.Loggers; + +/** + * Distro execute task execute engine. + * + * @author xiweng.yy + */ +public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine { + + public DistroExecuteTaskExecuteEngine() { + super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java deleted file mode 100644 index afd19f0fefe..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.distributed.distro.task.execute; - -/** - * Distro execute workers manager. - * - * @author xiweng.yy - */ -public final class DistroExecuteWorkersManager { - - private final DistroExecuteWorker[] connectionWorkers; - - public DistroExecuteWorkersManager() { - int workerCount = findWorkerCount(); - connectionWorkers = new DistroExecuteWorker[workerCount]; - for (int mod = 0; mod < workerCount; ++mod) { - connectionWorkers[mod] = new DistroExecuteWorker(mod, workerCount); - } - } - - private int findWorkerCount() { - final int coreCount = Runtime.getRuntime().availableProcessors(); - int result = 1; - while (result < coreCount) { - result <<= 1; - } - return result; - } - - /** - * Dispatch task to worker by tag. - */ - public void dispatch(Object tag, Runnable task) { - DistroExecuteWorker worker = getWorker(tag); - worker.execute(task); - } - - private DistroExecuteWorker getWorker(Object tag) { - int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); - return connectionWorkers[idx]; - } - - /** - * Get workers status. - * - * @return workers status string - */ - public String workersStatus() { - StringBuilder sb = new StringBuilder(); - for (DistroExecuteWorker worker : connectionWorkers) { - sb.append(worker.status()).append("\n"); - } - return sb.toString(); - } - - public int workersCount() { - return connectionWorkers.length; - } - -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java index 9acd566fcdb..2903ee092bb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; +import com.alibaba.nacos.common.task.AbstractExecuteTask; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; @@ -31,7 +32,7 @@ * * @author xiweng.yy */ -public class DistroHttpCombinedKeyExecuteTask implements Runnable { +public class DistroHttpCombinedKeyExecuteTask extends AbstractExecuteTask { private final GlobalConfig globalConfig; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index fccc2dd6e3d..d4bb5b57d0e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -55,7 +55,7 @@ public boolean process(NacosTask task) { DistroKey distroKey = distroDelayTask.getDistroKey(); DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction()); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, executeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask); return true; } } From 0ba85b823c42c1e6c6e4280096a846e503e52d52 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Thu, 15 Oct 2020 16:44:44 +0800 Subject: [PATCH 3/7] For checkstyle --- .../task/engine/NacosExecuteTaskExecuteEngineTest.java | 2 -- .../distro/combined/DistroHttpCombinedKeyDelayTask.java | 9 ++++++--- .../distro/combined/DistroHttpDelayTaskProcessor.java | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java index bf31def2e48..4d166686aab 100644 --- a/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java @@ -26,9 +26,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java index a82d3db50f0..5e13e164306 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java @@ -18,9 +18,9 @@ import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; +import com.alibaba.nacos.naming.consistency.KeyBuilder; import java.util.HashSet; import java.util.Set; @@ -49,15 +49,18 @@ public Set getActualResourceKeys() { public void merge(AbstractDelayTask task) { actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys()); if (actualResourceKeys.size() >= batchSize) { - this.setLastProcessTime(0); DistroHttpCombinedKey.incrementSequence(); + setLastProcessTime(0); + } else { + setLastProcessTime(task.getLastProcessTime()); } } @Override public DistroKey getDistroKey() { DistroKey taskKey = super.getDistroKey(); - DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, taskKey.getTargetServer()); + DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, + taskKey.getTargetServer()); result.setResourceKey(taskKey.getResourceKey()); result.getActualResourceTypes().addAll(actualResourceKeys); return result; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index d4bb5b57d0e..11b30b5e879 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; +import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.naming.misc.GlobalConfig; /** From e2b1f38be54de5d5037409df5530c0ce7047500f Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Thu, 15 Oct 2020 16:45:34 +0800 Subject: [PATCH 4/7] For checkstyle --- .../ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index 11b30b5e879..d4bb5b57d0e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -21,7 +21,6 @@ import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; -import com.alibaba.nacos.core.utils.Loggers; import com.alibaba.nacos.naming.misc.GlobalConfig; /** From e8fe6692dc6799e2d3564a78b761bc82f932141a Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 16 Oct 2020 11:43:09 +0800 Subject: [PATCH 5/7] Use ThreadUtils to reduce duplicate codes --- .../engine/NacosExecuteTaskExecuteEngine.java | 19 +++++++------------ .../nacos/common/utils/ThreadUtils.java | 12 +++++++++++- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java index b2b5b4f7c68..e7330dd6bc3 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java @@ -19,6 +19,7 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.task.AbstractExecuteTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; +import com.alibaba.nacos.common.utils.ThreadUtils; import org.slf4j.Logger; import java.util.Collection; @@ -33,21 +34,15 @@ public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngin private final TaskExecuteWorker[] executeWorkers; public NacosExecuteTaskExecuteEngine(String name, Logger logger) { - super(logger); - int workerCount = findWorkerCount(); - executeWorkers = new TaskExecuteWorker[workerCount]; - for (int mod = 0; mod < workerCount; ++mod) { - executeWorkers[mod] = new TaskExecuteWorker(name, mod, workerCount); - } + this(name, logger, ThreadUtils.getSuitableThreadCount(1)); } - private int findWorkerCount() { - final int coreCount = Runtime.getRuntime().availableProcessors(); - int result = 1; - while (result < coreCount) { - result <<= 1; + public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) { + super(logger); + executeWorkers = new TaskExecuteWorker[dispatchWorkerCount]; + for (int mod = 0; mod < dispatchWorkerCount; ++mod) { + executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount); } - return result; } @Override diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index 67d68babb0f..57149f205db 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -94,9 +94,19 @@ public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) { * @return thread count */ public static int getSuitableThreadCount() { + return getSuitableThreadCount(THREAD_MULTIPLER); + } + + /** + * Through the number of cores, calculate the appropriate number of threads. + * + * @param threadMultiple multiple time of cores + * @return thread count + */ + public static int getSuitableThreadCount(int threadMultiple) { final int coreCount = Runtime.getRuntime().availableProcessors(); int workerCount = 1; - while (workerCount < coreCount * THREAD_MULTIPLER) { + while (workerCount < coreCount * threadMultiple) { workerCount <<= 1; } return workerCount; From b0c9ddb2a2c2bfcfbbcf620ea46f509739bc81d6 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 16 Oct 2020 11:53:12 +0800 Subject: [PATCH 6/7] Set custom logger for TaskExecuteWorker --- .../engine/NacosExecuteTaskExecuteEngine.java | 2 +- .../common/task/engine/TaskExecuteWorker.java | 25 +++++++++++++------ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java index e7330dd6bc3..81de691626a 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java @@ -41,7 +41,7 @@ public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWor super(logger); executeWorkers = new TaskExecuteWorker[dispatchWorkerCount]; for (int mod = 0; mod < dispatchWorkerCount; ++mod) { - executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount); + executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog()); } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index 2e0d7b50b40..efaa17318d2 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -35,23 +35,32 @@ */ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecuteWorker.class); - /** * Max task queue size 32768. */ private static final int QUEUE_CAPACITY = 1 << 15; - private final BlockingQueue queue; + private final Logger log; private final String name; + private final BlockingQueue queue; + private final AtomicBoolean closed; public TaskExecuteWorker(final String name, final int mod, final int total) { this.name = name + "_" + mod + "%" + total; - queue = new ArrayBlockingQueue(QUEUE_CAPACITY); - closed = new AtomicBoolean(false); + this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); + this.closed = new AtomicBoolean(false); + this.log = LoggerFactory.getLogger(TaskExecuteWorker.class); + new InnerWorker(name).start(); + } + + public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) { + this.name = name + "_" + mod + "%" + total; + this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); + this.closed = new AtomicBoolean(false); + this.log = logger; new InnerWorker(name).start(); } @@ -71,7 +80,7 @@ private void putTask(Runnable task) { try { queue.put(task); } catch (InterruptedException ire) { - LOGGER.error(ire.toString(), ire); + log.error(ire.toString(), ire); } } @@ -111,10 +120,10 @@ public void run() { task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { - LOGGER.warn("distro task {} takes {}ms", task, duration); + log.warn("distro task {} takes {}ms", task, duration); } } catch (Throwable e) { - LOGGER.error("[DISTRO-FAILED] " + e.toString(), e); + log.error("[DISTRO-FAILED] " + e.toString(), e); } } } From 0f20fcd8f3811967564f7c593e11a9f1df617ef7 Mon Sep 17 00:00:00 2001 From: KomachiSion <263976490@qq.com> Date: Fri, 16 Oct 2020 11:58:44 +0800 Subject: [PATCH 7/7] Set custom logger for TaskExecuteWorker --- .../nacos/common/task/engine/TaskExecuteWorker.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index efaa17318d2..59fa690f0b0 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -49,18 +49,14 @@ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { private final AtomicBoolean closed; public TaskExecuteWorker(final String name, final int mod, final int total) { - this.name = name + "_" + mod + "%" + total; - this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); - this.closed = new AtomicBoolean(false); - this.log = LoggerFactory.getLogger(TaskExecuteWorker.class); - new InnerWorker(name).start(); + this(name, mod, total, null); } public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) { this.name = name + "_" + mod + "%" + total; this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); this.closed = new AtomicBoolean(false); - this.log = logger; + this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger; new InnerWorker(name).start(); }