Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 49 additions & 55 deletions hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +38,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand All @@ -56,6 +58,9 @@
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
Expand Down Expand Up @@ -95,53 +100,58 @@ private HddsUtils() {
*
* @return Target {@code InetSocketAddress} for the SCM client endpoint.
*/
public static InetSocketAddress getScmAddressForClients(
public static Collection<InetSocketAddress> getScmAddressForClients(
ConfigurationSource conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}
if (SCMHAUtils.getScmServiceId(conf) != null) {
List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
Collection<InetSocketAddress> scmAddressList =
new HashSet<>(scmNodeInfoList.size());
for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
if (scmNodeInfo.getScmClientAddress() == null) {
throw new ConfigurationException("Ozone scm client address is not " +
"set for SCM service-id " + scmNodeInfo.getServiceId() +
"node-id" + scmNodeInfo.getNodeId());
}
scmAddressList.add(
NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress()));
}
return scmAddressList;
} else {
String address = conf.getTrimmed(OZONE_SCM_CLIENT_ADDRESS_KEY);
int port = -1;

if (address == null) {
// fall back to ozone.scm.names for non-ha
Collection<String> scmAddresses =
conf.getTrimmedStringCollection(OZONE_SCM_NAMES);

if (scmAddresses.isEmpty()) {
throw new ConfigurationException("Ozone scm client address is not " +
"set. Configure one of these config " +
OZONE_SCM_CLIENT_ADDRESS_KEY + ", " + OZONE_SCM_NAMES);
}

final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);
if (scmAddresses.size() > 1) {
throw new ConfigurationException("For non-HA SCM " + OZONE_SCM_NAMES
+ " should be set with single address");
}

return NetUtils.createSocketAddr(host.get() + ":" + port);
}
address = scmAddresses.iterator().next();

/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM for block service. If
* {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined
* then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. If neither
* is defined then {@link ScmConfigKeys#OZONE_SCM_NAMES} is used.
*
* @return Target {@code InetSocketAddress} for the SCM block client endpoint.
* @throws IllegalArgumentException if configuration is not defined.
*/
public static InetSocketAddress getScmAddressForBlockClients(
ConfigurationSource conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
port = conf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
OZONE_SCM_CLIENT_PORT_DEFAULT);
} else {
port = getHostPort(address)
.orElse(conf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
OZONE_SCM_CLIENT_PORT_DEFAULT));
}

if (!host.isPresent()) {
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
return Collections.singletonList(
NetUtils.createSocketAddr(getHostName(address).get() + ":" + port));
}

final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);

return NetUtils.createSocketAddr(host.get() + ":" + port);
}



/**
* Retrieve the hostname, trying the supplied config keys in order.
* Each config value may be absent, or if present in the format
Expand Down Expand Up @@ -231,7 +241,7 @@ public static OptionalInt getPortNumberFromConfigKeys(
* @return A collection of SCM addresses
* @throws IllegalArgumentException If the configuration is invalid
*/
public static Collection<InetSocketAddress> getSCMAddresses(
public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
ConfigurationSource conf) {

// First check HA style config, if not defined fall back to OZONE_SCM_NAMES
Expand Down Expand Up @@ -300,22 +310,6 @@ public static InetSocketAddress getReconAddresses(
return NetUtils.createSocketAddr(hostname.get(), port);
}

/**
* Retrieve the address of the only SCM (as currently multiple ones are not
* supported).
*
* @return SCM address
* @throws IllegalArgumentException if {@code conf} has more than one SCM
* address or it has none
*/
public static InetSocketAddress getSingleSCMAddress(
ConfigurationSource conf) {
Collection<InetSocketAddress> singleton = getSCMAddresses(conf);
// Preconditions.checkArgument(singleton.size() == 1,
// MULTIPLE_SCM_NOT_YET_SUPPORTED);
return singleton.iterator().next();
}

/**
* Returns the hostname for this datanode. If the hostname is not
* explicitly configured in the given config, then it is determined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NODES_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT;
Expand Down Expand Up @@ -141,7 +142,7 @@ public static List<SCMNodeInfo> buildNodeInfo(ConfigurationSource conf) {
String scmDatanodeAddress =
getHostNameFromConfigKeys(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY,
OZONE_SCM_CLIENT_ADDRESS_KEY).orElse(null);
OZONE_SCM_CLIENT_ADDRESS_KEY, OZONE_SCM_NAMES).orElse(null);

int scmBlockClientPort = getPortNumberFromConfigKeys(conf,
OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
Expand Down Expand Up @@ -199,7 +200,7 @@ private static int getPort(ConfigurationSource conf,
return port.getAsInt();
} else {
return conf.getInt(ConfUtils.addKeySuffixes(portKey, scmServiceId,
scmNodeId), defaultPort);
scmNodeId), conf.getInt(portKey, defaultPort));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses;
import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
Expand Down Expand Up @@ -88,23 +88,23 @@ public void testGetSCMAddresses() {

// Verify valid IP address setup
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "1.2.3.4");
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
assertThat(addresses.size(), is(1));
addr = addresses.iterator().next();
assertThat(addr.getHostName(), is("1.2.3.4"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));

// Verify valid hostname setup
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1");
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
assertThat(addresses.size(), is(1));
addr = addresses.iterator().next();
assertThat(addr.getHostName(), is("scm1"));
assertThat(addr.getPort(), is(OZONE_SCM_DATANODE_PORT_DEFAULT));

// Verify valid hostname and port
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234");
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
assertThat(addresses.size(), is(1));
addr = addresses.iterator().next();
assertThat(addr.getHostName(), is("scm1"));
Expand All @@ -118,7 +118,7 @@ public void testGetSCMAddresses() {
// Verify multiple hosts and port
conf.setStrings(
ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234,scm2:2345,scm3:3456");
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
assertThat(addresses.size(), is(3));
it = addresses.iterator();
HashMap<String, Integer> expected1 = new HashMap<>(hostsAndPorts);
Expand All @@ -132,7 +132,7 @@ public void testGetSCMAddresses() {
// Verify names with spaces
conf.setStrings(
ScmConfigKeys.OZONE_SCM_NAMES, " scm1:1234, scm2:2345 , scm3:3456 ");
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
assertThat(addresses.size(), is(3));
it = addresses.iterator();
HashMap<String, Integer> expected2 = new HashMap<>(hostsAndPorts);
Expand All @@ -146,7 +146,7 @@ public void testGetSCMAddresses() {
// Verify empty value
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "");
try {
getSCMAddresses(conf);
getSCMAddressForDatanodes(conf);
fail("Empty value should cause an IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
Expand All @@ -155,7 +155,7 @@ public void testGetSCMAddresses() {
// Verify invalid hostname
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "s..x..:1234");
try {
getSCMAddresses(conf);
getSCMAddressForDatanodes(conf);
fail("An invalid hostname should cause an IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
Expand All @@ -164,7 +164,7 @@ public void testGetSCMAddresses() {
// Verify invalid port
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz");
try {
getSCMAddresses(conf);
getSCMAddressForDatanodes(conf);
fail("An invalid port should cause an IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
Expand All @@ -173,7 +173,7 @@ public void testGetSCMAddresses() {
// Verify a mixed case (valid and invalid value both appears)
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, "scm1:1234, scm:xyz");
try {
getSCMAddresses(conf);
getSCMAddressForDatanodes(conf);
fail("An invalid value should cause an IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testGetSCMAddressesWithHAConfig() {
}

Collection<InetSocketAddress> scmAddressList =
HddsUtils.getSCMAddresses(conf);
HddsUtils.getSCMAddressForDatanodes(conf);

Assert.assertNotNull(scmAddressList);
Assert.assertEquals(3, scmAddressList.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private int getEndPointTaskThreadPoolSize() {
int totalServerCount = reconServerCount;

try {
totalServerCount += HddsUtils.getSCMAddresses(conf).size();
totalServerCount += HddsUtils.getSCMAddressForDatanodes(conf).size();
} catch (Exception e) {
LOG.error("Fail to get scm addresses", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

import com.google.common.base.Strings;
import static org.apache.hadoop.hdds.HddsUtils.getReconAddresses;
import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses;
import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,7 +79,7 @@ public InitDatanodeState(ConfigurationSource conf,
public DatanodeStateMachine.DatanodeStates call() throws Exception {
Collection<InetSocketAddress> addresses = null;
try {
addresses = getSCMAddresses(conf);
addresses = getSCMAddressForDatanodes(conf);
} catch (IllegalArgumentException e) {
if(!Strings.isNullOrEmpty(e.getMessage())) {
LOG.error("Failed to get SCM addresses: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getSingleSCMAddress;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
Expand Down Expand Up @@ -115,37 +114,6 @@ public static void addPBProtocol(Configuration conf, Class<?> protocol,
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}

/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
* @param conf
* @return Target {@code InetSocketAddress} for the SCM service endpoint.
*/
public static InetSocketAddress getScmAddressForDataNodes(
ConfigurationSource conf) {
// We try the following settings in decreasing priority to retrieve the
// target host.
// - OZONE_SCM_DATANODE_ADDRESS_KEY
// - OZONE_SCM_CLIENT_ADDRESS_KEY
// - OZONE_SCM_NAMES
//
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}

final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT);

return NetUtils.createSocketAddr(host.get() + ":" + port);
}

/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM.
Expand Down
Loading