Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 71 additions & 115 deletions hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -30,14 +30,15 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
Expand Down Expand Up @@ -88,6 +89,9 @@ public final class HddsUtils {
"OzoneScmServiceInstance";
private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");

private static final String MULTIPLE_SCM_NOT_YET_SUPPORTED =
ScmConfigKeys.OZONE_SCM_NAMES + " must contain a single hostname."
+ " Multiple SCM hosts are currently unsupported";

private static final int NO_PORT = -1;

Expand All @@ -98,38 +102,22 @@ private HddsUtils() {
* Retrieve the socket address that should be used by clients to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM client endpoint.
* @return Target {@code InetSocketAddress} for the SCM client endpoint.
*/
public static InetSocketAddress getScmAddressForClients(Configuration conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
// Fallback to Ozone SCM names.
Collection<InetSocketAddress> scmAddresses = getSCMAddresses(conf);
if (scmAddresses.size() > 1) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_NAMES +
" must contain a single hostname. Multiple SCM hosts are " +
"currently unsupported");
}
host = Optional.of(scmAddresses.iterator().next().getHostName());
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}

if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + " must be defined. See"
+ " https://wiki.apache.org/hadoop/Ozone#Configuration for "
+ "details"
+ " on configuring Ozone.");
}
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);

final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

return NetUtils.createSocketAddr(host.get() + ":" + port
.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
return NetUtils.createSocketAddr(host.get() + ":" + port);
}

/**
Expand All @@ -139,45 +127,25 @@ public static InetSocketAddress getScmAddressForClients(Configuration conf) {
* then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. If neither
* is defined then {@link ScmConfigKeys#OZONE_SCM_NAMES} is used.
*
* @param conf
* @return Target InetSocketAddress for the SCM block client endpoint.
* @return Target {@code InetSocketAddress} for the SCM block client endpoint.
* @throws IllegalArgumentException if configuration is not defined.
*/
public static InetSocketAddress getScmAddressForBlockClients(
Configuration conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
}

if (!host.isPresent()) {
// Fallback to Ozone SCM names.
Collection<InetSocketAddress> scmAddresses = getSCMAddresses(conf);
if (scmAddresses.size() > 1) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_NAMES +
" must contain a single hostname. Multiple SCM hosts are " +
"currently unsupported");
}
host = Optional.of(scmAddresses.iterator().next().getHostName());
}
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY
+ " must be defined. See"
+ " https://wiki.apache.org/hadoop/Ozone#Configuration"
+ " for details on configuring Ozone.");
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}

final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);

return NetUtils.createSocketAddr(host.get() + ":" + port
.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
return NetUtils.createSocketAddr(host.get() + ":" + port);
}

/**
Expand All @@ -198,13 +166,11 @@ public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
RetryPolicy retryPolicy =
RetryPolicies.retryForeverWithFixedSleep(
1000, TimeUnit.MILLISECONDS);
SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
new SCMSecurityProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
address, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf), retryPolicy).getProxy());
return scmSecurityClient;
return new SCMSecurityProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
address, UserGroupInformation.getCurrentUser(),
conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf), retryPolicy).getProxy());
}

/**
Expand Down Expand Up @@ -249,19 +215,19 @@ public static Optional<String> getHostName(String value) {
}

/**
* Gets the port if there is one, throws otherwise.
* Gets the port if there is one, returns empty {@code OptionalInt} otherwise.
* @param value String in host:port format.
* @return Port
*/
public static Optional<Integer> getHostPort(String value) {
public static OptionalInt getHostPort(String value) {
if ((value == null) || value.isEmpty()) {
return Optional.empty();
return OptionalInt.empty();
}
int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
if (port == NO_PORT) {
return Optional.empty();
return OptionalInt.empty();
} else {
return Optional.of(port);
return OptionalInt.of(port);
}
}

Expand All @@ -277,53 +243,64 @@ public static Optional<Integer> getHostPort(String value) {
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
public static Optional<Integer> getPortNumberFromConfigKeys(
public static OptionalInt getPortNumberFromConfigKeys(
Configuration conf, String... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
final Optional<Integer> hostPort = getHostPort(value);
final OptionalInt hostPort = getHostPort(value);
if (hostPort.isPresent()) {
return hostPort;
}
}
return Optional.empty();
return OptionalInt.empty();
}

/**
* Retrieve the socket addresses of all storage container managers.
*
* @param conf
* @return A collection of SCM addresses
* @throws IllegalArgumentException If the configuration is invalid
*/
public static Collection<InetSocketAddress> getSCMAddresses(
Configuration conf) throws IllegalArgumentException {
Collection<InetSocketAddress> addresses =
new HashSet<InetSocketAddress>();
Configuration conf) {
Collection<String> names =
conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
if (names == null || names.isEmpty()) {
if (names.isEmpty()) {
throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
+ " need to be a set of valid DNS names or IP addresses."
+ " Null or empty address list found.");
+ " Empty address list found.");
}

final Optional<Integer> defaultPort = Optional
.of(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT);
Collection<InetSocketAddress> addresses = new HashSet<>(names.size());
for (String address : names) {
Optional<String> hostname = getHostName(address);
if (!hostname.isPresent()) {
throw new IllegalArgumentException("Invalid hostname for SCM: "
+ hostname);
+ address);
}
Optional<Integer> port = getHostPort(address);
InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
port.orElse(defaultPort.get()));
int port = getHostPort(address)
.orElse(ScmConfigKeys.OZONE_SCM_DEFAULT_PORT);
InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), port);
addresses.add(addr);
}
return addresses;
}


/**
* Retrieve the address of the only SCM (as currently multiple ones are not
* supported).
*
* @return SCM address
* @throws IllegalArgumentException if {@code conf} has more than one SCM
* address or it has none
*/
public static InetSocketAddress getSingleSCMAddress(Configuration conf) {
Collection<InetSocketAddress> singleton = getSCMAddresses(conf);
Preconditions.checkArgument(singleton.size() == 1,
MULTIPLE_SCM_NOT_YET_SUPPORTED);
return singleton.iterator().next();
}

/**
* Returns the hostname for this datanode. If the hostname is not
* explicitly configured in the given config, then it is determined
Expand All @@ -340,9 +317,9 @@ public static String getHostName(Configuration conf)
String name = conf.get(DFS_DATANODE_HOST_NAME_KEY);
if (name == null) {
String dnsInterface = conf.get(
CommonConfigurationKeys.HADOOP_SECURITY_DNS_INTERFACE_KEY);
CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_INTERFACE_KEY);
String nameServer = conf.get(
CommonConfigurationKeys.HADOOP_SECURITY_DNS_NAMESERVER_KEY);
CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_NAMESERVER_KEY);
boolean fallbackToHosts = false;

if (dnsInterface == null) {
Expand Down Expand Up @@ -399,7 +376,6 @@ public static boolean isReadOnly(
* read/write data on datanode via input/output stream.
* Ozone datanode uses this helper to decide which command requires block
* token.
* @param cmdType
* @return true if it is a cmd that block token should be checked when
* security is enabled
* false if block token does not apply to the command.
Expand Down Expand Up @@ -518,51 +494,31 @@ public static long getUtcTime() {
* is defined then {@link ScmConfigKeys#OZONE_SCM_NAMES} is used.
*
* @param conf
* @return Target InetSocketAddress for the SCM block client endpoint.
* @throws IllegalArgumentException if configuration is not defined.
* @return Target {@code InetSocketAddress} for the SCM block client endpoint.
* @throws IllegalArgumentException if configuration is not defined or invalid
*/
public static InetSocketAddress getScmAddressForSecurityProtocol(
Configuration conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY);

if (!host.isPresent()) {
host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
}

if (!host.isPresent()) {
// Fallback to Ozone SCM names.
Collection<InetSocketAddress> scmAddresses = getSCMAddresses(conf);
if (scmAddresses.size() > 1) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_NAMES +
" must contain a single hostname. Multiple SCM hosts are " +
"currently unsupported");
}
host = Optional.of(scmAddresses.iterator().next().getHostName());
}
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY
+ " must be defined. See"
+ " https://wiki.apache.org/hadoop/Ozone#Configuration"
+ " for details on configuring Ozone.");
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}

final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY);
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT);

return NetUtils.createSocketAddr(host.get() + ":" + port
.orElse(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT));
return NetUtils.createSocketAddr(host.get() + ":" + port);
}

/**
* Initialize hadoop metrics system for Ozone servers.
* @param configuration OzoneConfiguration to use.
* @param serverName The logical name of the server components.
* @return
*/
public static MetricsSystem initializeMetrics(
OzoneConfiguration configuration, String serverName) {
Expand Down
Loading