@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.protocol;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;

/**
* Outlier detection metrics - median, median absolute deviation, upper latency limit,
* actual latency etc.
*/
@InterfaceAudience.Private
public class OutlierMetrics {

private final Double median;
private final Double mad;
private final Double upperLimitLatency;
private final Double actualLatency;

public OutlierMetrics(Double median, Double mad, Double upperLimitLatency,
Double actualLatency) {
this.median = median;
this.mad = mad;
this.upperLimitLatency = upperLimitLatency;
this.actualLatency = actualLatency;
}

public Double getMedian() {
return median;
}

public Double getMad() {
return mad;
}

public Double getUpperLimitLatency() {
return upperLimitLatency;
}

public Double getActualLatency() {
return actualLatency;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

OutlierMetrics that = (OutlierMetrics) o;

return new EqualsBuilder()
.append(median, that.median)
.append(mad, that.mad)
.append(upperLimitLatency, that.upperLimitLatency)
.append(actualLatency, that.actualLatency)
.isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(median)
.append(mad)
.append(upperLimitLatency)
.append(actualLatency)
.toHashCode();
}
}
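For context, here is a minimal sketch (not part of this change) of how a median/MAD style detector could produce the four values that OutlierMetrics carries. The class name, constants, and threshold below are illustrative assumptions, not the Hadoop OutlierDetector implementation:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;

public class MedianMadSketch {
  // Assumed constants: MAD scale factor for a normal distribution and a
  // deviation multiplier for the upper latency limit.
  private static final double MAD_SCALE = 1.4826;
  private static final double DEVIATION_MULTIPLIER = 3.0;

  static double median(List<Double> values) {
    List<Double> sorted = new ArrayList<>(values);
    Collections.sort(sorted);
    int mid = sorted.size() / 2;
    return sorted.size() % 2 == 0
        ? (sorted.get(mid - 1) + sorted.get(mid)) / 2.0
        : sorted.get(mid);
  }

  /** One OutlierMetrics per node whose latency exceeds median + multiplier * MAD. */
  static Map<String, OutlierMetrics> detect(Map<String, Double> latencyByNode) {
    if (latencyByNode.isEmpty()) {
      return Collections.emptyMap();
    }
    double med = median(new ArrayList<>(latencyByNode.values()));
    List<Double> deviations = new ArrayList<>();
    for (double latency : latencyByNode.values()) {
      deviations.add(Math.abs(latency - med));
    }
    double mad = MAD_SCALE * median(deviations);
    double upperLimit = med + DEVIATION_MULTIPLIER * mad;

    Map<String, OutlierMetrics> outliers = new HashMap<>();
    for (Map.Entry<String, Double> entry : latencyByNode.entrySet()) {
      if (entry.getValue() > upperLimit) {
        outliers.put(entry.getKey(),
            new OutlierMetrics(med, mad, upperLimit, entry.getValue()));
      }
    }
    return outliers;
  }
}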
@@ -51,7 +51,7 @@ public final class SlowPeerReports {
* meaningful and must be avoided.
*/
@Nonnull
private final Map<String, Double> slowPeers;
private final Map<String, OutlierMetrics> slowPeers;

/**
* An object representing a SlowPeerReports with no entries. Should
@@ -61,19 +61,19 @@ public final class SlowPeerReports {
public static final SlowPeerReports EMPTY_REPORT =
new SlowPeerReports(ImmutableMap.of());

private SlowPeerReports(Map<String, Double> slowPeers) {
private SlowPeerReports(Map<String, OutlierMetrics> slowPeers) {
this.slowPeers = slowPeers;
}

public static SlowPeerReports create(
@Nullable Map<String, Double> slowPeers) {
@Nullable Map<String, OutlierMetrics> slowPeers) {
if (slowPeers == null || slowPeers.isEmpty()) {
return EMPTY_REPORT;
}
return new SlowPeerReports(slowPeers);
}

public Map<String, Double> getSlowPeers() {
public Map<String, OutlierMetrics> getSlowPeers() {
return slowPeers;
}
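As a usage sketch (not from this patch), a caller can now attach the full outlier metrics for each slow peer; the datanode address below is made up and java.util.HashMap/Map imports are assumed:

Map<String, OutlierMetrics> peers = new HashMap<>();
peers.put("dn1.example.com:9866",
    new OutlierMetrics(12.0, 2.5, 19.5, 48.7)); // median, MAD, upper limit, actual latency
SlowPeerReports reports = SlowPeerReports.create(peers);
// A null or empty map still collapses to the shared SlowPeerReports.EMPTY_REPORT.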

@@ -112,6 +112,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
@@ -853,11 +854,15 @@ public static List<SlowPeerReportProto> convertSlowPeerInfo(

List<SlowPeerReportProto> slowPeerInfoProtos =
new ArrayList<>(slowPeers.getSlowPeers().size());
for (Map.Entry<String, Double> entry :
slowPeers.getSlowPeers().entrySet()) {
slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
for (Map.Entry<String, OutlierMetrics> entry : slowPeers.getSlowPeers().entrySet()) {
OutlierMetrics outlierMetrics = entry.getValue();
slowPeerInfoProtos.add(
SlowPeerReportProto.newBuilder()
.setDataNodeId(entry.getKey())
.setAggregateLatency(entry.getValue())
.setAggregateLatency(outlierMetrics.getActualLatency())
.setMedian(outlierMetrics.getMedian())
.setMad(outlierMetrics.getMad())
.setUpperLimitLatency(outlierMetrics.getUpperLimitLatency())
.build());
}
return slowPeerInfoProtos;
@@ -871,15 +876,19 @@ public static SlowPeerReports convertSlowPeerInfo(
return SlowPeerReports.EMPTY_REPORT;
}

Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
Map<String, OutlierMetrics> slowPeersMap = new HashMap<>(slowPeerProtos.size());
for (SlowPeerReportProto proto : slowPeerProtos) {
if (!proto.hasDataNodeId()) {
// The DataNodeId should be reported.
continue;
}
slowPeersMap.put(
proto.getDataNodeId(),
proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
Double aggregateLatency = proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0;
Double medianLatency = proto.hasMedian() ? proto.getMedian() : 0.0;
Double madLatency = proto.hasMad() ? proto.getMad() : 0.0;
Double upperLimitLatency = proto.hasUpperLimitLatency() ? proto.getUpperLimitLatency() : 0.0;
OutlierMetrics outlierMetrics =
new OutlierMetrics(medianLatency, madLatency, upperLimitLatency, aggregateLatency);
slowPeersMap.put(proto.getDataNodeId(), outlierMetrics);
}
return SlowPeerReports.create(slowPeersMap);
}
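For illustration (not part of the diff): the two converters are symmetric, and the defaulting on the decode path means a heartbeat from a DataNode that does not yet populate the new optional fields simply decodes to 0.0 for them. A hedged sketch, assuming the second overload accepts the List produced by the first and that java.util imports are in scope:

SlowPeerReports original = SlowPeerReports.create(
    Collections.singletonMap("dn1.example.com:9866",
        new OutlierMetrics(12.0, 2.5, 19.5, 48.7)));
List<SlowPeerReportProto> onWire = convertSlowPeerInfo(original);
SlowPeerReports decoded = convertSlowPeerInfo(onWire);
OutlierMetrics m = decoded.getSlowPeers().get("dn1.example.com:9866");
// m.getMedian() == 12.0, m.getMad() == 2.5,
// m.getUpperLimitLatency() == 19.5, m.getActualLatency() == 48.7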
@@ -1896,14 +1896,14 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");

if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
final Map<String, OutlierMetrics> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
slowNodeId.getValue());
for (Map.Entry<String, OutlierMetrics> slowNodeEntry : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeEntry.getKey(), nodeReg.getIpcAddr(false),
slowNodeEntry.getValue());
}
}
}
@@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Timer;

@@ -58,7 +59,7 @@ public boolean isSlowPeerTrackerEnabled() {
}

@Override
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) {
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
}
@@ -38,13 +38,31 @@ final class SlowPeerLatencyWithReportingNode
@JsonProperty("ReportedLatency")
private final Double reportedLatency;

@JsonProperty("MedianLatency")
private final Double medianLatency;

@JsonProperty("MadLatency")
private final Double madLatency;

@JsonProperty("UpperLimitLatency")
private final Double upperLimitLatency;

SlowPeerLatencyWithReportingNode(
@JsonProperty("ReportingNode")
String reportingNode,
@JsonProperty("ReportedLatency")
Double reportedLatency) {
Double reportedLatency,
@JsonProperty("MedianLatency")
Double medianLatency,
@JsonProperty("MadLatency")
Double madLatency,
@JsonProperty("UpperLimitLatency")
Double upperLimitLatency) {
this.reportingNode = reportingNode;
this.reportedLatency = reportedLatency;
this.medianLatency = medianLatency;
this.madLatency = madLatency;
this.upperLimitLatency = upperLimitLatency;
}

public String getReportingNode() {
@@ -55,6 +73,18 @@ public Double getReportedLatency() {
return reportedLatency;
}

public Double getMedianLatency() {
return medianLatency;
}

public Double getMadLatency() {
return madLatency;
}

public Double getUpperLimitLatency() {
return upperLimitLatency;
}

@Override
public int compareTo(SlowPeerLatencyWithReportingNode o) {
return this.reportingNode.compareTo(o.getReportingNode());
@@ -75,6 +105,9 @@ public boolean equals(Object o) {
return new EqualsBuilder()
.append(reportingNode, that.reportingNode)
.append(reportedLatency, that.reportedLatency)
.append(medianLatency, that.medianLatency)
.append(madLatency, that.madLatency)
.append(upperLimitLatency, that.upperLimitLatency)
.isEquals();
}

@@ -83,6 +116,9 @@ public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(reportingNode)
.append(reportedLatency)
.append(medianLatency)
.append(madLatency)
.append(upperLimitLatency)
.toHashCode();
}
}
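Since these entries serialize through the @JsonProperty names above, a rough sketch (not from this patch) of what one entry could look like; the values are made up, Jackson's ObjectMapper is assumed to be on the classpath, and exception handling is omitted:

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(
    new SlowPeerLatencyWithReportingNode(
        "dn2.example.com:9866", 48.7, 12.0, 2.5, 19.5));
// Roughly: {"ReportingNode":"dn2.example.com:9866","ReportedLatency":48.7,
//           "MedianLatency":12.0,"MadLatency":2.5,"UpperLimitLatency":19.5}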
@@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
@@ -123,9 +124,10 @@ public boolean isSlowPeerTrackerEnabled() {
*
* @param slowNode DataNodeId of the peer suspected to be slow.
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
* @param slowNodeMetrics Aggregate latency metrics of slownode as reported by the
* reporting node.
*/
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) {
ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);

if (nodeEntries == null) {
@@ -136,7 +138,7 @@ public void addReport(String slowNode, String reportingNode, Double slowNodeLate

// Replace the existing entry from this node, if any.
nodeEntries.put(reportingNode,
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeMetrics));
}

/**
@@ -195,8 +197,11 @@ private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(

for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
if (now - entry.getValue().getTime() < reportValidityMs) {
OutlierMetrics outlierMetrics = entry.getValue().getLatency();
validReports.add(
new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
new SlowPeerLatencyWithReportingNode(entry.getKey(), outlierMetrics.getActualLatency(),
outlierMetrics.getMedian(), outlierMetrics.getMad(),
outlierMetrics.getUpperLimitLatency()));
}
}
return validReports;
@@ -279,9 +284,9 @@ long getReportValidityMs() {

private static class LatencyWithLastReportTime {
private final Long time;
private final Double latency;
private final OutlierMetrics latency;

LatencyWithLastReportTime(Long time, Double latency) {
LatencyWithLastReportTime(Long time, OutlierMetrics latency) {
this.time = time;
this.latency = latency;
}
@@ -290,7 +295,7 @@ public Long getTime() {
return time;
}

public Double getLatency() {
public OutlierMetrics getLatency() {
return latency;
}
}
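A small usage sketch (not from this patch): the heartbeat path now hands the whole metric bundle to the tracker. The node addresses and the tracker instance below are assumed:

OutlierMetrics metrics = new OutlierMetrics(12.0, 2.5, 19.5, 48.7);
slowPeerTracker.addReport("dn3.example.com:9866", "dn1.example.com:9866", metrics);
// Each reporting node keeps only its latest entry per slow node, and
// filterNodeReports(...) drops entries older than the report validity window
// before they are exposed.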
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
import org.apache.hadoop.util.Preconditions;
@@ -55,7 +56,7 @@ public class DataNodePeerMetrics {
private final String name;

// Strictly to be used by test code only. Source code is not supposed to use this.
private Map<String, Double> testOutlier = null;
private Map<String, OutlierMetrics> testOutlier = null;

private final OutlierDetector slowNodeDetector;

@@ -143,15 +144,15 @@ public void collectThreadLocalStates() {
* Retrieve the set of dataNodes that look significantly slower
* than their peers.
*/
public Map<String, Double> getOutliers() {
public Map<String, OutlierMetrics> getOutliers() {
// outlier must be null for source code.
if (testOutlier == null) {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
return slowNodeDetector.getOutlierMetrics(stats);
} else {
// this happens only for test code.
return testOutlier;
@@ -164,7 +165,7 @@ public Map<String, Double> getOutliers() {
*
* @param outlier outlier directly set by tests.
*/
public void setTestOutliers(Map<String, Double> outlier) {
public void setTestOutliers(Map<String, OutlierMetrics> outlier) {
this.testOutlier = outlier;
}
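Finally, a test-side sketch (not from this patch) of the updated injection hook; peerMetrics stands in for an assumed DataNodePeerMetrics instance and the datanode id is made up:

Map<String, OutlierMetrics> injected = new HashMap<>();
injected.put("dn4.example.com:9866", new OutlierMetrics(12.0, 2.5, 19.5, 48.7));
peerMetrics.setTestOutliers(injected);
// getOutliers() now returns the injected map directly, bypassing the detector.
assert peerMetrics.getOutliers() == injected;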
