Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import javax.net.SocketFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.commons.net.util.SubnetUtils;
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -177,11 +181,35 @@ public static InetSocketAddress createSocketAddr(String target,
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* exception message in the case that parsing fails.
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName) {
return createSocketAddr(target, defaultPort, configName, false, 0);
}

/**
* Create an InetSocketAddress from the given target string and
* default port. If the string cannot be parsed correctly, the
* <code>configName</code> parameter is used as part of the
* exception message, allowing the user to better diagnose
* the misconfiguration.
*
* @param target a string of either "host" or "host:port"
* @param defaultPort the default port if <code>target</code> does not
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* @param useCacheIfPresent Whether use cache when create URI
* @param uriCacheExpireMs The expire time of uri cache
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName,
boolean useCacheIfPresent,
long uriCacheExpireMs) {
String helpText = "";
if (configName != null) {
helpText = " (configuration property '" + configName + "')";
Expand All @@ -191,33 +219,71 @@ public static InetSocketAddress createSocketAddr(String target,
helpText);
}
target = target.trim();
boolean hasScheme = target.contains("://");
URI uri = null;
try {
uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
boolean hasScheme = target.contains("://");
URI uri = createURI(target, hasScheme, helpText,
useCacheIfPresent, uriCacheExpireMs);

String host = uri.getHost();
int port = uri.getPort();
if (port == -1) {
port = defaultPort;
}
String path = uri.getPath();

if ((host == null) || (port < 0) ||
(!hasScheme && path != null && !path.isEmpty()))
{
(!hasScheme && path != null && !path.isEmpty())) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
return createSocketAddrForHost(host, port);
}

private static volatile Cache<String, URI> uriCache;
private static final long URI_CACHE_SIZE_DEFAULT = 1000;

private static URI createURI(String target,
boolean hasScheme,
String helpText,
boolean useCacheIfPresent,
long uriCacheExpireMs) {
URI uri;
if (useCacheIfPresent) {
initURICache(uriCacheExpireMs);
uri = uriCache.getIfPresent(target);
if (uri != null) {
return uri;
}
}

try {
uri = hasScheme ? URI.create(target) :
URI.create("dummyscheme://" + target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}

if (useCacheIfPresent) {
uriCache.put(target, uri);
}
return uri;
}

private static void initURICache(long cacheExpireMs) {
if (uriCache == null) {
synchronized (NetUtils.class) {
if (uriCache == null) {
uriCache = CacheBuilder.newBuilder()
.maximumSize(URI_CACHE_SIZE_DEFAULT)
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.build();
}
}
}
}

/**
* Create a socket address with the given host and port. The hostname
* might be replaced with another host that was set via
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,49 @@ public void testCreateSocketAddress() throws Throwable {
}
}

@Test
public void testCreateSocketAddressWithURICache() throws Throwable {
InetSocketAddress addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true, 10_000);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());

addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true, 10_000);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());

// ----------------------------------------------------

addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true, 10_000);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());

addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true, 10_000);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());

// ----------------------------------------------------

try {
addr = NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true, 10_000);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}

try {
addr = NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true, 10_000);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}
}

private void assertRemoteDetailsIncluded(IOException wrapped)
throws Throwable {
assertInException(wrapped, "desthost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,16 @@ public void testSocketAddrWithNameToStaticIP() {
verifyServiceAddr(staticHost, "255.255.255.255");
}

@Test
public void testSocketAddrWithChangeIP() {
String staticHost = "host4";
NetUtils.addStaticResolution(staticHost, "255.255.255.255");
verifyServiceAddr(staticHost, "255.255.255.255");

NetUtils.addStaticResolution(staticHost, "255.255.255.254");
verifyServiceAddr(staticHost, "255.255.255.254");
}

// this is a bizarre case, but it's if a test tries to remap an ip address
@Test
public void testSocketAddrWithIPToStaticIP() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,10 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
boolean uriCacheEnable = dfsClient.getConf().isUriCacheEnable();
long uriCacheExpireMs = dfsClient.getConf().getUriCacheExpireMs();
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
-1, null, uriCacheEnable, uriCacheExpireMs);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public interface HdfsClientConfigKeys {
long SECOND = 1000L;
long MINUTE = 60 * SECOND;
long HOUR = 60 * MINUTE;

String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
Expand Down Expand Up @@ -414,6 +415,11 @@ interface Read {

String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";

String URI_CACHE_KEY = PREFIX + "uri.cache.enable";
boolean URI_CACHE_DEFAULT = false;
String URI_CACHE_EXPIRE_MS_KEY = PREFIX + "uri.cache.expire.ms";
long URI_CACHE_EXPIRE_MS_DEFAULT = 12 * HOUR;

interface ShortCircuit {
String PREFIX = Read.PREFIX + "shortcircuit.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public class DfsClientConf {
private final int blockWriteLocateFollowingMaxDelayMs;
private final long defaultBlockSize;
private final long prefetchSize;
private final boolean uriCacheEnable;
private final long uriCacheExpireMs;
private final short defaultReplication;
private final String taskId;
private final FsPermission uMask;
Expand Down Expand Up @@ -211,24 +213,7 @@ public DfsClientConf(Configuration conf) {
Write.MAX_PACKETS_IN_FLIGHT_KEY,
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);

final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);

defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
Expand All @@ -240,6 +225,14 @@ public DfsClientConf(Configuration conf) {
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
10 * defaultBlockSize);

uriCacheEnable = conf.getBoolean(Read.URI_CACHE_KEY,
Read.URI_CACHE_DEFAULT);
uriCacheExpireMs = conf.getLong(Read.URI_CACHE_EXPIRE_MS_KEY,
Read.URI_CACHE_EXPIRE_MS_DEFAULT);
Preconditions.checkArgument(uriCacheExpireMs >= 0, "The value of " +
Read.URI_CACHE_EXPIRE_MS_KEY + " can't be less then 0.");

numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
numBlockWriteRetry = conf.getInt(
Expand Down Expand Up @@ -308,6 +301,27 @@ public DfsClientConf(Configuration conf) {
"can't be more then 5.");
}

private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
Configuration conf) {
final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
return null;
}
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
return new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}

@SuppressWarnings("unchecked")
private List<Class<? extends ReplicaAccessorBuilder>>
loadReplicaAccessorBuilderClasses(Configuration conf) {
Expand Down Expand Up @@ -555,6 +569,20 @@ public long getPrefetchSize() {
return prefetchSize;
}

/**
* @return the uriCacheEnable
*/
public boolean isUriCacheEnable() {
return uriCacheEnable;
}

/**
* @return the uriCacheExpireMs
*/
public long getUriCacheExpireMs() {
return uriCacheExpireMs;
}

/**
* @return the defaultReplication
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4185,6 +4185,24 @@
</description>
</property>

<property>
<name>dfs.client.read.uri.cache.enable</name>
<value>false</value>
<description>
If true, dfs client will use cache when creating URI based on host:port
to reduce the frequency of URI object creation.
Default is false.
</description>
</property>

<property>
<name>dfs.client.read.uri.cache.expire.ms</name>
<value>43200000</value>
<description>
The expire time of uri cache. Default is 43200000 ms, which is 12 hours.
</description>
</property>

<property>
<name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
<value>1800000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void initializeMemberVariables() {
HdfsClientConfigKeys.Failover.class,
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
HdfsClientConfigKeys.BlockWrite.class,
HdfsClientConfigKeys.Read.class,
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };

// Set error modes
Expand Down