diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java index ad5ad7b04d080..d5936e0fa21f4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java @@ -17,9 +17,10 @@ */ package org.apache.hadoop.log; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.VisibleForTesting; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.apache.hadoop.util.Timer; @@ -130,11 +131,11 @@ public interface LogAction { * will trigger logging. Other recorders are dependent on the state of this * recorder. This may be null, in which case a primary has not yet been set. */ - private String primaryRecorderName; + private final AtomicReference primaryRecorderName = new AtomicReference<>(); private final Timer timer; - private final Map currentLogs; + private final ConcurrentHashMap currentLogs; - private long lastLogTimestampMs = Long.MIN_VALUE; + private final AtomicLong lastLogTimestampMs = new AtomicLong(Long.MIN_VALUE); /** * Create a log helper without any primary recorder. @@ -164,9 +165,9 @@ public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) { LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName, Timer timer) { this.minLogPeriodMs = minLogPeriodMs; - this.primaryRecorderName = primaryRecorderName; + this.primaryRecorderName.set(primaryRecorderName); this.timer = timer; - this.currentLogs = new HashMap<>(); + this.currentLogs = new ConcurrentHashMap<>(); } /** @@ -246,35 +247,44 @@ public LogAction record(double... values) { */ public LogAction record(String recorderName, long currentTimeMs, double... values) { - if (primaryRecorderName == null) { - primaryRecorderName = recorderName; - } - LoggingAction currentLog = currentLogs.get(recorderName); - if (currentLog == null || currentLog.hasLogged()) { - currentLog = new LoggingAction(values.length); - if (!currentLogs.containsKey(recorderName)) { - // Always log newly created loggers - currentLog.setShouldLog(); - } - currentLogs.put(recorderName, currentLog); - } - currentLog.recordValues(values); - if (primaryRecorderName.equals(recorderName) && - currentTimeMs - minLogPeriodMs >= lastLogTimestampMs) { - lastLogTimestampMs = currentTimeMs; - currentLogs.replaceAll((key, log) -> { - LoggingAction newLog = log; - if (log.hasLogged()) { - // create a fresh log since the old one has already been logged - newLog = new LoggingAction(log.getValueCount()); + primaryRecorderName.compareAndSet(null, recorderName); + + if (primaryRecorderName.get().equals(recorderName)) { + long localLastLogTs = lastLogTimestampMs.get(); + while (currentTimeMs - minLogPeriodMs >= localLastLogTs) { + if (lastLogTimestampMs.compareAndSet(localLastLogTs, currentTimeMs)) { + currentLogs.replaceAll((key, log) -> { + LoggingAction newLog = log; + if (log.logState.get() == LogState.HAS_LOGGED) { + // create a fresh log since the old one has already been logged + newLog = new LoggingAction(log.getValueCount()); + } + newLog.logState.set(LogState.SHOULD_LOG); + return newLog; + }); + break; + } else { + // another thread updated lastLogTimestampMs concurrently, update the local copy + localLastLogTs = lastLogTimestampMs.get(); } - newLog.setShouldLog(); - return newLog; - }); + } } - if (currentLog.shouldLog()) { - currentLog.setHasLogged(); - return currentLog; + + LoggingAction log = currentLogs.compute(recorderName, (k, oldV) -> { + LoggingAction newV; + if (oldV == null) { + newV = new LoggingAction(values.length); + newV.logState.set(LogState.SHOULD_LOG); + } else if (oldV.logState.get() == LogState.HAS_LOGGED) { + newV = new LoggingAction(values.length); + } else { + newV = oldV; + } + newV.recordValues(values); + return newV; + }); + if (log.logState.compareAndSet(LogState.SHOULD_LOG, LogState.HAS_LOGGED)) { + return log; } else { return DO_NOT_LOG; } @@ -314,6 +324,10 @@ public static String getLogSupressionMessage(LogAction action) { } } + private enum LogState { + RECORDING, SHOULD_LOG, HAS_LOGGED; + } + /** * A standard log action which keeps track of all of the values which have * been logged. This is also used for internal bookkeeping via its private @@ -325,8 +339,7 @@ private static class LoggingAction implements LogAction { private int count = 0; private final SummaryStatistics[] stats; - private boolean shouldLog = false; - private boolean hasLogged = false; + final AtomicReference logState = new AtomicReference<>(LogState.RECORDING); LoggingAction(int valueCount) { stats = new SummaryStatistics[valueCount]; @@ -348,19 +361,8 @@ public SummaryStatistics getStats(int idx) { } public boolean shouldLog() { - return shouldLog; - } - - private void setShouldLog() { - shouldLog = true; - } - - private boolean hasLogged() { - return hasLogged; - } - - private void setHasLogged() { - hasLogged = true; + LogState localState = logState.get(); + return localState == LogState.SHOULD_LOG || localState == LogState.HAS_LOGGED; } private int getValueCount() {