Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
Expand All @@ -41,16 +38,14 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.function.Supplier;

/**
* End-to-end test case for upgrade domain
* The test configs upgrade domain for nodes via admin json
Expand All @@ -63,6 +58,8 @@ public class TestUpgradeDomainBlockPlacementPolicy {

private static final short REPLICATION_FACTOR = (short) 3;
private static final int DEFAULT_BLOCK_SIZE = 1024;
private static final int WAIT_TIMEOUT_MS = 60000;
private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5;
static final String[] racks =
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
static final String[] hosts =
Expand All @@ -71,9 +68,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null;
private PermissionStatus perm = null;
private HostsFileWriter hostsFileWriter = new HostsFileWriter();

@Before
Expand All @@ -92,10 +86,6 @@ public void setup() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
nameNodeRpc = cluster.getNameNodeRpc();
namesystem = cluster.getNamesystem();
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
FsPermission.getDefault());
refreshDatanodeAdminProperties();
}

Expand Down Expand Up @@ -186,43 +176,51 @@ private void refreshDatanodeAdminProperties2()
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
}

private void createFileAndWaitForReplication(final Path path,
final long fileLen)
throws Exception {
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen,
REPLICATION_FACTOR, 1000L);
DFSTestUtil.waitForReplication(cluster.getFileSystem(), path,
REPLICATION_FACTOR, WAIT_TIMEOUT_MS);
}

@Test
public void testPlacement() throws Exception {
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
final String testFile = new String("/testfile");
final long fileSize = FILE_SIZE;
final String testFile = "/testfile";
final Path path = new Path(testFile);
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
REPLICATION_FACTOR, 1000L);
createFileAndWaitForReplication(path, FILE_SIZE);
LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
for(DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) {
if (datanodeInfo.getAdminState()
.equals(DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo);
}
}
for (DatanodeID datanodeID : expectedDatanodeIDs) {
assertTrue(locs.contains(datanodeID));
Assert.assertTrue(locs.contains(datanodeID));
}
}
}

@Test(timeout = 300000)
public void testPlacementAfterDecommission() throws Exception {
final long fileSize = DEFAULT_BLOCK_SIZE * 5;
final String testFile = new String("/testfile");
final long fileSize = FILE_SIZE;
final String testFile = "/testfile-afterdecomm";
final Path path = new Path(testFile);
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
REPLICATION_FACTOR, 1000L);
createFileAndWaitForReplication(path, fileSize);

// Decommission some nodes and wait until decommissions have finished.
refreshDatanodeAdminProperties2();

GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean successful = true;
LocatedBlocks locatedBlocks;
try {
locatedBlocks =
Expand All @@ -231,32 +229,34 @@ public Boolean get() {
} catch (IOException ioe) {
return false;
}
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>();
for (DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState() ==
DatanodeInfo.AdminStates.NORMAL) {
if (datanodeInfo.getAdminState().equals(
DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo);
}
}
for (DatanodeID datanodeID : expectedDatanodeIDs) {
successful = successful && locs.contains(datanodeID);
if (!locs.contains(datanodeID)) {
return false;
}
}
}
return successful;
return true;
}
}, 1000, 60000);
}, 1000, WAIT_TIMEOUT_MS);

// Verify block placement policy of each block.
LocatedBlocks locatedBlocks;
locatedBlocks =
LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize);
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status = cluster.getNamesystem().getBlockManager().
getBlockPlacementPolicy().verifyBlockPlacement(
block.getLocations(), REPLICATION_FACTOR);
assertTrue(status.isPlacementPolicySatisfied());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not leave this static import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @goiri I did not know that static imports are preferred. I remembered it used to be not favorable because of code readability.
I will take a note of that for future changes.

for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status =
cluster.getNamesystem().getBlockManager()
.getBlockPlacementPolicy()
.verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR);
Assert.assertTrue(status.isPlacementPolicySatisfied());
}
}
}