Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
*/
package org.apache.hadoop.log;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.util.Timer;

Expand Down Expand Up @@ -130,11 +131,11 @@ public interface LogAction {
* will trigger logging. Other recorders are dependent on the state of this
* recorder. This may be null, in which case a primary has not yet been set.
*/
private String primaryRecorderName;
private final AtomicReference<String> primaryRecorderName = new AtomicReference<>();
private final Timer timer;
private final Map<String, LoggingAction> currentLogs;
private final ConcurrentHashMap<String, LoggingAction> currentLogs;

private long lastLogTimestampMs = Long.MIN_VALUE;
private final AtomicLong lastLogTimestampMs = new AtomicLong(Long.MIN_VALUE);

/**
* Create a log helper without any primary recorder.
Expand Down Expand Up @@ -164,9 +165,9 @@ public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) {
LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName,
Timer timer) {
this.minLogPeriodMs = minLogPeriodMs;
this.primaryRecorderName = primaryRecorderName;
this.primaryRecorderName.set(primaryRecorderName);
this.timer = timer;
this.currentLogs = new HashMap<>();
this.currentLogs = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -246,35 +247,44 @@ public LogAction record(double... values) {
*/
public LogAction record(String recorderName, long currentTimeMs,
double... values) {
if (primaryRecorderName == null) {
primaryRecorderName = recorderName;
}
LoggingAction currentLog = currentLogs.get(recorderName);
if (currentLog == null || currentLog.hasLogged()) {
currentLog = new LoggingAction(values.length);
if (!currentLogs.containsKey(recorderName)) {
// Always log newly created loggers
currentLog.setShouldLog();
}
currentLogs.put(recorderName, currentLog);
}
currentLog.recordValues(values);
if (primaryRecorderName.equals(recorderName) &&
currentTimeMs - minLogPeriodMs >= lastLogTimestampMs) {
lastLogTimestampMs = currentTimeMs;
currentLogs.replaceAll((key, log) -> {
LoggingAction newLog = log;
if (log.hasLogged()) {
// create a fresh log since the old one has already been logged
newLog = new LoggingAction(log.getValueCount());
primaryRecorderName.compareAndSet(null, recorderName);

if (primaryRecorderName.get().equals(recorderName)) {
long localLastLogTs = lastLogTimestampMs.get();
while (currentTimeMs - minLogPeriodMs >= localLastLogTs) {
if (lastLogTimestampMs.compareAndSet(localLastLogTs, currentTimeMs)) {
currentLogs.replaceAll((key, log) -> {
LoggingAction newLog = log;
if (log.logState.get() == LogState.HAS_LOGGED) {
// create a fresh log since the old one has already been logged
newLog = new LoggingAction(log.getValueCount());
}
newLog.logState.set(LogState.SHOULD_LOG);
return newLog;
});
break;
} else {
// another thread updated lastLogTimestampMs concurrently, update the local copy
localLastLogTs = lastLogTimestampMs.get();
}
newLog.setShouldLog();
return newLog;
});
}
}
if (currentLog.shouldLog()) {
currentLog.setHasLogged();
return currentLog;

LoggingAction log = currentLogs.compute(recorderName, (k, oldV) -> {
LoggingAction newV;
if (oldV == null) {
newV = new LoggingAction(values.length);
newV.logState.set(LogState.SHOULD_LOG);
} else if (oldV.logState.get() == LogState.HAS_LOGGED) {
newV = new LoggingAction(values.length);
} else {
newV = oldV;
}
newV.recordValues(values);
return newV;
});
if (log.logState.compareAndSet(LogState.SHOULD_LOG, LogState.HAS_LOGGED)) {
return log;
} else {
return DO_NOT_LOG;
}
Expand Down Expand Up @@ -314,6 +324,10 @@ public static String getLogSupressionMessage(LogAction action) {
}
}

private enum LogState {
RECORDING, SHOULD_LOG, HAS_LOGGED;
}

/**
* A standard log action which keeps track of all of the values which have
* been logged. This is also used for internal bookkeeping via its private
Expand All @@ -325,8 +339,7 @@ private static class LoggingAction implements LogAction {

private int count = 0;
private final SummaryStatistics[] stats;
private boolean shouldLog = false;
private boolean hasLogged = false;
final AtomicReference<LogState> logState = new AtomicReference<>(LogState.RECORDING);

LoggingAction(int valueCount) {
stats = new SummaryStatistics[valueCount];
Expand All @@ -348,19 +361,8 @@ public SummaryStatistics getStats(int idx) {
}

public boolean shouldLog() {
return shouldLog;
}

private void setShouldLog() {
shouldLog = true;
}

private boolean hasLogged() {
return hasLogged;
}

private void setHasLogged() {
hasLogged = true;
LogState localState = logState.get();
return localState == LogState.SHOULD_LOG || localState == LogState.HAS_LOGGED;
}

private int getValueCount() {
Expand Down