diff --git a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java index c3bab086362..5e6e93363b6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/AbstractExecuteTask.java @@ -21,7 +21,7 @@ * * @author xiweng.yy */ -public abstract class AbstractExecuteTask implements NacosTask { +public abstract class AbstractExecuteTask implements NacosTask, Runnable { @Override public boolean shouldProcess() { diff --git a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java index 9dcfad7cdce..22275fd68a6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/NacosTaskProcessor.java @@ -29,5 +29,5 @@ public interface NacosTaskProcessor { * @param task task. * @return process task result. */ - boolean process(AbstractDelayTask task); + boolean process(NacosTask task); } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java index 9b571c6ea2e..b690eb23f6b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/AbstractNacosTaskExecuteEngine.java @@ -16,20 +16,13 @@ package com.alibaba.nacos.common.task.engine; -import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.common.executor.ExecutorFactory; -import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * Abstract nacos task execute engine. @@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine implem private final Logger log; - private final ScheduledExecutorService processingExecutor; - private final ConcurrentHashMap taskProcessors = new ConcurrentHashMap(); - protected final ConcurrentHashMap tasks; - - protected final ReentrantLock lock = new ReentrantLock(); - private NacosTaskProcessor defaultTaskProcessor; - public AbstractNacosTaskExecuteEngine(String name) { - this(name, 32, null, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger) { - this(name, 32, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) { - this(name, 32, logger, processInterval); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) { - this(name, initCapacity, logger, 100L); - } - - public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { + public AbstractNacosTaskExecuteEngine(Logger logger) { this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName()); - tasks = new ConcurrentHashMap(initCapacity); - processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); - processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); - } - - @Override - public int size() { - lock.lock(); - try { - return tasks.size(); - } finally { - lock.unlock(); - } - } - - @Override - public boolean isEmpty() { - lock.lock(); - try { - return tasks.isEmpty(); - } finally { - lock.unlock(); - } } @Override @@ -118,56 +66,7 @@ public void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) { this.defaultTaskProcessor = defaultTaskProcessor; } - @Override - public T removeTask(Object key) { - lock.lock(); - try { - T task = tasks.get(key); - if (null != task && task.shouldProcess()) { - return tasks.remove(key); - } else { - return null; - } - } finally { - lock.unlock(); - } - } - - @Override - public Collection getAllTaskKeys() { - Collection keys = new HashSet(); - lock.lock(); - try { - keys.addAll(tasks.keySet()); - } finally { - lock.unlock(); - } - return keys; - } - - @Override - public void shutdown() throws NacosException { - processingExecutor.shutdown(); - } - protected Logger getEngineLog() { return log; } - - /** - * process tasks in execute engine. - */ - protected abstract void processTasks(); - - private class ProcessRunnable implements Runnable { - - @Override - public void run() { - try { - AbstractNacosTaskExecuteEngine.this.processTasks(); - } catch (Throwable e) { - log.error(e.toString(), e); - } - } - } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java index 192d4518df5..01b4e0de7aa 100644 --- a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosDelayTaskExecuteEngine.java @@ -16,11 +16,19 @@ package com.alibaba.nacos.common.task.engine; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.common.executor.NameThreadFactory; import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Nacos delay task execute engine. @@ -29,27 +37,105 @@ */ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + private final ScheduledExecutorService processingExecutor; + + protected final ConcurrentHashMap tasks; + + protected final ReentrantLock lock = new ReentrantLock(); + public NacosDelayTaskExecuteEngine(String name) { - super(name); + this(name, null); } public NacosDelayTaskExecuteEngine(String name, Logger logger) { - super(name, logger); + this(name, 32, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) { - super(name, logger, processInterval); + this(name, 32, logger, processInterval); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) { - super(name, initCapacity, logger); + this(name, initCapacity, logger, 100L); } public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { - super(name, initCapacity, logger, processInterval); + super(logger); + tasks = new ConcurrentHashMap(initCapacity); + processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); + processingExecutor + .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); + } + + @Override + public int size() { + lock.lock(); + try { + return tasks.size(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean isEmpty() { + lock.lock(); + try { + return tasks.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override + public AbstractDelayTask removeTask(Object key) { + lock.lock(); + try { + AbstractDelayTask task = tasks.get(key); + if (null != task && task.shouldProcess()) { + return tasks.remove(key); + } else { + return null; + } + } finally { + lock.unlock(); + } + } + + @Override + public Collection getAllTaskKeys() { + Collection keys = new HashSet(); + lock.lock(); + try { + keys.addAll(tasks.keySet()); + } finally { + lock.unlock(); + } + return keys; } @Override + public void shutdown() throws NacosException { + processingExecutor.shutdown(); + } + + @Override + public void addTask(Object key, AbstractDelayTask newTask) { + lock.lock(); + try { + AbstractDelayTask existTask = tasks.get(key); + if (null != existTask) { + newTask.merge(existTask); + } + tasks.put(key, newTask); + } finally { + lock.unlock(); + } + } + + /** + * process tasks in execute engine. + */ protected void processTasks() { Collection keys = getAllTaskKeys(); for (Object taskKey : keys) { @@ -74,22 +160,20 @@ protected void processTasks() { } } - @Override - public void addTask(Object key, AbstractDelayTask newTask) { - lock.lock(); - try { - AbstractDelayTask existTask = tasks.get(key); - if (null != existTask) { - newTask.merge(existTask); - } - tasks.put(key, newTask); - } finally { - lock.unlock(); - } - } - private void retryFailedTask(Object key, AbstractDelayTask task) { task.setLastProcessTime(System.currentTimeMillis()); addTask(key, task); } + + private class ProcessRunnable implements Runnable { + + @Override + public void run() { + try { + processTasks(); + } catch (Throwable e) { + getEngineLog().error(e.toString(), e); + } + } + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java new file mode 100644 index 00000000000..81de691626a --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngine.java @@ -0,0 +1,111 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; +import com.alibaba.nacos.common.utils.ThreadUtils; +import org.slf4j.Logger; + +import java.util.Collection; + +/** + * Nacos execute task execute engine. + * + * @author xiweng.yy + */ +public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine { + + private final TaskExecuteWorker[] executeWorkers; + + public NacosExecuteTaskExecuteEngine(String name, Logger logger) { + this(name, logger, ThreadUtils.getSuitableThreadCount(1)); + } + + public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) { + super(logger); + executeWorkers = new TaskExecuteWorker[dispatchWorkerCount]; + for (int mod = 0; mod < dispatchWorkerCount; ++mod) { + executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog()); + } + } + + @Override + public int size() { + int result = 0; + for (TaskExecuteWorker each : executeWorkers) { + result += each.pendingTaskCount(); + } + return result; + } + + @Override + public boolean isEmpty() { + return 0 == size(); + } + + @Override + public void addTask(Object tag, AbstractExecuteTask task) { + NacosTaskProcessor processor = getProcessor(tag); + if (null != processor) { + processor.process(task); + return; + } + TaskExecuteWorker worker = getWorker(tag); + worker.process(task); + } + + private TaskExecuteWorker getWorker(Object tag) { + int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); + return executeWorkers[idx]; + } + + private int workersCount() { + return executeWorkers.length; + } + + @Override + public AbstractExecuteTask removeTask(Object key) { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task"); + } + + @Override + public Collection getAllTaskKeys() { + throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys"); + } + + @Override + public void shutdown() throws NacosException { + for (TaskExecuteWorker each : executeWorkers) { + each.shutdown(); + } + } + + /** + * Get workers status. + * + * @return workers status string + */ + public String workersStatus() { + StringBuilder sb = new StringBuilder(); + for (TaskExecuteWorker worker : executeWorkers) { + sb.append(worker.status()).append("\n"); + } + return sb.toString(); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java similarity index 64% rename from core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java rename to common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java index c8ab9f2f8e9..59fa690f0b0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorker.java +++ b/common/src/main/java/com/alibaba/nacos/common/task/engine/TaskExecuteWorker.java @@ -14,69 +14,69 @@ * limitations under the License. */ -package com.alibaba.nacos.core.distributed.distro.task.execute; +package com.alibaba.nacos.common.task.engine; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.common.lifecycle.Closeable; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.task.NacosTask; +import com.alibaba.nacos.common.task.NacosTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; /** - * Distro execute worker. + * Nacos execute task execute worker. * * @author xiweng.yy */ -public final class DistroExecuteWorker implements Closeable { +public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(DistroExecuteWorker.class); - - private static final int QUEUE_CAPACITY = 50000; + /** + * Max task queue size 32768. + */ + private static final int QUEUE_CAPACITY = 1 << 15; - private final BlockingQueue queue; + private final Logger log; private final String name; + private final BlockingQueue queue; + private final AtomicBoolean closed; - - public DistroExecuteWorker(final int mod, final int total) { - name = getClass().getName() + "_" + mod + "%" + total; - queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); - closed = new AtomicBoolean(false); + + public TaskExecuteWorker(final String name, final int mod, final int total) { + this(name, mod, total, null); + } + + public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) { + this.name = name + "_" + mod + "%" + total; + this.queue = new ArrayBlockingQueue(QUEUE_CAPACITY); + this.closed = new AtomicBoolean(false); + this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger; new InnerWorker(name).start(); } - + public String getName() { return name; } - - /** - * Execute task without result. - */ - public void execute(Runnable task) { - putTask(task); - } - - /** - * Execute task with a result. - */ - public Future execute(Callable task) { - FutureTask future = new FutureTask(task); - putTask(future); - return future; + + @Override + public boolean process(NacosTask task) { + if (task instanceof AbstractExecuteTask) { + putTask((Runnable) task); + } + return true; } - + private void putTask(Runnable task) { try { queue.put(task); } catch (InterruptedException ire) { - LOGGER.error(ire.toString(), ire); + log.error(ire.toString(), ire); } } @@ -101,12 +101,12 @@ public void shutdown() throws NacosException { * Inner execute worker. */ private class InnerWorker extends Thread { - + InnerWorker(String name) { setDaemon(false); setName(name); } - + @Override public void run() { while (!closed.get()) { @@ -116,10 +116,10 @@ public void run() { task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { - LOGGER.warn("distro task {} takes {}ms", task, duration); + log.warn("distro task {} takes {}ms", task, duration); } } catch (Throwable e) { - LOGGER.error("[DISTRO-FAILED] " + e.toString(), e); + log.error("[DISTRO-FAILED] " + e.toString(), e); } } } diff --git a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java index 67d68babb0f..57149f205db 100644 --- a/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java +++ b/common/src/main/java/com/alibaba/nacos/common/utils/ThreadUtils.java @@ -94,9 +94,19 @@ public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) { * @return thread count */ public static int getSuitableThreadCount() { + return getSuitableThreadCount(THREAD_MULTIPLER); + } + + /** + * Through the number of cores, calculate the appropriate number of threads. + * + * @param threadMultiple multiple time of cores + * @return thread count + */ + public static int getSuitableThreadCount(int threadMultiple) { final int coreCount = Runtime.getRuntime().availableProcessors(); int workerCount = 1; - while (workerCount < coreCount * THREAD_MULTIPLER) { + while (workerCount < coreCount * threadMultiple) { workerCount <<= 1; } return workerCount; diff --git a/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java new file mode 100644 index 00000000000..4d166686aab --- /dev/null +++ b/common/src/test/java/com/alibaba/nacos/common/task/engine/NacosExecuteTaskExecuteEngineTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.task.engine; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.common.task.AbstractExecuteTask; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class NacosExecuteTaskExecuteEngineTest { + + private NacosExecuteTaskExecuteEngine executeTaskExecuteEngine; + + @Before + public void setUp() { + executeTaskExecuteEngine = new NacosExecuteTaskExecuteEngine("TEST", null); + } + + @After + public void tearDown() throws NacosException { + executeTaskExecuteEngine.shutdown(); + } + + @Mock + private AbstractExecuteTask task; + + @Test + public void testAddTask() { + executeTaskExecuteEngine.addTask("test", task); + verify(task).run(); + assertTrue(executeTaskExecuteEngine.isEmpty()); + assertEquals(0, executeTaskExecuteEngine.size()); + } +} diff --git a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java index 720ce3acbf3..9c00aeb654d 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java @@ -44,10 +44,6 @@ public final class TaskManager extends NacosDelayTaskExecuteEngine implements Ta Condition notEmpty = this.lock.newCondition(); - public TaskManager() { - this(null); - } - public TaskManager(String name) { super(name, LOGGER, 100L); this.name = name; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java index 44452091e70..cea4c8eaef7 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllBetaProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoBetaWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -43,7 +43,7 @@ public DumpAllBetaProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoBetaCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java index 72ba2f07adc..96527deacc6 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -47,7 +47,7 @@ public DumpAllProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { long currentMaxId = persistService.findConfigMaxId(); long lastMaxId = 0; while (lastMaxId < currentMaxId) { diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java index 2b013f70eab..d0b823134b9 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpAllTagProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfoTagWrapper; import com.alibaba.nacos.config.server.model.Page; @@ -42,7 +42,7 @@ public DumpAllTagProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { int rowCount = persistService.configInfoTagCount(); int pageCount = (int) Math.ceil(rowCount * 1.0 / PAGE_SIZE); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java index cabd9a225ba..0444fe3d5b6 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpChangeProcessor.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service.dump.processor; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoWrapper; @@ -47,7 +47,7 @@ public DumpChangeProcessor(DumpService dumpService, Timestamp startTime, Timesta } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime); LogUtil.DEFAULT_LOG.warn("updateMd5 start"); long startUpdateMd5 = System.currentTimeMillis(); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java index f2ccc17edb6..545be5da4d5 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.service.dump.processor; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfo4Beta; @@ -44,7 +44,7 @@ public DumpProcessor(DumpService dumpService) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); DumpTask dumpTask = (DumpTask) task; String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java index f2f14808b8a..17fda431fe2 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.merge; import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.model.ConfigInfo; import com.alibaba.nacos.config.server.model.ConfigInfoAggr; @@ -52,7 +52,7 @@ public class MergeTaskProcessor implements NacosTaskProcessor { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { MergeDataTask mergeTask = (MergeDataTask) task; final String dataId = mergeTask.dataId; final String group = mergeTask.groupId; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java index bf778db6955..b427f42b8b4 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifySingleService.java @@ -18,7 +18,7 @@ import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; @@ -50,7 +50,7 @@ public NotifyTaskProcessorWrapper() { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifySingleTask notifyTask = (NotifySingleTask) task; return notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(), notifyTask.getLastModified(), notifyTask.target); diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java index 02282693f8a..aa505c7fb45 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/NotifyTaskProcessor.java @@ -17,8 +17,8 @@ package com.alibaba.nacos.config.server.service.notify; import com.alibaba.nacos.common.model.RestResult; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.config.server.constant.Constants; -import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; @@ -46,7 +46,7 @@ public NotifyTaskProcessor(ServerMemberManager memberManager) { } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { NotifyTask notifyTask = (NotifyTask) task; String dataId = notifyTask.getDataId(); String group = notifyTask.getGroup(); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java index 3724a2727d3..8cfe9ea03d5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/DistroTaskEngineHolder.java @@ -20,7 +20,7 @@ import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor; -import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager; +import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine; import org.springframework.stereotype.Component; /** @@ -33,7 +33,7 @@ public class DistroTaskEngineHolder { private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); - private final DistroExecuteWorkersManager executeWorkersManager = new DistroExecuteWorkersManager(); + private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine(); public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); @@ -44,7 +44,7 @@ public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() { return delayTaskExecuteEngine; } - public DistroExecuteWorkersManager getExecuteWorkersManager() { + public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() { return executeWorkersManager; } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java index 05ac026db13..2566c61d6c0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/delay/DistroDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.core.distributed.distro.task.delay; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder; @@ -42,7 +42,7 @@ public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder, } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } @@ -50,7 +50,7 @@ public boolean process(AbstractDelayTask task) { DistroKey distroKey = distroDelayTask.getDistroKey(); if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; } return false; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java index c1fed5b56bf..25d23bdc534 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/AbstractDistroExecuteTask.java @@ -24,7 +24,7 @@ * * @author xiweng.yy */ -public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask implements Runnable { +public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask { private final DistroKey distroKey; diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java new file mode 100644 index 00000000000..1883dac2a97 --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteTaskExecuteEngine.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.distributed.distro.task.execute; + +import com.alibaba.nacos.common.task.engine.NacosExecuteTaskExecuteEngine; +import com.alibaba.nacos.core.utils.Loggers; + +/** + * Distro execute task execute engine. + * + * @author xiweng.yy + */ +public class DistroExecuteTaskExecuteEngine extends NacosExecuteTaskExecuteEngine { + + public DistroExecuteTaskExecuteEngine() { + super(DistroExecuteTaskExecuteEngine.class.getSimpleName(), Loggers.DISTRO); + } +} diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java deleted file mode 100644 index afd19f0fefe..00000000000 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/execute/DistroExecuteWorkersManager.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.nacos.core.distributed.distro.task.execute; - -/** - * Distro execute workers manager. - * - * @author xiweng.yy - */ -public final class DistroExecuteWorkersManager { - - private final DistroExecuteWorker[] connectionWorkers; - - public DistroExecuteWorkersManager() { - int workerCount = findWorkerCount(); - connectionWorkers = new DistroExecuteWorker[workerCount]; - for (int mod = 0; mod < workerCount; ++mod) { - connectionWorkers[mod] = new DistroExecuteWorker(mod, workerCount); - } - } - - private int findWorkerCount() { - final int coreCount = Runtime.getRuntime().availableProcessors(); - int result = 1; - while (result < coreCount) { - result <<= 1; - } - return result; - } - - /** - * Dispatch task to worker by tag. - */ - public void dispatch(Object tag, Runnable task) { - DistroExecuteWorker worker = getWorker(tag); - worker.execute(task); - } - - private DistroExecuteWorker getWorker(Object tag) { - int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount(); - return connectionWorkers[idx]; - } - - /** - * Get workers status. - * - * @return workers status string - */ - public String workersStatus() { - StringBuilder sb = new StringBuilder(); - for (DistroExecuteWorker worker : connectionWorkers) { - sb.append(worker.status()).append("\n"); - } - return sb.toString(); - } - - public int workersCount() { - return connectionWorkers.length; - } - -} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java index a82d3db50f0..5e13e164306 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyDelayTask.java @@ -18,9 +18,9 @@ import com.alibaba.nacos.common.task.AbstractDelayTask; import com.alibaba.nacos.consistency.DataOperation; -import com.alibaba.nacos.naming.consistency.KeyBuilder; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask; +import com.alibaba.nacos.naming.consistency.KeyBuilder; import java.util.HashSet; import java.util.Set; @@ -49,15 +49,18 @@ public Set getActualResourceKeys() { public void merge(AbstractDelayTask task) { actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys()); if (actualResourceKeys.size() >= batchSize) { - this.setLastProcessTime(0); DistroHttpCombinedKey.incrementSequence(); + setLastProcessTime(0); + } else { + setLastProcessTime(task.getLastProcessTime()); } } @Override public DistroKey getDistroKey() { DistroKey taskKey = super.getDistroKey(); - DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, taskKey.getTargetServer()); + DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, + taskKey.getTargetServer()); result.setResourceKey(taskKey.getResourceKey()); result.getActualResourceTypes().addAll(actualResourceKeys); return result; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java index 9acd566fcdb..2903ee092bb 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpCombinedKeyExecuteTask.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; +import com.alibaba.nacos.common.task.AbstractExecuteTask; import com.alibaba.nacos.consistency.DataOperation; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine; @@ -31,7 +32,7 @@ * * @author xiweng.yy */ -public class DistroHttpCombinedKeyExecuteTask implements Runnable { +public class DistroHttpCombinedKeyExecuteTask extends AbstractExecuteTask { private final GlobalConfig globalConfig; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java index 43264e0a0b5..d4bb5b57d0e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/combined/DistroHttpDelayTaskProcessor.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined; -import com.alibaba.nacos.common.task.AbstractDelayTask; +import com.alibaba.nacos.common.task.NacosTask; import com.alibaba.nacos.common.task.NacosTaskProcessor; import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder; @@ -50,12 +50,12 @@ public DistroHttpDelayTaskProcessor(GlobalConfig globalConfig, DistroTaskEngineH } @Override - public boolean process(AbstractDelayTask task) { + public boolean process(NacosTask task) { DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig, distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction()); - distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, executeTask); + distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask); return true; } }