Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop fix jraft null datum #4048

Merged
merged 2 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public ServerMemberManager getMemberManager() {

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
TimerContext.start("CONFIG_DUMP_TO_FILE");
String dumpFileContext = "CONFIG_DUMP_TO_FILE";
TimerContext.start(dumpFileContext);
try {
LogUtil.DEFAULT_LOG.warn("DumpService start");

Expand Down Expand Up @@ -229,7 +230,7 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc

ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
} finally {
TimerContext.end(LogUtil.DUMP_LOG);
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
*/
public class DerbySnapshotOperation implements SnapshotOperation {

private static final String DERBY_SNAPSHOT_SAVE = DerbySnapshotOperation.class.getSimpleName() + ".SAVE";

private static final String DERBY_SNAPSHOT_LOAD = DerbySnapshotOperation.class.getSimpleName() + ".LOAD";

private final String backupSql = "CALL SYSCS_UTIL.SYSCS_BACKUP_DATABASE(?)";

private final String snapshotDir = "derby_data";
Expand All @@ -72,8 +76,7 @@ public DerbySnapshotOperation(ReentrantReadWriteLock.WriteLock writeLock) {
@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {

TimerContext.start("CONFIG_DERBY_SNAPSHOT_SAVE");
TimerContext.start(DERBY_SNAPSHOT_SAVE);

final Lock lock = writeLock;
lock.lock();
Expand All @@ -100,7 +103,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(LogUtil.FATAL_LOG);
TimerContext.end(DERBY_SNAPSHOT_SAVE, LogUtil.FATAL_LOG);
}
});
}
Expand All @@ -109,8 +112,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
public boolean onSnapshotLoad(Reader reader) {
final String readerPath = reader.getPath();
final String sourceFile = Paths.get(readerPath, snapshotArchive).toString();

TimerContext.start("CONFIG_DERBY_SNAPSHOT_LOAD");
TimerContext.start(DERBY_SNAPSHOT_LOAD);
final Lock lock = writeLock;
lock.lock();
try {
Expand Down Expand Up @@ -145,7 +147,7 @@ public boolean onSnapshotLoad(Reader reader) {
return false;
} finally {
lock.unlock();
TimerContext.end(LogUtil.FATAL_LOG);
TimerContext.end(DERBY_SNAPSHOT_LOAD, LogUtil.FATAL_LOG);
}
}

Expand Down
51 changes: 27 additions & 24 deletions core/src/main/java/com/alibaba/nacos/core/utils/TimerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package com.alibaba.nacos.core.utils;

import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.Pair;
import com.alibaba.nacos.common.utils.StringUtils;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -33,47 +34,49 @@
*/
public class TimerContext {

private static final ThreadLocal<Pair<String, Long>> TIME_RECORD = new ThreadLocal<>();
private static final ThreadLocal<Map<String, Long>> TIME_RECORD = ThreadLocal.withInitial(() -> new HashMap<>(2));

/**
* Record context start time.
*
* @param name context name
*/
public static void start(final String name) {
long startTime = System.currentTimeMillis();
TIME_RECORD.set(Pair.with(name, startTime));
TIME_RECORD.get().put(name, System.currentTimeMillis());
}

public static void end(final Logger logger) {
end(logger, LoggerUtils.DEBUG);
public static void end(final String name, final Logger logger) {
end(name, logger, LoggerUtils.DEBUG);
}

/**
* End the task and print based on the log level.
*
* @param name context name
* @param logger logger
* @param level logger level
*/
public static void end(final Logger logger, final String level) {
long endTime = System.currentTimeMillis();
Pair<String, Long> record = TIME_RECORD.get();
public static void end(final String name, final Logger logger, final String level) {
Map<String, Long> record = TIME_RECORD.get();
long contextTime = System.currentTimeMillis() - record.remove(name);
if (StringUtils.equals(level, LoggerUtils.DEBUG)) {
LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.INFO)) {
LoggerUtils.printIfInfoEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfInfoEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.TRACE)) {
LoggerUtils.printIfTraceEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfTraceEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.ERROR)) {
LoggerUtils.printIfErrorEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfErrorEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (StringUtils.equals(level, LoggerUtils.WARN)) {
LoggerUtils.printIfWarnEnabled(logger, "{} cost time : {} ms", record.getFirst(),
(endTime - record.getSecond()));
LoggerUtils.printIfWarnEnabled(logger, "{} cost time : {} ms", name, contextTime);
}
if (record.isEmpty()) {
TIME_RECORD.remove();
}
TIME_RECORD.remove();
}

/**
Expand All @@ -88,7 +91,7 @@ public static void run(final Runnable job, final String name, final Logger logge
try {
job.run();
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -104,7 +107,7 @@ public static <V> V run(final Supplier<V> job, final String name, final Logger l
try {
return job.get();
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -121,7 +124,7 @@ public static <T, R> R run(final Function<T, R> job, T args, final String name,
try {
return job.apply(args);
} finally {
end(logger);
end(name, logger);
}
}

Expand All @@ -138,7 +141,7 @@ public static <T> void run(final Consumer<T> job, T args, final String name, fin
try {
job.accept(args);
} finally {
end(logger);
end(name, logger);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
public class NamingKvStorage extends MemoryKvStorage {

private static final String LOAD_SNAPSHOT = NamingKvStorage.class.getSimpleName() + ".snapshotLoad";

private final String baseDir;

private final KvStorage baseDirStorage;
Expand Down Expand Up @@ -142,18 +144,13 @@ public void doSnapshot(String backupPath) throws KvStorageException {

@Override
public void snapshotLoad(String path) throws KvStorageException {
final KvStorageException e = TimerContext.run(() -> {
try {
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
loadNamespaceSnapshot();
return null;
} catch (KvStorageException exception) {
return exception;
}
}, "naming kv storage load snapshot", Loggers.RAFT);
if (e != null) {
throw e;
TimerContext.start(LOAD_SNAPSHOT);
try {
baseDirStorage.snapshotLoad(path);
loadSnapshotFromActualStorage(baseDirStorage);
loadNamespaceSnapshot();
} finally {
TimerContext.end(LOAD_SNAPSHOT, Loggers.RAFT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
*/
public class NamingSnapshotOperation implements SnapshotOperation {

private static final String NAMING_SNAPSHOT_SAVE = NamingSnapshotOperation.class.getSimpleName() + ".SAVE";

private static final String NAMING_SNAPSHOT_LOAD = NamingSnapshotOperation.class.getSimpleName() + ".LOAD";

private final String snapshotDir = "naming_persistent";

private final String snapshotArchive = "naming_persistent.zip";
Expand All @@ -59,7 +63,7 @@ public NamingSnapshotOperation(KvStorage storage, ReentrantReadWriteLock lock) {
@Override
public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFinally) {
RaftExecutor.doSnapshot(() -> {
TimerContext.start("NAMING_SNAPSHOT_SAVE");
TimerContext.start(NAMING_SNAPSHOT_SAVE);

final Lock lock = writeLock;
lock.lock();
Expand All @@ -85,7 +89,7 @@ public void onSnapshotSave(Writer writer, BiConsumer<Boolean, Throwable> callFin
callFinally.accept(false, t);
} finally {
lock.unlock();
TimerContext.end(Loggers.RAFT);
TimerContext.end(NAMING_SNAPSHOT_SAVE, Loggers.RAFT);
}
});
}
Expand All @@ -95,7 +99,7 @@ public boolean onSnapshotLoad(Reader reader) {
final String readerPath = reader.getPath();
final String sourceFile = Paths.get(readerPath, snapshotArchive).toString();

TimerContext.start("NAMING_SNAPSHOT_LOAD");
TimerContext.start(NAMING_SNAPSHOT_LOAD);
final Lock lock = writeLock;
lock.lock();
try {
Expand All @@ -119,7 +123,7 @@ public boolean onSnapshotLoad(Reader reader) {
return false;
} finally {
lock.unlock();
TimerContext.end(Loggers.RAFT);
TimerContext.end(NAMING_SNAPSHOT_LOAD, Loggers.RAFT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RaftCore(RaftPeerSet peers, SwitchDomain switchDomain, GlobalConfig globa
this.raftProxy = raftProxy;
this.raftStore = raftStore;
this.versionJudgement = versionJudgement;
this.notifier = new PersistentNotifier(key -> getDatum(key).value);
this.notifier = new PersistentNotifier(key -> null == getDatum(key) ? null : getDatum(key).value);
this.publisher = NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384);
}

Expand Down Expand Up @@ -241,7 +241,7 @@ public void signalPublish(String key, Record value) throws Exception {
continue;
}
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new Callback<String>() {
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Expand Down Expand Up @@ -817,8 +817,10 @@ public RaftPeer receivedBeat(JsonNode beat) throws Exception {
processedCount, beatDatums.size(), datums.size());

// update datum entry
String url = buildUrl(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new Callback<String>() {
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Expand Down