@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -31,23 +30,19 @@
 /**
  * Data structure to describe the distribution of HDFS blocks among hosts.
  *
  * <p/>
  * Adding erroneous data will be ignored silently.
  */
 @InterfaceAudience.Private
 public class HDFSBlocksDistribution {
-  private Map<String,HostAndWeight> hostAndWeights = null;
+  private Map<String, HostAndWeight> hostAndWeights = null;
   private long uniqueBlocksTotalWeight = 0;
 
   /**
-   * Stores the hostname and weight for that hostname.
-   *
-   * This is used when determining the physical locations of the blocks making
-   * up a region.
-   *
-   * To make a prioritized list of the hosts holding the most data of a region,
-   * this class is used to count the total weight for each host. The weight is
-   * currently just the size of the file.
+   * Stores the hostname and weight for that hostname. This is used when determining the physical
+   * locations of the blocks making up a region. To make a prioritized list of the hosts holding the
+   * most data of a region, this class is used to count the total weight for each host. The weight
+   * is currently just the size of the file.
    */
   public static class HostAndWeight {
 
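A note for context on this hunk: the class accumulates, per host, the total size of the HDFS blocks that make up a region, so callers can rank hosts by how much of the region's data is local to them. A minimal usage sketch under one assumption: the populate method addHostsAndBlockWeight(String[] hosts, long weight), which lives in this class but outside the diff.

    import org.apache.hadoop.hbase.HDFSBlocksDistribution;

    public class DistributionExample {
      public static void main(String[] args) {
        HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
        long mb = 1024L * 1024;
        // Each call records one HDFS block: every listed replica host gains the
        // block size as weight, while the unique-block total grows only once.
        distribution.addHostsAndBlockWeight(new String[] { "host-a", "host-b", "host-c" }, 128 * mb);
        distribution.addHostsAndBlockWeight(new String[] { "host-a", "host-b" }, 64 * mb);
        distribution.addHostsAndBlockWeight(new String[] { "host-a" }, 32 * mb);
        // Weights are now host-a = 224 MB, host-b = 192 MB, host-c = 128 MB.
        System.out.println(distribution.getTopHosts()); // [host-a, host-b, host-c]
      }
    }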
@@ -104,7 +99,7 @@ public long getWeightForSsd() {
     public static class WeightComparator implements Comparator<HostAndWeight> {
       @Override
       public int compare(HostAndWeight l, HostAndWeight r) {
-        if(l.getWeight() == r.getWeight()) {
+        if (l.getWeight() == r.getWeight()) {
           return l.getHost().compareTo(r.getHost());
         }
         return l.getWeight() < r.getWeight() ? -1 : 1;
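The comparator above sorts ascending by weight and breaks ties on hostname; getTopHostsWithWeights() (outside this diff) uses it to rank the heaviest hosts first. A tiny illustration, assuming the three-argument HostAndWeight constructor is accessible from the calling code:

    // With java.util.Arrays and java.util.List imported:
    List<HDFSBlocksDistribution.HostAndWeight> hosts = Arrays.asList(
      new HDFSBlocksDistribution.HostAndWeight("host-b", 10, 0),
      new HDFSBlocksDistribution.HostAndWeight("host-a", 10, 0),
      new HDFSBlocksDistribution.HostAndWeight("host-c", 5, 0));
    hosts.sort(new HDFSBlocksDistribution.HostAndWeight.WeightComparator());
    // Ascending order: host-c (5), then host-a and host-b (both 10),
    // with the hostname comparison putting host-a before host-b.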
@@ -124,8 +119,7 @@ public HDFSBlocksDistribution() {
    */
   @Override
   public synchronized String toString() {
-    return "number of unique hosts in the distribution=" +
-      this.hostAndWeights.size();
+    return "number of unique hosts in the distribution=" + this.hostAndWeights.size();
   }
 
   /**
@@ -185,7 +179,7 @@ private void addHostAndBlockWeight(String host, long weight, long weightForSsd)
     }
 
     HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
-    if(hostAndWeight == null) {
+    if (hostAndWeight == null) {
       hostAndWeight = new HostAndWeight(host, weight, weightForSsd);
       this.hostAndWeights.put(host, hostAndWeight);
     } else {
@@ -196,21 +190,20 @@ private void addHostAndBlockWeight(String host, long weight, long weightForSsd)
   /**
    * @return the hosts and their weights
    */
-  public Map<String,HostAndWeight> getHostAndWeights() {
+  public Map<String, HostAndWeight> getHostAndWeights() {
     return this.hostAndWeights;
   }
 
   /**
-   * return the weight for a specific host, that will be the total bytes of all
-   * blocks on the host
+   * return the weight for a specific host, that will be the total bytes of all blocks on the host
    * @param host the host name
    * @return the weight of the given host
    */
   public long getWeight(String host) {
     long weight = 0;
     if (host != null) {
       HostAndWeight hostAndWeight = this.hostAndWeights.get(host);
-      if(hostAndWeight != null) {
+      if (hostAndWeight != null) {
         weight = hostAndWeight.getWeight();
       }
     }
@@ -240,7 +233,7 @@ public float getBlockLocalityIndex(String host) {
       return 0.0f;
     } else {
       return (float) getBlocksLocalityWeightInternal(host, HostAndWeight::getWeight)
-          / (float) uniqueBlocksTotalWeight;
+        / (float) uniqueBlocksTotalWeight;
     }
   }
 
@@ -253,7 +246,7 @@ public float getBlockLocalityIndexForSsd(String host) {
       return 0.0f;
     } else {
       return (float) getBlocksLocalityWeightInternal(host, HostAndWeight::getWeightForSsd)
-          / (float) uniqueBlocksTotalWeight;
+        / (float) uniqueBlocksTotalWeight;
     }
   }
 
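Both locality getters divide a host's accumulated weight (total, or SSD-only) by uniqueBlocksTotalWeight, i.e. they return the fraction of the region's unique block bytes stored on that host. Continuing the sketch from above, where the unique total is 128 + 64 + 32 = 224 MB:

    float localityA = distribution.getBlockLocalityIndex("host-a"); // 224 / 224 = 1.0f
    float localityC = distribution.getBlockLocalityIndex("host-c"); // 128 / 224 ≈ 0.57f
    float localityX = distribution.getBlockLocalityIndex("elsewhere"); // no blocks there: 0.0f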
@@ -303,10 +296,8 @@ private long getBlocksLocalityWeightInternal(String host, Visitor visitor) {
    * @param otherBlocksDistribution the other hdfs blocks distribution
    */
   public void add(HDFSBlocksDistribution otherBlocksDistribution) {
-    Map<String,HostAndWeight> otherHostAndWeights =
-      otherBlocksDistribution.getHostAndWeights();
-    for (Map.Entry<String, HostAndWeight> otherHostAndWeight:
-      otherHostAndWeights.entrySet()) {
+    Map<String, HostAndWeight> otherHostAndWeights = otherBlocksDistribution.getHostAndWeights();
+    for (Map.Entry<String, HostAndWeight> otherHostAndWeight : otherHostAndWeights.entrySet()) {
       addHostAndBlockWeight(otherHostAndWeight.getValue().host,
         otherHostAndWeight.getValue().weight, otherHostAndWeight.getValue().weightForSsd);
     }
@@ -319,7 +310,7 @@ public void add(HDFSBlocksDistribution otherBlocksDistribution) {
   public List<String> getTopHosts() {
     HostAndWeight[] hostAndWeights = getTopHostsWithWeights();
     List<String> topHosts = new ArrayList<>(hostAndWeights.length);
-    for(HostAndWeight haw : hostAndWeights) {
+    for (HostAndWeight haw : hostAndWeights) {
       topHosts.add(haw.getHost());
     }
     return topHosts;
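add() folds another distribution into this one entry by entry, summing weight and weightForSsd per host, so several per-file distributions can be combined into a single view. A short sketch, continuing in the hypothetical example class and again assuming addHostsAndBlockWeight for population:

    HDFSBlocksDistribution file1 = new HDFSBlocksDistribution();
    HDFSBlocksDistribution file2 = new HDFSBlocksDistribution();
    file1.addHostsAndBlockWeight(new String[] { "host-a" }, 100);
    file2.addHostsAndBlockWeight(new String[] { "host-a", "host-b" }, 50);

    HDFSBlocksDistribution merged = new HDFSBlocksDistribution();
    merged.add(file1);
    merged.add(file2);
    // merged.getWeight("host-a") == 150, merged.getWeight("host-b") == 50.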