Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public long getNumBytes(StorageGroup storage) {

public static class DBlockStriped extends DBlock {

final byte[] indices;
private byte[] indices;
final short dataBlockNum;
final int cellSize;

Expand Down Expand Up @@ -532,6 +532,29 @@ public long getNumBytes(StorageGroup storage) {
}
return block.getNumBytes();
}

public void setIndices(byte[] indices) {
this.indices = indices;
}

/**
* Adjust EC block indices,it will remove the element of adjustList from indices.
* @param adjustList the list will be removed from indices
*/
public void adjustIndices(List<Integer> adjustList) {
if (adjustList.isEmpty()) {
return;
}

byte[] newIndices = new byte[indices.length - adjustList.size()];
for (int i = 0, j = 0; i < indices.length; ++i) {
if (!adjustList.contains(i)) {
newIndices[j] = indices[i];
++j;
}
}
this.indices = newIndices;
}
}

/** The class represents a desired move. */
Expand Down Expand Up @@ -810,7 +833,7 @@ Iterator<DBlock> getBlockIterator() {
*
* @return the total size of the received blocks in the number of bytes.
*/
private long getBlockList() throws IOException {
private long getBlockList() throws IOException, IllegalArgumentException {
final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlksLocs =
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
Expand Down Expand Up @@ -848,16 +871,35 @@ private long getBlockList() throws IOException {
synchronized (block) {
block.clearLocations();

if (blkLocs instanceof StripedBlockWithLocations) {
// EC block may adjust indices before, avoid repeated adjustments
((DBlockStriped) block).setIndices(
((StripedBlockWithLocations) blkLocs).getIndices());
}

// update locations
List<Integer> adjustList = new ArrayList<>();
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
final StorageType[] storageTypes = blkLocs.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) {
final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown
block.addLocation(g);
} else if (blkLocs instanceof StripedBlockWithLocations) {
// some datanode may not in storageGroupMap due to decommission operation
// or balancer cli with "-exclude" parameter
adjustList.add(i);
}
}

if (!adjustList.isEmpty()) {
// block.locations mismatch with block.indices
// adjust indices to get correct internalBlock for Datanode in #getInternalBlock
((DBlockStriped) block).adjustIndices(adjustList);
Preconditions.checkArgument(((DBlockStriped) block).indices.length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Preconditions.checkArgument() can throw IllegalArgumentException, getBlockList() should declares it, and dispatchBlocks() should catch the exception.

-    private long getBlockList() throws IOException {
+    private long getBlockList() throws IOException, IllegalArgumentException {
          try {
            final long received = getBlockList();
            if (received == 0) {
              return;
            }
            blocksToReceive -= received;
            continue;
-          } catch (IOException e) {
+          } catch (IOException|IllegalArgumentException e) {
            LOG.warn("Exception while getting reportedBlock list", e);
            return;
          }

== block.locations.size());
}
}
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -977,7 +1019,7 @@ private void dispatchBlocks() {
}
blocksToReceive -= received;
continue;
} catch (IOException e) {
} catch (IOException | IllegalArgumentException e) {
LOG.warn("Exception while getting reportedBlock list", e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,19 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes) throws IOException, TimeoutException {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
}

/**
* Wait until balanced: each datanode gives utilization within.
* BALANCE_ALLOWED_VARIANCE of average
* @throws IOException
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes, boolean checkExcludeNodesUtilization)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.monotonicNow() + timeout;
Expand All @@ -491,7 +504,9 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0);
if (checkExcludeNodesUtilization) {
assertTrue(nodeUtilization == 0);
}
actualExcludedNodeCount++;
continue;
}
Expand Down Expand Up @@ -778,6 +793,12 @@ private void runBalancer(Configuration conf, long totalUsedSpace,
private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes)
throws Exception {
runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
}

private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes,
boolean checkExcludeNodesUtilization) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

int retry = 5;
Expand All @@ -798,7 +819,7 @@ private void runBalancer(Configuration conf, long totalUsedSpace,
LOG.info(" .");
try {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
excludedNodes);
excludedNodes, checkExcludeNodesUtilization);
} catch (TimeoutException e) {
// See HDFS-11682. NN may not get heartbeat to reflect the newest
// block changes.
Expand Down Expand Up @@ -1678,6 +1699,103 @@ private void doTestBalancerWithStripedFile(Configuration conf) throws Exception
}
}

@Test
public void testBalancerWithExcludeListWithStripedFile() throws Exception {
Configuration conf = new Configuration();
initConfWithStripe(conf);
NameNodeConnector.setWrite2IdFile(true);
doTestBalancerWithExcludeListWithStripedFile(conf);
NameNodeConnector.setWrite2IdFile(false);
}

private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) throws Exception {
int numOfDatanodes = dataBlocks + parityBlocks + 5;
int numOfRacks = dataBlocks;
long capacity = 20 * defaultBlockSize;
long[] capacities = new long[numOfDatanodes];
Arrays.fill(capacities, capacity);
String[] racks = new String[numOfDatanodes];
for (int i = 0; i < numOfDatanodes; i++) {
racks[i] = "/rack" + (i % numOfRacks);
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.racks(racks)
.simulatedCapacities(capacities)
.build();

try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
client.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
client.setErasureCodingPolicy("/",
StripedFileTestUtil.getDefaultECPolicy().getName());

long totalCapacity = sum(capacities);

// fill up the cluster with 30% data. It'll be 45% full plus parity.
long fileLen = totalCapacity * 3 / 10;
long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
FileSystem fs = cluster.getFileSystem(0);
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());

// verify locations of striped blocks
LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);

// get datanode report
DatanodeInfo[] datanodeReport = client.getDatanodeReport(DatanodeReportType.ALL);
long totalBlocks = 0;
for (DatanodeInfo dn : datanodeReport) {
totalBlocks += dn.getNumBlocks();
}

// add datanode in new rack
String newRack = "/rack" + (++numOfRacks);
cluster.startDataNodes(conf, 2, true, null,
new String[]{newRack, newRack}, null,
new long[]{capacity, capacity});
totalCapacity += capacity*2;
cluster.triggerHeartbeats();

// add datanode to exclude list
Set<String> excludedList = new HashSet<>();
excludedList.add(datanodeReport[0].getXferAddr());
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
pBuilder.setExcludedNodes(excludedList);

// start balancer and check the failed num of moving task
runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
excludedList.size(), false);

// check total blocks, max wait time 60s
final long blocksBeforeBalancer = totalBlocks;
GenericTestUtils.waitFor(() -> {
DatanodeInfo[] datanodeInfos = null;
try {
cluster.triggerHeartbeats();
datanodeInfos = client.getDatanodeReport(DatanodeReportType.ALL);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
long blocksAfterBalancer = 0;
for (DatanodeInfo dn : datanodeInfos) {
blocksAfterBalancer += dn.getNumBlocks();
}
return blocksBeforeBalancer == blocksAfterBalancer;
}, 3000, 60000);

// verify locations of striped blocks
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);

} finally {
cluster.shutdown();
}
}

private void testNullStripedBlocks(Configuration conf) throws IOException {
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
DFSUtil.getInternalNsRpcUris(conf),
Expand Down