Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import javax.net.SocketFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.commons.net.util.SubnetUtils;
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -177,11 +181,33 @@ public static InetSocketAddress createSocketAddr(String target,
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* exception message in the case that parsing fails.
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName) {
return createSocketAddr(target, defaultPort, configName, false);
}

/**
* Create an InetSocketAddress from the given target string and
* default port. If the string cannot be parsed correctly, the
* <code>configName</code> parameter is used as part of the
* exception message, allowing the user to better diagnose
* the misconfiguration.
*
* @param target a string of either "host" or "host:port"
* @param defaultPort the default port if <code>target</code> does not
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* @param useCacheIfPresent Whether use cache when create URI
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName,
boolean useCacheIfPresent) {
String helpText = "";
if (configName != null) {
helpText = " (configuration property '" + configName + "')";
Expand All @@ -191,33 +217,59 @@ public static InetSocketAddress createSocketAddr(String target,
helpText);
}
target = target.trim();
boolean hasScheme = target.contains("://");
URI uri = null;
try {
uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
boolean hasScheme = target.contains("://");
URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);

String host = uri.getHost();
int port = uri.getPort();
if (port == -1) {
port = defaultPort;
}
String path = uri.getPath();

if ((host == null) || (port < 0) ||
(!hasScheme && path != null && !path.isEmpty()))
{
(!hasScheme && path != null && !path.isEmpty())) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
return createSocketAddrForHost(host, port);
}

private static final long URI_CACHE_SIZE_DEFAULT = 1000;
private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
.maximumSize(URI_CACHE_SIZE_DEFAULT)
.expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
.build();

private static URI createURI(String target,
boolean hasScheme,
String helpText,
boolean useCacheIfPresent) {
URI uri;
if (useCacheIfPresent) {
uri = URI_CACHE.getIfPresent(target);
if (uri != null) {
return uri;
}
}

try {
uri = hasScheme ? URI.create(target) :
URI.create("dummyscheme://" + target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}

if (useCacheIfPresent) {
URI_CACHE.put(target, uri);
}
return uri;
}

/**
* Create a socket address with the given host and port. The hostname
* might be replaced with another host that was set via
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,49 @@ public void testCreateSocketAddress() throws Throwable {
}
}

@Test
public void testCreateSocketAddressWithURICache() throws Throwable {
InetSocketAddress addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());

addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());

// ----------------------------------------------------

addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());

addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());

// ----------------------------------------------------

try {
addr = NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}

try {
addr = NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}
}

private void assertRemoteDetailsIncluded(IOException wrapped)
throws Throwable {
assertInException(wrapped, "desthost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,16 @@ public void testSocketAddrWithNameToStaticIP() {
verifyServiceAddr(staticHost, "255.255.255.255");
}

@Test
public void testSocketAddrWithChangeIP() {
String staticHost = "host4";
NetUtils.addStaticResolution(staticHost, "255.255.255.255");
verifyServiceAddr(staticHost, "255.255.255.255");

NetUtils.addStaticResolution(staticHost, "255.255.255.254");
verifyServiceAddr(staticHost, "255.255.255.254");
}

// this is a bizarre case, but it's if a test tries to remap an ip address
@Test
public void testSocketAddrWithIPToStaticIP() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,9 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
-1, null, uriCacheEnabled);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ interface Read {

String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";

String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
boolean URI_CACHE_DEFAULT = false;

interface ShortCircuit {
String PREFIX = Read.PREFIX + "shortcircuit.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class DfsClientConf {
private final int blockWriteLocateFollowingMaxDelayMs;
private final long defaultBlockSize;
private final long prefetchSize;
private final boolean uriCacheEnabled;
private final short defaultReplication;
private final String taskId;
private final FsPermission uMask;
Expand Down Expand Up @@ -211,24 +212,7 @@ public DfsClientConf(Configuration conf) {
Write.MAX_PACKETS_IN_FLIGHT_KEY,
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);

final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);

defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
Expand All @@ -240,6 +224,10 @@ public DfsClientConf(Configuration conf) {
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
10 * defaultBlockSize);

uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
Read.URI_CACHE_DEFAULT);

numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
numBlockWriteRetry = conf.getInt(
Expand Down Expand Up @@ -308,6 +296,27 @@ public DfsClientConf(Configuration conf) {
"can't be more then 5.");
}

private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
Configuration conf) {
final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
return null;
}
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
return new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}

@SuppressWarnings("unchecked")
private List<Class<? extends ReplicaAccessorBuilder>>
loadReplicaAccessorBuilderClasses(Configuration conf) {
Expand Down Expand Up @@ -555,6 +564,13 @@ public long getPrefetchSize() {
return prefetchSize;
}

/**
* @return the uriCacheEnable
*/
public boolean isUriCacheEnabled() {
return uriCacheEnabled;
}

/**
* @return the defaultReplication
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4185,6 +4185,15 @@
</description>
</property>

<property>
<name>dfs.client.read.uri.cache.enabled</name>
<value>false</value>
<description>
If true, dfs client will use cache when creating URI based on host:port
to reduce the frequency of URI object creation.
</description>
</property>

<property>
<name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
<value>1800000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void initializeMemberVariables() {
HdfsClientConfigKeys.Failover.class,
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
HdfsClientConfigKeys.BlockWrite.class,
HdfsClientConfigKeys.Read.class,
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };

// Set error modes
Expand Down