Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor dispatch task execute #3995

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* @author xiweng.yy
*/
public abstract class AbstractExecuteTask implements NacosTask {
public abstract class AbstractExecuteTask implements NacosTask, Runnable {

@Override
public boolean shouldProcess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface NacosTaskProcessor {
* @param task task.
* @return process task result.
*/
boolean process(AbstractDelayTask task);
boolean process(NacosTask task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@

package com.alibaba.nacos.common.task.engine;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Abstract nacos task execute engine.
Expand All @@ -40,57 +33,12 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem

private final Logger log;

private final ScheduledExecutorService processingExecutor;

private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();

protected final ConcurrentHashMap<Object, T> tasks;

protected final ReentrantLock lock = new ReentrantLock();

private NacosTaskProcessor defaultTaskProcessor;

public AbstractNacosTaskExecuteEngine(String name) {
this(name, 32, null, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, Logger logger) {
this(name, 32, logger, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, Logger logger, long processInterval) {
this(name, 32, logger, processInterval);
}

public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger) {
this(name, initCapacity, logger, 100L);
}

public AbstractNacosTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
public AbstractNacosTaskExecuteEngine(Logger logger) {
this.log = null != logger ? logger : LoggerFactory.getLogger(AbstractNacosTaskExecuteEngine.class.getName());
tasks = new ConcurrentHashMap<Object, T>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

@Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
}

@Override
Expand Down Expand Up @@ -118,56 +66,7 @@ public void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) {
this.defaultTaskProcessor = defaultTaskProcessor;
}

@Override
public T removeTask(Object key) {
lock.lock();
try {
T task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}

@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}

@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}

protected Logger getEngineLog() {
return log;
}

/**
* process tasks in execute engine.
*/
protected abstract void processTasks();

private class ProcessRunnable implements Runnable {

@Override
public void run() {
try {
AbstractNacosTaskExecuteEngine.this.processTasks();
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@

package com.alibaba.nacos.common.task.engine;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Nacos delay task execute engine.
Expand All @@ -29,27 +37,105 @@
*/
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

private final ScheduledExecutorService processingExecutor;

protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

protected final ReentrantLock lock = new ReentrantLock();

public NacosDelayTaskExecuteEngine(String name) {
super(name);
this(name, null);
}

public NacosDelayTaskExecuteEngine(String name, Logger logger) {
super(name, logger);
this(name, 32, logger, 100L);
}

public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) {
super(name, logger, processInterval);
this(name, 32, logger, processInterval);
}

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) {
super(name, initCapacity, logger);
this(name, initCapacity, logger, 100L);
}

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(name, initCapacity, logger, processInterval);
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

@Override
public int size() {
lock.lock();
try {
return tasks.size();
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
lock.lock();
try {
return tasks.isEmpty();
} finally {
lock.unlock();
}
}

@Override
public AbstractDelayTask removeTask(Object key) {
lock.lock();
try {
AbstractDelayTask task = tasks.get(key);
if (null != task && task.shouldProcess()) {
return tasks.remove(key);
} else {
return null;
}
} finally {
lock.unlock();
}
}

@Override
public Collection<Object> getAllTaskKeys() {
Collection<Object> keys = new HashSet<Object>();
lock.lock();
try {
keys.addAll(tasks.keySet());
} finally {
lock.unlock();
}
return keys;
}

@Override
public void shutdown() throws NacosException {
processingExecutor.shutdown();
}

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}

/**
* process tasks in execute engine.
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
Expand All @@ -74,22 +160,20 @@ protected void processTasks() {
}
}

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}

private void retryFailedTask(Object key, AbstractDelayTask task) {
task.setLastProcessTime(System.currentTimeMillis());
addTask(key, task);
}

private class ProcessRunnable implements Runnable {

@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
}
Loading