Skip to content

Commit

Permalink
Develop fix jraft null datum (#4048)
Browse files Browse the repository at this point in the history
* Fix old raft sync null datum problem

* Fix TimerContext NPE problem
  • Loading branch information
KomachiSion authored Oct 22, 2020
1 parent 9ecc17e commit 6eeeb76
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public ServerMemberManager getMemberManager() {

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
TimerContext.start("CONFIG_DUMP_TO_FILE");
String dumpFileContext = "CONFIG_DUMP_TO_FILE";
TimerContext.start(dumpFileContext);
try {
LogUtil.DEFAULT_LOG.warn("DumpService start");

Expand Down Expand Up @@ -229,7 +230,7 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc

ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
} finally {
TimerContext.end(LogUtil.DUMP_LOG);
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
*/
public class DerbySnapshotOperation implements SnapshotOperation {

private static final String DERBY_SNAPSHOT_SAVE = DerbySnapshotOperation.class.getSimpleName() + ".SAVE";

private static final String DERBY_SNAPSHOT_LOAD = DerbySnapshotOperation.class.getSimpleName() + ".LOAD";

private final String backupSql = "CALL SYSCS_UTIL.SYSCS_BACKUP_DATABASE(?)";

private final String snapshotDir = "derby_data";
Expand All @@ -72,8 +76,7 @@ public DerbySnapshotOperation(ReentrantReadWriteLock.WriteLock writeLock) {
@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {

TimerContext.start("CONFIG_DERBY_SNAPSHOT_SAVE");
TimerContext.start(DERBY_SNAPSHOT_SAVE);

final Lock lock = writeLock;
lock.lock();
Expand All @@ -100,7 +103,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(LogUtil.FATAL_LOG);
TimerContext.end(DERBY_SNAPSHOT_SAVE, LogUtil.FATAL_LOG);
}
});
}
Expand All @@ -109,8 +112,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
public boolean onSnapshotLoad(Reader reader) {
final String readerPath = reader.getPath();
final String sourceFile = Paths.get(readerPath, snapshotArchive).toString();

TimerContext.start("CONFIG_DERBY_SNAPSHOT_LOAD");
TimerContext.start(DERBY_SNAPSHOT_LOAD);
final Lock lock = writeLock;
lock.lock();
try {
Expand Down Expand Up @@ -145,7 +147,7 @@ public boolean onSnapshotLoad(Reader reader) {
return false;
} finally {
lock.unlock();
TimerContext.end(LogUtil.FATAL_LOG);
TimerContext.end(DERBY_SNAPSHOT_LOAD, LogUtil.FATAL_LOG);
}
}

Expand Down
51 changes: 27 additions & 24 deletions core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package com.alibaba.nacos.core.utils;

import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.Pair;
import com.alibaba.nacos.common.utils.StringUtils;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -33,47 +34,49 @@
*/
public class TimerContext {

private static final ThreadLocal<Pair<String, Long>> TIME_RECORD = new ThreadLocal<>();
private static final ThreadLocal<Map<String, Long>> TIME_RECORD = ThreadLocal.withInitial(() -> new HashMap<>(2));

/**
* Record context start time.
*
* @param name context name
*/
public static void start(final String name) {
long startTime = System.currentTimeMillis();
TIME_RECORD.set(Pair.with(name, startTime));
TIME_RECORD.get().put(name, System.currentTimeMillis());
}

public static void end(final Logger logger) {
end(logger, LoggerUtils.DEBUG);
public static void end(final String name, final Logger logger) {
end(name, logger, LoggerUtils.DEBUG);
}

/**
* End the task and print based on the log level.
*
* @param name context name
* @param logger logger
* @param level logger level
*/
public static void end(final Logger logger, final String level) {
long endTime = System.currentTimeMillis();
Pair<String, Long> record = TIME_RECORD.get();
public static void end(final String name, final Logger logger, final String level) {
Map<String, Long> record = TIME_RECORD.get();
long contextTime = System.currentTimeMillis() - record.remove(name);
if (StringUtils.equals(level, LoggerUtils.DEBUG)) {
LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.INFO)) {
LoggerUtils.printIfInfoEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfInfoEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.TRACE)) {
LoggerUtils.printIfTraceEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfTraceEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.ERROR)) {
LoggerUtils.printIfErrorEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfErrorEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.WARN)) {
LoggerUtils.printIfWarnEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfWarnEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (record.isEmpty()) {
TIME_RECORD.remove();
}
TIME_RECORD.remove();
}

/**
Expand All @@ -88,7 +91,7 @@ public static void run(final Runnable job, final String name, final Logger logge
try {
job.run();
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -104,7 +107,7 @@ public static <V> V run(final Supplier<V> job, final String name, final Logger l
try {
return job.get();
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -121,7 +124,7 @@ public static <T, R> R run(final Function<T, R> job, T args, final String name,
try {
return job.apply(args);
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -138,7 +141,7 @@ public static <T> void run(final Consumer<T> job, T args, final String name, fin
try {
job.accept(args);
} finally {
end(logger);
end(name, logger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
public class NamingKvStorage extends MemoryKvStorage {

private static final String LOAD_SNAPSHOT = NamingKvStorage.class.getSimpleName() + ".snapshotLoad";

private final String baseDir;

private final KvStorage baseDirStorage;
Expand Down Expand Up @@ -142,18 +144,13 @@ public void doSnapshot(String backupPath) throws KvStorageException {

@Override
public void snapshotLoad(String path) throws KvStorageException {
final KvStorageException e = TimerContext.run(() -> {
try {
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
loadNamespaceSnapshot();
return null;
} catch (KvStorageException exception) {
return exception;
}
}, "naming kv storage load snapshot", Loggers.RAFT);
if (e != null) {
throw e;
TimerContext.start(LOAD_SNAPSHOT);
try {
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
loadNamespaceSnapshot();
} finally {
TimerContext.end(LOAD_SNAPSHOT, Loggers.RAFT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
*/
public class NamingSnapshotOperation implements SnapshotOperation {

private static final String NAMING_SNAPSHOT_SAVE = NamingSnapshotOperation.class.getSimpleName() + ".SAVE";

private static final String NAMING_SNAPSHOT_LOAD = NamingSnapshotOperation.class.getSimpleName() + ".LOAD";

private final String snapshotDir = "naming_persistent";

private final String snapshotArchive = "naming_persistent.zip";
Expand All @@ -59,7 +63,7 @@ public NamingSnapshotOperation(KvStorage storage, ReentrantReadWriteLock lock) {
@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {
TimerContext.start("NAMING_SNAPSHOT_SAVE");
TimerContext.start(NAMING_SNAPSHOT_SAVE);

final Lock lock = writeLock;
lock.lock();
Expand All @@ -85,7 +89,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(Loggers.RAFT);
TimerContext.end(NAMING_SNAPSHOT_SAVE, Loggers.RAFT);
}
});
}
Expand All @@ -95,7 +99,7 @@ public boolean onSnapshotLoad(Reader reader) {
final String readerPath = reader.getPath();
final String sourceFile = Paths.get(readerPath, snapshotArchive).toString();

TimerContext.start("NAMING_SNAPSHOT_LOAD");
TimerContext.start(NAMING_SNAPSHOT_LOAD);
final Lock lock = writeLock;
lock.lock();
try {
Expand All @@ -119,7 +123,7 @@ public boolean onSnapshotLoad(Reader reader) {
return false;
} finally {
lock.unlock();
TimerContext.end(Loggers.RAFT);
TimerContext.end(NAMING_SNAPSHOT_LOAD, Loggers.RAFT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globa
this.raftProxy = raftProxy;
this.raftStore = raftStore;
this.versionJudgement = versionJudgement;
this.notifier = new PersistentNotifier(key -> getDatum(key).value);
this.notifier = new PersistentNotifier(key -> null == getDatum(key) ? null : getDatum(key).value);
this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384);
}

Expand Down Expand Up @@ -241,7 +241,7 @@ public void signalPublish(String key, Record value) throws Exception {
continue;
}
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new Callback<String>() {
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Expand Down Expand Up @@ -817,8 +817,10 @@ public RaftPeer receivedBeat(JsonNode beat) throws Exception {
processedCount, beatDatums.size(), datums.size());

// update datum entry
String url = buildUrl(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new Callback<String>() {
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Expand Down

0 comments on commit 6eeeb76

Please sign in to comment.