From 9585d55adccffde50385ed9af89d385ebfcd53c6 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 17 Jun 2020 15:20:06 +0800 Subject: [PATCH 1/3] improvement: merge upstream/develop --- .../nacos/core/file/WatchFileCenter.java | 427 +++++++++--------- 1 file changed, 202 insertions(+), 225 deletions(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index 4a7d7e1180c..3b026a6c582 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -25,241 +25,218 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.nio.file.*; +import java.util.*; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; /** - * Unified file change monitoring management center, which uses {@link WatchService} internally. - * One file directory corresponds to one {@link WatchService}. It can only monitor up to 32 file - * directories. When a file change occurs, a {@link FileChangeEvent} will be issued + * Unified file change monitoring management center, which uses {@link WatchService} internally. One file directory + * corresponds to one {@link WatchService}. It can only monitor up to 32 file directories. When a file change occurs, a + * {@link FileChangeEvent} will be issued * * @author liaochuntao */ public class WatchFileCenter { - - private static final Logger LOGGER = LoggerFactory.getLogger(WatchFileCenter.class); - - /** - * Maximum number of monitored file directories - */ - private static final int MAX_WATCH_FILE_JOB = Integer - .getInteger("nacos.watch-file.max-dirs", 16); - - private static final Map MANAGER = new HashMap( - MAX_WATCH_FILE_JOB); - - private static final FileSystem FILE_SYSTEM = FileSystems.getDefault(); - - private static final AtomicBoolean CLOSED = new AtomicBoolean(false); - - static { + + private static final Logger LOGGER = LoggerFactory.getLogger(WatchFileCenter.class); + + /** + * Maximum number of monitored file directories + */ + private static final int MAX_WATCH_FILE_JOB = Integer.getInteger("nacos.watch-file.max-dirs", 16); + + private static final Map MANAGER = new HashMap(MAX_WATCH_FILE_JOB); + + private static final FileSystem FILE_SYSTEM = FileSystems.getDefault(); + + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + static { ThreadUtils.addShutdownHook(new Runnable() { - @Override - public void run() { - shutdown(); - } - }); - } - - /** - * The number of directories that are currently monitored - */ - private static int NOW_WATCH_JOB_CNT = 0; - - public synchronized static boolean registerWatcher(final String paths, - FileWatcher watcher) throws NacosException { - checkState(); - NOW_WATCH_JOB_CNT++; - if (NOW_WATCH_JOB_CNT > MAX_WATCH_FILE_JOB) { - return false; - } - WatchDirJob job = MANAGER.get(paths); - if (job == null) { - job = new WatchDirJob(paths); - job.start(); - MANAGER.put(paths, job); - } - job.addSubscribe(watcher); - return true; - } - - public synchronized static boolean deregisterAllWatcher(final String path) { - WatchDirJob job = MANAGER.get(path); - if (job != null) { - job.shutdown(); - MANAGER.remove(path); - return true; - } - return false; - } - - public static void shutdown() { - if (!CLOSED.compareAndSet(false, true)) { - return; - } - LOGGER.warn("[WatchFileCenter] start close"); - for (Map.Entry entry : MANAGER.entrySet()) { - LOGGER.warn("[WatchFileCenter] start to shutdown this watcher which is watch : " + entry.getKey()); - try { - entry.getValue().shutdown(); - } catch (Throwable e) { - LOGGER.error("[WatchFileCenter] shutdown has error : {}", e); - } - } - MANAGER.clear(); - LOGGER.warn("[WatchFileCenter] already closed"); - } - - public synchronized static boolean deregisterWatcher(final String path, final FileWatcher watcher) { - WatchDirJob job = MANAGER.get(path); - if (job != null) { - job.watchers.remove(watcher); - return true; - } - return false; - } - - private static class WatchDirJob extends Thread { - - private ExecutorService callBackExecutor; - - private final String paths; - - private WatchService watchService; - - private volatile boolean watch = true; - - private Set watchers = new ConcurrentHashSet<>(); - - public WatchDirJob(String paths) throws NacosException { - setName(paths); - this.paths = paths; - final Path p = Paths.get(paths); - if (!p.toFile().isDirectory()) { - throw new IllegalArgumentException("Must be a file directory : " + paths); - } - - this.callBackExecutor = ExecutorFactory - .newFixExecutorService(WatchFileCenter.class.getCanonicalName(), - 1, - new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths)); - - try { - WatchService service = FILE_SYSTEM.newWatchService(); - p.register(service, StandardWatchEventKinds.OVERFLOW, - StandardWatchEventKinds.ENTRY_MODIFY, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE); - this.watchService = service; - } - catch (Throwable ex) { - throw new NacosException(NacosException.SERVER_ERROR, ex); - } - } - - void addSubscribe(final FileWatcher watcher) { - watchers.add(watcher); - } - - void shutdown() { - watch = false; - } - - @Override - public void run() { - while (watch) { - try { - final WatchKey watchKey = watchService.take(); - final List> events = watchKey.pollEvents(); - watchKey.reset(); - if (callBackExecutor.isShutdown()) { - return; - } - if(events.isEmpty()) { + @Override + public void run() { + shutdown(); + } + }); + } + + /** + * The number of directories that are currently monitored + */ + private static int NOW_WATCH_JOB_CNT = 0; + + public synchronized static boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException { + checkState(); + NOW_WATCH_JOB_CNT++; + if (NOW_WATCH_JOB_CNT > MAX_WATCH_FILE_JOB) { + return false; + } + WatchDirJob job = MANAGER.get(paths); + if (job == null) { + job = new WatchDirJob(paths); + job.start(); + MANAGER.put(paths, job); + } + job.addSubscribe(watcher); + return true; + } + + public synchronized static boolean deregisterAllWatcher(final String path) { + WatchDirJob job = MANAGER.get(path); + if (job != null) { + job.shutdown(); + MANAGER.remove(path); + return true; + } + return false; + } + + public static void shutdown() { + if (!CLOSED.compareAndSet(false, true)) { + return; + } + LOGGER.warn("[WatchFileCenter] start close"); + for (Map.Entry entry : MANAGER.entrySet()) { + LOGGER.warn("[WatchFileCenter] start to shutdown this watcher which is watch : " + entry.getKey()); + try { + entry.getValue().shutdown(); + } catch (Throwable e) { + LOGGER.error("[WatchFileCenter] shutdown has error : {}", e); + } + } + MANAGER.clear(); + LOGGER.warn("[WatchFileCenter] already closed"); + } + + public synchronized static boolean deregisterWatcher(final String path, final FileWatcher watcher) { + WatchDirJob job = MANAGER.get(path); + if (job != null) { + job.watchers.remove(watcher); + return true; + } + return false; + } + + private static class WatchDirJob extends Thread { + + private ExecutorService callBackExecutor; + + private final String paths; + + private WatchService watchService; + + private volatile boolean watch = true; + + private Set watchers = new ConcurrentHashSet<>(); + + public WatchDirJob(String paths) throws NacosException { + setName(paths); + this.paths = paths; + final Path p = Paths.get(paths); + if (!p.toFile().isDirectory()) { + throw new IllegalArgumentException("Must be a file directory : " + paths); + } + + this.callBackExecutor = ExecutorFactory.newFixExecutorService(WatchFileCenter.class.getCanonicalName(), 1, + new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths)); + + try { + WatchService service = FILE_SYSTEM.newWatchService(); + p.register(service, StandardWatchEventKinds.OVERFLOW, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE); + this.watchService = service; + } catch (Throwable ex) { + throw new NacosException(NacosException.SERVER_ERROR, ex); + } + } + + void addSubscribe(final FileWatcher watcher) { + watchers.add(watcher); + } + + void shutdown() { + watch = false; + } + + @Override + public void run() { + while (watch) { + try { + final WatchKey watchKey = watchService.take(); + final List> events = watchKey.pollEvents(); + watchKey.reset(); + if (callBackExecutor.isShutdown()) { + return; + } + if (events.isEmpty()) { continue; } - callBackExecutor.execute(new Runnable() { - @Override - public void run() { - for (WatchEvent event : events) { - WatchEvent.Kind kind = event.kind(); - - // Since the OS's event cache may be overflow, a backstop is needed - if (StandardWatchEventKinds.OVERFLOW.equals(kind)) { - eventOverflow(); - } - else { - eventProcess(event.context()); - } - } - } - }); - } - catch (InterruptedException ignore) { - Thread.interrupted(); - } catch (Throwable ex) { - LOGGER.error("An exception occurred during file listening : {}", ex); - } - } - } - - private void eventProcess(Object context) { - final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths) - .context(context).build(); - final String str = String.valueOf(context); - for (final FileWatcher watcher : watchers) { - if (watcher.interest(str)) { - Runnable job = new Runnable() { - @Override - public void run() { - watcher.onChange(fileChangeEvent); - } - }; - Executor executor = watcher.executor(); - if (executor == null) { - try { - job.run(); - } catch (Throwable ex) { - LOGGER.error("File change event callback error : {}", ex); - } - } - else { - executor.execute(job); - } - } - } - } - - private void eventOverflow() { - File dir = Paths.get(paths).toFile(); - for (File file : Objects.requireNonNull(dir.listFiles())) { - // Subdirectories do not participate in listening - if (file.isDirectory()) { - continue; - } - eventProcess(file.getName()); - } - } - - } - - private static void checkState() { - if (CLOSED.get()) { - throw new IllegalStateException("WatchFileCenter already shutdown"); - } - } + callBackExecutor.execute(new Runnable() { + @Override + public void run() { + for (WatchEvent event : events) { + WatchEvent.Kind kind = event.kind(); + + // Since the OS's event cache may be overflow, a backstop is needed + if (StandardWatchEventKinds.OVERFLOW.equals(kind)) { + eventOverflow(); + } else { + eventProcess(event.context()); + } + } + } + }); + } catch (InterruptedException ignore) { + Thread.interrupted(); + } catch (Throwable ex) { + LOGGER.error("An exception occurred during file listening : {}", ex); + } + } + } + + private void eventProcess(Object context) { + final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths).context(context).build(); + final String str = String.valueOf(context); + for (final FileWatcher watcher : watchers) { + if (watcher.interest(str)) { + Runnable job = new Runnable() { + @Override + public void run() { + watcher.onChange(fileChangeEvent); + } + }; + Executor executor = watcher.executor(); + if (executor == null) { + try { + job.run(); + } catch (Throwable ex) { + LOGGER.error("File change event callback error : {}", ex); + } + } else { + executor.execute(job); + } + } + } + } + + private void eventOverflow() { + File dir = Paths.get(paths).toFile(); + for (File file : Objects.requireNonNull(dir.listFiles())) { + // Subdirectories do not participate in listening + if (file.isDirectory()) { + continue; + } + eventProcess(file.getName()); + } + } + + } + + private static void checkState() { + if (CLOSED.get()) { + throw new IllegalStateException("WatchFileCenter already shutdown"); + } + } } From a07f84efa557cb122af23b7312d006270d8d0198 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 24 Jun 2020 18:21:37 +0800 Subject: [PATCH 2/3] feat: merge upstream develop --- .../alibaba/nacos/core/file/WatchFileCenter.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index beb4ed27cf0..97849bb91c5 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -25,8 +25,19 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.nio.file.*; -import java.util.*; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; From 42064e0c7373c1e309e9d0062ab5cda0eb58c990 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 13 Oct 2020 20:57:48 +0800 Subject: [PATCH 3/3] fix: fix issue 2892 --- .../com/alibaba/nacos/naming/controllers/OperatorController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java index 180c931415f..5df9462c46c 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/OperatorController.java @@ -175,7 +175,6 @@ public String updateSwitch(@RequestParam(required = false) boolean debug, @Reque * @param request request * @return metrics information */ - @Secured(resource = "naming/metrics", action = ActionTypes.READ) @GetMapping("/metrics") public ObjectNode metrics(HttpServletRequest request) {