Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,9 @@ class BalancerClusterState {
int[] regionIndexToServerIndex; // regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; // regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to the refactoring of cached double cost array, I am doing some cleaning up and refactoring here for table skew cost function.

int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions
int[] numRegionsPerTable; // tableIndex -> region count
double[] meanRegionsPerTable; // mean region count per table
double[] regionSkewByTable; // skew on RS per by table
double[] minRegionSkewByTable; // min skew on RS per by table
double[] maxRegionSkewByTable; // max skew on RS per by table
int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; // whether there is regions with replicas

Expand Down Expand Up @@ -283,6 +280,11 @@ public String getRack(ServerName server) {
regionIndex++;
}

if (LOG.isDebugEnabled()) {
for (int i = 0; i < numServers; i++) {
LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
}
}
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
Expand All @@ -303,40 +305,29 @@ public String getRack(ServerName server) {
}

numTables = tables.size();
LOG.debug("Number of tables={}", numTables);
numRegionsPerServerPerTable = new int[numServers][numTables];
LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables,
numHosts, numRacks);
numRegionsPerServerPerTable = new int[numTables][numServers];
numRegionsPerTable = new int[numTables];

for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
for (int i = 0; i < numTables; i++) {
for (int j = 0; j < numServers; j++) {
numRegionsPerServerPerTable[i][j] = 0;
}
}

for (int i = 0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}

// Avoid repeated computation for planning
meanRegionsPerTable = new double[numTables];
regionSkewByTable = new double[numTables];
maxRegionSkewByTable = new double[numTables];
minRegionSkewByTable = new double[numTables];

for (int i = 0; i < numTables; i++) {
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
}

for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
regionSkewByTable[tableIdx] += Math.abs(aNumRegionsPerServerPerTable[tableIdx] -
meanRegionsPerTable[tableIdx]);
}
}

for (int i = 0; i < regions.length; i++) {
Expand Down Expand Up @@ -684,14 +675,9 @@ void regionMoved(int region, int oldServer, int newServer) {
}
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
// update regionSkewPerTable for the move from old server
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
numRegionsPerServerPerTable[tableIndex][oldServer]--;
}
numRegionsPerServerPerTable[newServer][tableIndex]++;

// update regionSkewPerTable for the move to new server
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);
numRegionsPerServerPerTable[tableIndex][newServer]++;

// update for servers
int primary = regionIndexToPrimaryIndex[region];
Expand Down Expand Up @@ -865,18 +851,9 @@ public String toString() {
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
.append(Arrays.deepToString(regionsPerServer));

desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
.append('}');
return desc.toString();
}

private double getSkewChangeFor(int serverIndex, int tableIndex, int regionCountChange) {
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
meanRegionsPerTable[tableIndex]);
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
regionCountChange - meanRegionsPerTable[tableIndex]);
return curSkew - oldSkew;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,21 @@ void applyCostsChange(Consumer<double[]> consumer) {
}

private static double computeCost(double[] stats) {
if (stats == null || stats.length == 0) {
return 0;
}
double totalCost = 0;
double total = getSum(stats);

double count = stats.length;
double mean = total / count;

for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
double diff = (mean - n) * (mean - n);
totalCost += diff;
}
// No need to compute standard deviation with division by cluster size when scaling.
totalCost = Math.sqrt(totalCost);
Comment on lines +79 to +83
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the standard deviation instead of linear deviation to assign higher penalty on outliers and therefore unstuck balancer when even region count distribution cannot be achieved with other constraint such as rack/host constraints

return CostFunction.scale(getMinSkew(total, count),
getMaxSkew(total, count), totalCost);
}
Expand All @@ -94,18 +98,22 @@ private static double getSum(double[] stats) {
* @param total is total number of regions
*/
public static double getMinSkew(double total, double numServers) {
if (numServers == 0) {
return 0;
}
double mean = total / numServers;
// It's possible that there aren't enough regions to go around
double min;
if (numServers > total) {
min = ((numServers - total) * mean + (1 - mean) * total) ;
min = ((numServers - total) * mean * mean + (1 - mean) * (1 - mean) * total) ;
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * numServers));
int numLow = (int) (numServers - numHigh);
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
min = numHigh * (Math.ceil(mean) - mean) * (Math.ceil(mean) - mean) +
numLow * (mean - Math.floor(mean)) * (mean - Math.floor(mean));
}
return min;
return Math.sqrt(min);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above.

}

/**
Expand All @@ -114,7 +122,10 @@ public static double getMinSkew(double total, double numServers) {
* a zero sum cost for this to make sense.
*/
public static double getMaxSkew(double total, double numServers) {
if (numServers == 0) {
return 0;
}
double mean = total / numServers;
return (total - mean) + (numServers - 1) * mean;
return Math.sqrt((total - mean) * (total - mean) + (numServers - 1) * mean * mean);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MoveCostFunction extends CostFunction {
static final float DEFAULT_MOVE_COST = 7;
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
private static final int DEFAULT_MAX_MOVES = 600;
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
private static final float DEFAULT_MAX_MOVE_PERCENT = 1.0f;

private final float maxMovesPercent;
private final ClusterInfoProvider provider;
Expand Down Expand Up @@ -79,4 +79,4 @@ protected double cost() {

return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Compute the cost of a potential cluster state from skew in number of regions on a cluster.
*/
@InterfaceAudience.Private
class RegionCountSkewCostFunction extends CostFunction {

private static final Logger LOG = LoggerFactory.getLogger(RegionCountSkewCostFunction.class);

static final String REGION_COUNT_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
Expand All @@ -50,14 +45,6 @@ void prepare(BalancerClusterState cluster) {
costs[i] = cluster.regionsPerServer[i].length;
}
});
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
for (int i = 0; i < cluster.numServers; i++) {
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
cluster.servers[i], cluster.regionsPerServer[i].length);
}
}
}

@Override
Expand All @@ -72,4 +59,4 @@ protected void regionMoved(int region, int oldServer, int newServer) {
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,43 @@ class TableSkewCostFunction extends CostFunction {
private static final String TABLE_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.tableSkewCost";
private static final float DEFAULT_TABLE_SKEW_COST = 35;
DoubleArrayCost[] costsPerTable;

TableSkewCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}

@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
costsPerTable = new DoubleArrayCost[cluster.numTables];
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
costsPerTable[tableIdx] = new DoubleArrayCost();
costsPerTable[tableIdx].prepare(cluster.numServers);
final int tableIndex = tableIdx;
costsPerTable[tableIdx].applyCostsChange(costs -> {
// Keep a cached deep copy for change-only recomputation
for (int i = 0; i < cluster.numServers; i++) {
costs[i] = cluster.numRegionsPerServerPerTable[tableIndex][i];
}
});
}
}

@Override
protected void regionMoved(int region, int oldServer, int newServer) {
int tableIdx = cluster.regionIndexToTableIndex[region];
costsPerTable[tableIdx].applyCostsChange(costs -> {
costs[oldServer] = cluster.numRegionsPerServerPerTable[tableIdx][oldServer];
costs[newServer] = cluster.numRegionsPerServerPerTable[tableIdx][newServer];
});
}

@Override
protected double cost() {
double cost = 0;
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
cost += scale(cluster.minRegionSkewByTable[tableIdx],
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
cost += costsPerTable[tableIdx].cost();
}
return cost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
public static void beforeAllTests() throws Exception {
conf = HBaseConfiguration.create();
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class StochasticBalancerTestBase2 extends StochasticBalancerTestBase {

@Before
public void before() {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {

// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the regionSkewByTable for "table" has increased to 2
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ public void testComputeCost() {
}
costs[100] = 100;
});
assertEquals(0.5, cost.cost(), 0.01);
assertEquals(0.0708, cost.cost(), 0.01);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,13 @@ public void testMoveCost() throws Exception {
cluster.setNumRegions(10000);
cluster.setNumMovedRegions(250);
cost = costFunction.cost();
assertEquals(0.1f, cost, 0.001);
assertEquals(0.025f, cost, 0.001);
cluster.setNumMovedRegions(1250);
cost = costFunction.cost();
assertEquals(0.5f, cost, 0.001);
assertEquals(0.125f, cost, 0.001);
cluster.setNumMovedRegions(2500);
cost = costFunction.cost();
assertEquals(1.0f, cost, 0.01);
assertEquals(0.25f, cost, 0.01);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.onConfigurationChange(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public void testLargeCluster() {
int numTables = 100;
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class TestStochasticLoadBalancerRegionReplicaSameHosts extends Stochastic
public void testRegionReplicationOnMidClusterSameHosts() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.onConfigurationChange(conf);
int numHosts = 30;
int numRegions = 30 * 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public ForTestRackManager(int numRacks) {
this.numRacks = numRacks;
}


@Override
public String getRack(ServerName server) {
String key = server.getServerName();
Expand All @@ -62,10 +61,7 @@ public String getRack(ServerName server) {
public void testRegionReplicationOnMidClusterWithRacks() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
// for full balance
// conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.001f);
loadBalancer.onConfigurationChange(conf);
int numNodes = 5;
int numRegions = numNodes * 1;
Expand All @@ -76,15 +72,13 @@ public void testRegionReplicationOnMidClusterWithRacks() {
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);

testWithClusterWithIteration(serverMap, rm, true, true);
}

@Test
public void testRegionReplicationOnLargeClusterWithRacks() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
loadBalancer.onConfigurationChange(conf);
int numNodes = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testSmallCluster3() {
int numRegionsPerServer = 1; // all servers except one
int replication = 1;
int numTables = 10;
/* fails because of max moves */
// fails because of max moves
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false,
false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public static void setupBeforeClass() throws Exception {
conf = UTIL.getConfiguration();

conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
Random rand = new Random();
Expand Down