Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public static class Builder {
private int nameNodeHttpPort = 0;
private final Configuration conf;
private int numDataNodes = 1;
private int[] dnHttpPorts = null;
private int[] dnIpcPorts = null;
private StorageType[][] storageTypes = null;
private StorageType[] storageTypes1D = null;
private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
Expand Down Expand Up @@ -277,6 +279,16 @@ public Builder numDataNodes(int val) {
return this;
}

public Builder setDnHttpPorts(int... ports) {
this.dnHttpPorts = ports;
return this;
}

public Builder setDnIpcPorts(int... ports) {
this.dnIpcPorts = ports;
return this;
}

/**
* Default: DEFAULT_STORAGES_PER_DATANODE
*/
Expand Down Expand Up @@ -599,7 +611,9 @@ protected MiniDFSCluster(Builder builder) throws IOException {
builder.checkDataNodeHostConfig,
builder.dnConfOverlays,
builder.skipFsyncForTesting,
builder.useConfiguredTopologyMappingClass);
builder.useConfiguredTopologyMappingClass,
builder.dnHttpPorts,
builder.dnIpcPorts);
}

public static class DataNodeProperties {
Expand Down Expand Up @@ -873,7 +887,7 @@ public MiniDFSCluster(int nameNodePort,
operation, null, racks, hosts,
null, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
true, false, false, null, true, false);
true, false, false, null, true, false, null, null);
}

private void initMiniDFSCluster(
Expand All @@ -891,7 +905,9 @@ private void initMiniDFSCluster(
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays,
boolean skipFsyncForTesting,
boolean useConfiguredTopologyMappingClass)
boolean useConfiguredTopologyMappingClass,
int[] dnHttpPorts,
int[] dnIpcPorts)
throws IOException {
boolean success = false;
try {
Expand Down Expand Up @@ -974,9 +990,9 @@ private void initMiniDFSCluster(

// Start the DataNodes
startDataNodes(conf, numDataNodes, storageTypes, manageDataDfsDirs,
dnStartOpt != null ? dnStartOpt : startOpt,
racks, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
dnStartOpt != null ? dnStartOpt : startOpt, racks, hosts, storageCapacities,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig,
dnConfOverlays, dnHttpPorts, dnIpcPorts);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
Expand Down Expand Up @@ -1598,8 +1614,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
String[] racks, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
null, simulatedCapacities, setupHostsFile, false, false, null);
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, null,
simulatedCapacities, setupHostsFile, false, false, null, null, null);
}

public synchronized void startDataNodes(Configuration conf, int numDataNodes,
Expand All @@ -1608,14 +1624,14 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts,
null, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
startDataNodes(conf, numDataNodes, null, manageDfsDirs, operation, racks, hosts, null,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null, null, null);
}

/**
* Modify the config and start up additional DataNodes. The info port for
* DataNodes is guaranteed to use a free port.
*
*
* Data nodes can run with the name node in the mini cluster or
* a real name node. For example, running with a real name node is useful
* when running simulated data nodes with a real name node.
Expand All @@ -1625,20 +1641,24 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
* @param conf the base configuration to use in starting the DataNodes. This
* will be modified as necessary.
* @param numDataNodes Number of DataNodes to start; may be zero
* @param storageTypes Storage Types for DataNodes.
* @param manageDfsDirs if true, the data directories for DataNodes will be
* created and {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} will be
* set in the conf
* @param operation the operation with which to start the DataNodes. If null
* or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
* @param racks array of strings indicating the rack that each DataNode is on
* @param hosts array of strings indicating the hostnames for each DataNode
* @param storageCapacities array of Storage Capacities to be used while testing.
* @param simulatedCapacities array of capacities of the simulated data nodes
* @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
* @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
* @throws IllegalStateException if NameNode has been shutdown
* @param dnHttpPorts An array of Http ports if present, to be used for DataNodes.
* @param dnIpcPorts An array of Ipc ports if present, to be used for DataNodes.
* @throws IOException If the DFS daemons experience some issues.
*/
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
StorageType[][] storageTypes, boolean manageDfsDirs, StartupOption operation,
Expand All @@ -1648,14 +1668,29 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
Configuration[] dnConfOverlays,
int[] dnHttpPorts,
int[] dnIpcPorts) throws IOException {
Comment on lines +1671 to +1673
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a safe change. This class is @InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce", "Pig"}). I checked the source code of Hive, HBase, and Pig. The method signature is not used anywhere.

assert storageCapacities == null || simulatedCapacities == null;
assert storageTypes == null || storageTypes.length == numDataNodes;
assert storageCapacities == null || storageCapacities.length == numDataNodes;

if (operation == StartupOption.RECOVER) {
return;
}

if (dnHttpPorts != null && dnHttpPorts.length != numDataNodes) {
throw new IllegalArgumentException(
"Num of http ports (" + dnHttpPorts.length + ") should match num of DataNodes ("
+ numDataNodes + ")");
}

if (dnIpcPorts != null && dnIpcPorts.length != numDataNodes) {
throw new IllegalArgumentException(
"Num of ipc ports (" + dnIpcPorts.length + ") should match num of DataNodes ("
+ numDataNodes + ")");
}

if (checkDataNodeHostConfig) {
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
} else {
Expand Down Expand Up @@ -1711,7 +1746,15 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
dnConf.addResource(dnConfOverlays[i]);
}
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
int httpPort = 0;
int ipcPort = 0;
if(dnHttpPorts != null) {
httpPort = dnHttpPorts[i - curDatanodesNum];
}
if(dnIpcPorts != null) {
ipcPort = dnIpcPorts[i - curDatanodesNum];
}
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig, httpPort, ipcPort);
if (manageDfsDirs) {
String dirs = makeDataNodeDirs(i, storageTypes == null ?
null : storageTypes[i - curDatanodesNum]);
Expand Down Expand Up @@ -3363,9 +3406,9 @@ public void setBlockRecoveryTimeout(long timeout) {
timeout);
}
}

protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
boolean checkDataNodeAddrConfig, int httpPort, int ipcPort) throws IOException {
if (setupHostsFile) {
String hostsFile = conf.get(DFS_HOSTS, "").trim();
if (hostsFile.length() == 0) {
Expand All @@ -3388,11 +3431,11 @@ protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
}
}
if (checkDataNodeAddrConfig) {
conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + httpPort);
conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:" + ipcPort);
} else {
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + httpPort);
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:" + ipcPort);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig, 0, 0);
if (manageDfsDirs) {
String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
Expand Down Expand Up @@ -235,7 +235,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
Configuration[] dnConfOverlays,
int[] dnHttpPorts,
int[] dnIpcPorts) throws IOException {
startDataNodes(conf, numDataNodes, storageTypes, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, storageCapacities, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

Expand All @@ -38,9 +40,13 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.util.Preconditions;

Expand All @@ -52,6 +58,8 @@
*/
public class TestMiniDFSCluster {

private static final Logger LOG = LoggerFactory.getLogger(TestMiniDFSCluster.class);

private static final String CLUSTER_1 = "cluster1";
private static final String CLUSTER_2 = "cluster2";
private static final String CLUSTER_3 = "cluster3";
Expand Down Expand Up @@ -319,4 +327,88 @@ public void testSetUpFederatedCluster() throws Exception {
cluster.restartNameNode(1);
}
}

// There is a possibility that this test might fail if any other concurrently running
// test could bind same port as one of the ports returned by NetUtils.getFreeSocketPorts(6)
// before datanodes are started.
@Test
public void testStartStopWithPorts() throws Exception {
Configuration conf = new Configuration();

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of http ports (1) should match num of DataNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniDFSCluster.Builder(conf).numDataNodes(3).setDnHttpPorts(8481).build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of ipc ports (2) should match num of DataNodes (1)",
"MiniJournalCluster port validation failed",
() -> {
new MiniDFSCluster.Builder(conf).setDnIpcPorts(8481, 8482).build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of ipc ports (1) should match num of DataNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniDFSCluster.Builder(conf).numDataNodes(3).setDnHttpPorts(800, 9000, 10000)
.setDnIpcPorts(8481).build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of http ports (4) should match num of DataNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniDFSCluster.Builder(conf).setDnHttpPorts(800, 9000, 1000, 2000)
.setDnIpcPorts(8481, 8482, 8483).numDataNodes(3).build();
});

final Set<Integer> httpAndIpcPorts = NetUtils.getFreeSocketPorts(6);
LOG.info("Free socket ports: {}", httpAndIpcPorts);

assertThat(httpAndIpcPorts).doesNotContain(0);

final int[] httpPorts = new int[3];
final int[] ipcPorts = new int[3];
int httpPortIdx = 0;
int ipcPortIdx = 0;
for (Integer httpAndIpcPort : httpAndIpcPorts) {
if (httpPortIdx < 3) {
httpPorts[httpPortIdx++] = httpAndIpcPort;
} else {
ipcPorts[ipcPortIdx++] = httpAndIpcPort;
}
}

LOG.info("Http ports selected: {}", httpPorts);
LOG.info("Ipc ports selected: {}", ipcPorts);

try (MiniDFSCluster miniDfsCluster = new MiniDFSCluster.Builder(conf)
.setDnHttpPorts(httpPorts)
.setDnIpcPorts(ipcPorts)
.numDataNodes(3).build()) {
miniDfsCluster.waitActive();

assertEquals(httpPorts[0],
miniDfsCluster.getDataNode(ipcPorts[0]).getInfoPort());
assertEquals(httpPorts[1],
miniDfsCluster.getDataNode(ipcPorts[1]).getInfoPort());
assertEquals(httpPorts[2],
miniDfsCluster.getDataNode(ipcPorts[2]).getInfoPort());

assertEquals(ipcPorts[0],
miniDfsCluster.getDataNode(ipcPorts[0]).getIpcPort());
assertEquals(ipcPorts[1],
miniDfsCluster.getDataNode(ipcPorts[1]).getIpcPort());
assertEquals(ipcPorts[2],
miniDfsCluster.getDataNode(ipcPorts[2]).getIpcPort());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void testBalancerWithRamDisk() throws Exception {
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
diskStorageLimit}};
cluster.startDataNodes(conf, replicationFactor, storageTypes, true, null,
null, null, storageCapacities, null, false, false, false, null);
null, null, storageCapacities, null, false, false, false, null, null, null);

cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ public void testMoverWithStripedFile() throws Exception {
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
true, null, null, null,capacities, null, false, false, false, null);
true, null, null, null, capacities, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();

// move file to ARCHIVE
Expand Down Expand Up @@ -982,7 +982,7 @@ public void testMoverWithStripedFile() throws Exception {
{ StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK } },
true, null, null, null, capacities, null, false, false, false, null);
true, null, null, null, capacities, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();

// move file blocks to ONE_SSD policy
Expand Down Expand Up @@ -1372,7 +1372,7 @@ private void startAdditionalDNs(final Configuration conf,
final MiniDFSCluster cluster) throws IOException {

cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
null, null, null, false, false, false, null);
null, null, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();
}
}
Loading