From 9585d55adccffde50385ed9af89d385ebfcd53c6 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 17 Jun 2020 15:20:06 +0800 Subject: [PATCH 1/4] improvement: merge upstream/develop --- .../nacos/core/file/WatchFileCenter.java | 427 +++++++++--------- 1 file changed, 202 insertions(+), 225 deletions(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index 4a7d7e1180c..3b026a6c582 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -25,241 +25,218 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.nio.file.*; +import java.util.*; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; /** - * Unified file change monitoring management center, which uses {@link WatchService} internally. - * One file directory corresponds to one {@link WatchService}. It can only monitor up to 32 file - * directories. When a file change occurs, a {@link FileChangeEvent} will be issued + * Unified file change monitoring management center, which uses {@link WatchService} internally. One file directory + * corresponds to one {@link WatchService}. It can only monitor up to 32 file directories. When a file change occurs, a + * {@link FileChangeEvent} will be issued * * @author liaochuntao */ public class WatchFileCenter { - - private static final Logger LOGGER = LoggerFactory.getLogger(WatchFileCenter.class); - - /** - * Maximum number of monitored file directories - */ - private static final int MAX_WATCH_FILE_JOB = Integer - .getInteger("nacos.watch-file.max-dirs", 16); - - private static final Map MANAGER = new HashMap( - MAX_WATCH_FILE_JOB); - - private static final FileSystem FILE_SYSTEM = FileSystems.getDefault(); - - private static final AtomicBoolean CLOSED = new AtomicBoolean(false); - - static { + + private static final Logger LOGGER = LoggerFactory.getLogger(WatchFileCenter.class); + + /** + * Maximum number of monitored file directories + */ + private static final int MAX_WATCH_FILE_JOB = Integer.getInteger("nacos.watch-file.max-dirs", 16); + + private static final Map MANAGER = new HashMap(MAX_WATCH_FILE_JOB); + + private static final FileSystem FILE_SYSTEM = FileSystems.getDefault(); + + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + static { ThreadUtils.addShutdownHook(new Runnable() { - @Override - public void run() { - shutdown(); - } - }); - } - - /** - * The number of directories that are currently monitored - */ - private static int NOW_WATCH_JOB_CNT = 0; - - public synchronized static boolean registerWatcher(final String paths, - FileWatcher watcher) throws NacosException { - checkState(); - NOW_WATCH_JOB_CNT++; - if (NOW_WATCH_JOB_CNT > MAX_WATCH_FILE_JOB) { - return false; - } - WatchDirJob job = MANAGER.get(paths); - if (job == null) { - job = new WatchDirJob(paths); - job.start(); - MANAGER.put(paths, job); - } - job.addSubscribe(watcher); - return true; - } - - public synchronized static boolean deregisterAllWatcher(final String path) { - WatchDirJob job = MANAGER.get(path); - if (job != null) { - job.shutdown(); - MANAGER.remove(path); - return true; - } - return false; - } - - public static void shutdown() { - if (!CLOSED.compareAndSet(false, true)) { - return; - } - LOGGER.warn("[WatchFileCenter] start close"); - for (Map.Entry entry : MANAGER.entrySet()) { - LOGGER.warn("[WatchFileCenter] start to shutdown this watcher which is watch : " + entry.getKey()); - try { - entry.getValue().shutdown(); - } catch (Throwable e) { - LOGGER.error("[WatchFileCenter] shutdown has error : {}", e); - } - } - MANAGER.clear(); - LOGGER.warn("[WatchFileCenter] already closed"); - } - - public synchronized static boolean deregisterWatcher(final String path, final FileWatcher watcher) { - WatchDirJob job = MANAGER.get(path); - if (job != null) { - job.watchers.remove(watcher); - return true; - } - return false; - } - - private static class WatchDirJob extends Thread { - - private ExecutorService callBackExecutor; - - private final String paths; - - private WatchService watchService; - - private volatile boolean watch = true; - - private Set watchers = new ConcurrentHashSet<>(); - - public WatchDirJob(String paths) throws NacosException { - setName(paths); - this.paths = paths; - final Path p = Paths.get(paths); - if (!p.toFile().isDirectory()) { - throw new IllegalArgumentException("Must be a file directory : " + paths); - } - - this.callBackExecutor = ExecutorFactory - .newFixExecutorService(WatchFileCenter.class.getCanonicalName(), - 1, - new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths)); - - try { - WatchService service = FILE_SYSTEM.newWatchService(); - p.register(service, StandardWatchEventKinds.OVERFLOW, - StandardWatchEventKinds.ENTRY_MODIFY, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE); - this.watchService = service; - } - catch (Throwable ex) { - throw new NacosException(NacosException.SERVER_ERROR, ex); - } - } - - void addSubscribe(final FileWatcher watcher) { - watchers.add(watcher); - } - - void shutdown() { - watch = false; - } - - @Override - public void run() { - while (watch) { - try { - final WatchKey watchKey = watchService.take(); - final List> events = watchKey.pollEvents(); - watchKey.reset(); - if (callBackExecutor.isShutdown()) { - return; - } - if(events.isEmpty()) { + @Override + public void run() { + shutdown(); + } + }); + } + + /** + * The number of directories that are currently monitored + */ + private static int NOW_WATCH_JOB_CNT = 0; + + public synchronized static boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException { + checkState(); + NOW_WATCH_JOB_CNT++; + if (NOW_WATCH_JOB_CNT > MAX_WATCH_FILE_JOB) { + return false; + } + WatchDirJob job = MANAGER.get(paths); + if (job == null) { + job = new WatchDirJob(paths); + job.start(); + MANAGER.put(paths, job); + } + job.addSubscribe(watcher); + return true; + } + + public synchronized static boolean deregisterAllWatcher(final String path) { + WatchDirJob job = MANAGER.get(path); + if (job != null) { + job.shutdown(); + MANAGER.remove(path); + return true; + } + return false; + } + + public static void shutdown() { + if (!CLOSED.compareAndSet(false, true)) { + return; + } + LOGGER.warn("[WatchFileCenter] start close"); + for (Map.Entry entry : MANAGER.entrySet()) { + LOGGER.warn("[WatchFileCenter] start to shutdown this watcher which is watch : " + entry.getKey()); + try { + entry.getValue().shutdown(); + } catch (Throwable e) { + LOGGER.error("[WatchFileCenter] shutdown has error : {}", e); + } + } + MANAGER.clear(); + LOGGER.warn("[WatchFileCenter] already closed"); + } + + public synchronized static boolean deregisterWatcher(final String path, final FileWatcher watcher) { + WatchDirJob job = MANAGER.get(path); + if (job != null) { + job.watchers.remove(watcher); + return true; + } + return false; + } + + private static class WatchDirJob extends Thread { + + private ExecutorService callBackExecutor; + + private final String paths; + + private WatchService watchService; + + private volatile boolean watch = true; + + private Set watchers = new ConcurrentHashSet<>(); + + public WatchDirJob(String paths) throws NacosException { + setName(paths); + this.paths = paths; + final Path p = Paths.get(paths); + if (!p.toFile().isDirectory()) { + throw new IllegalArgumentException("Must be a file directory : " + paths); + } + + this.callBackExecutor = ExecutorFactory.newFixExecutorService(WatchFileCenter.class.getCanonicalName(), 1, + new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths)); + + try { + WatchService service = FILE_SYSTEM.newWatchService(); + p.register(service, StandardWatchEventKinds.OVERFLOW, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE); + this.watchService = service; + } catch (Throwable ex) { + throw new NacosException(NacosException.SERVER_ERROR, ex); + } + } + + void addSubscribe(final FileWatcher watcher) { + watchers.add(watcher); + } + + void shutdown() { + watch = false; + } + + @Override + public void run() { + while (watch) { + try { + final WatchKey watchKey = watchService.take(); + final List> events = watchKey.pollEvents(); + watchKey.reset(); + if (callBackExecutor.isShutdown()) { + return; + } + if (events.isEmpty()) { continue; } - callBackExecutor.execute(new Runnable() { - @Override - public void run() { - for (WatchEvent event : events) { - WatchEvent.Kind kind = event.kind(); - - // Since the OS's event cache may be overflow, a backstop is needed - if (StandardWatchEventKinds.OVERFLOW.equals(kind)) { - eventOverflow(); - } - else { - eventProcess(event.context()); - } - } - } - }); - } - catch (InterruptedException ignore) { - Thread.interrupted(); - } catch (Throwable ex) { - LOGGER.error("An exception occurred during file listening : {}", ex); - } - } - } - - private void eventProcess(Object context) { - final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths) - .context(context).build(); - final String str = String.valueOf(context); - for (final FileWatcher watcher : watchers) { - if (watcher.interest(str)) { - Runnable job = new Runnable() { - @Override - public void run() { - watcher.onChange(fileChangeEvent); - } - }; - Executor executor = watcher.executor(); - if (executor == null) { - try { - job.run(); - } catch (Throwable ex) { - LOGGER.error("File change event callback error : {}", ex); - } - } - else { - executor.execute(job); - } - } - } - } - - private void eventOverflow() { - File dir = Paths.get(paths).toFile(); - for (File file : Objects.requireNonNull(dir.listFiles())) { - // Subdirectories do not participate in listening - if (file.isDirectory()) { - continue; - } - eventProcess(file.getName()); - } - } - - } - - private static void checkState() { - if (CLOSED.get()) { - throw new IllegalStateException("WatchFileCenter already shutdown"); - } - } + callBackExecutor.execute(new Runnable() { + @Override + public void run() { + for (WatchEvent event : events) { + WatchEvent.Kind kind = event.kind(); + + // Since the OS's event cache may be overflow, a backstop is needed + if (StandardWatchEventKinds.OVERFLOW.equals(kind)) { + eventOverflow(); + } else { + eventProcess(event.context()); + } + } + } + }); + } catch (InterruptedException ignore) { + Thread.interrupted(); + } catch (Throwable ex) { + LOGGER.error("An exception occurred during file listening : {}", ex); + } + } + } + + private void eventProcess(Object context) { + final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths).context(context).build(); + final String str = String.valueOf(context); + for (final FileWatcher watcher : watchers) { + if (watcher.interest(str)) { + Runnable job = new Runnable() { + @Override + public void run() { + watcher.onChange(fileChangeEvent); + } + }; + Executor executor = watcher.executor(); + if (executor == null) { + try { + job.run(); + } catch (Throwable ex) { + LOGGER.error("File change event callback error : {}", ex); + } + } else { + executor.execute(job); + } + } + } + } + + private void eventOverflow() { + File dir = Paths.get(paths).toFile(); + for (File file : Objects.requireNonNull(dir.listFiles())) { + // Subdirectories do not participate in listening + if (file.isDirectory()) { + continue; + } + eventProcess(file.getName()); + } + } + + } + + private static void checkState() { + if (CLOSED.get()) { + throw new IllegalStateException("WatchFileCenter already shutdown"); + } + } } From 45e134b7aeea987f4d184a4e722c06b1fd56b47f Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 19 Jun 2020 14:22:50 +0800 Subject: [PATCH 2/4] fix: fix config ut bug --- .../nacos/test/config/ConfigAPI_CITCase.java | 15 ++++++-- .../nacos/test/config/ConfigBeta_CITCase.java | 8 +++++ .../nacos/test/config/ConfigCleanUtils.java | 35 +++++++++++++++++++ .../ConfigExportAndImportAPI_CITCase.java | 7 +++- .../ConfigLongPollReturnChanges_CITCase.java | 8 +++++ .../test/config/ConfigLongPoll_CITCase.java | 8 +++++ 6 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 test/src/test/java/com/alibaba/nacos/test/config/ConfigCleanUtils.java diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java index 1ae67b691e1..785ace828b8 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java @@ -30,8 +30,10 @@ import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.ThreadUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; @@ -90,6 +92,12 @@ public void cleanup() throws Exception { Assert.fail(); } } + + @BeforeClass + @AfterClass + public static void cleanClientCache() throws Exception { + ConfigCleanUtils.cleanClientCache(); + } /** * @TCDescription : nacos_正常获取数据 @@ -383,8 +391,8 @@ public void nacos_addListener_3() throws InterruptedException, NacosException { final AtomicInteger count = new AtomicInteger(0); final String dataId = "nacos_addListener_3"; final String group = "nacos_addListener_3"; - final String content = "test-abc"; - final String newContent = "nacos_addListener_3"; + final String content = "test-abc-" + System.currentTimeMillis(); + final String newContent = "nacos_addListener_3-" + System.currentTimeMillis(); boolean result = iconfig.publishConfig(dataId, group, content); Assert.assertTrue(result); @@ -395,7 +403,8 @@ public void receiveConfigInfo(String configInfo) { Assert.assertEquals(newContent, configInfo); } }; - iconfig.addListener(dataId, group, ml); + String receive = iconfig.getConfigAndSignListener(dataId, group, 5000L, ml); + Assert.assertEquals(content, receive); result = iconfig.publishConfig(dataId, group, newContent); Assert.assertTrue(result); // Get enough sleep to ensure that the monitor is triggered only once diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigBeta_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigBeta_CITCase.java index 19731048211..b016c24daf6 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigBeta_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigBeta_CITCase.java @@ -21,8 +21,10 @@ import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.test.base.Params; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -61,6 +63,12 @@ public class ConfigBeta_CITCase { String tenant = "dungu"; String content = "test"; String appName = "nacos"; + + @BeforeClass + @AfterClass + public static void cleanClientCache() throws Exception { + ConfigCleanUtils.cleanClientCache(); + } @Before public void init() throws NacosException { diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigCleanUtils.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigCleanUtils.java new file mode 100644 index 00000000000..ffc652a4e89 --- /dev/null +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigCleanUtils.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.config; + +import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor; +import com.alibaba.nacos.core.utils.DiskUtils; + +import java.io.IOException; + +/** + * Cache files to clear tool classes. + * + * @author liaochuntao + */ +public class ConfigCleanUtils { + + public static void cleanClientCache() throws IOException { + DiskUtils.deleteDirThenMkdir(LocalConfigInfoProcessor.LOCAL_SNAPSHOT_PATH); + } + +} diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigExportAndImportAPI_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigExportAndImportAPI_CITCase.java index 7791135f8de..cd55ccacaf4 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigExportAndImportAPI_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigExportAndImportAPI_CITCase.java @@ -56,7 +56,12 @@ public class ConfigExportAndImportAPI_CITCase { private String SERVER_ADDR = null; private HttpAgent agent = null; - + + @BeforeClass + @AfterClass + public static void cleanClientCache() throws Exception { + ConfigCleanUtils.cleanClientCache(); + } @Before public void setUp() throws Exception { diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPollReturnChanges_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPollReturnChanges_CITCase.java index 7f3e4a630de..936aab9498e 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPollReturnChanges_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPollReturnChanges_CITCase.java @@ -25,8 +25,10 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; @@ -46,6 +48,12 @@ public class ConfigLongPollReturnChanges_CITCase { private int port; private ConfigService configService; + + @BeforeClass + @AfterClass + public static void cleanClientCache() throws Exception { + ConfigCleanUtils.cleanClientCache(); + } @Before public void init() throws NacosException { diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_CITCase.java index 054c29dc2df..67df2d81967 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigLongPoll_CITCase.java @@ -23,7 +23,9 @@ import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; @@ -47,6 +49,12 @@ public class ConfigLongPoll_CITCase { private int port; private ConfigService configService; + + @BeforeClass + @AfterClass + public static void cleanClientCache() throws Exception { + ConfigCleanUtils.cleanClientCache(); + } @Before public void init() throws NacosException { From 3e18648561ca7c5ba2d0a2d0ef96d4a3f9af6352 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 19 Jun 2020 14:25:15 +0800 Subject: [PATCH 3/4] style: fix code style --- .../alibaba/nacos/core/file/WatchFileCenter.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index e8cc0f88257..723a8f0a259 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -25,8 +25,19 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.nio.file.*; -import java.util.*; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; From f4748a8c8e8318f929522eb55380adbcbb84b2a3 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 19 Jun 2020 19:16:34 +0800 Subject: [PATCH 4/4] fix: fix ut bug --- .../java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java index 785ace828b8..b572e3d851f 100644 --- a/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java +++ b/test/src/test/java/com/alibaba/nacos/test/config/ConfigAPI_CITCase.java @@ -395,6 +395,9 @@ public void nacos_addListener_3() throws InterruptedException, NacosException { final String newContent = "nacos_addListener_3-" + System.currentTimeMillis(); boolean result = iconfig.publishConfig(dataId, group, content); Assert.assertTrue(result); + + // Maximum assurance level notification has been performed + ThreadUtils.sleep(5000); Listener ml = new AbstractListener() { @Override