diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index c5a5b111b2b3d..004fa1c62ce63 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -39,12 +39,16 @@
 import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.SocketFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 import org.apache.commons.net.util.SubnetUtils;
 import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -177,11 +181,33 @@ public static InetSocketAddress createSocketAddr(String target,
    *                    include a port number
    * @param configName the name of the configuration from which
    *                   target was loaded. This is used in the
-   *                   exception message in the case that parsing fails.
+   *                   exception message in the case that parsing fails.
    */
   public static InetSocketAddress createSocketAddr(String target,
                                                    int defaultPort,
                                                    String configName) {
+    return createSocketAddr(target, defaultPort, configName, false);
+  }
+
+  /**
+   * Create an InetSocketAddress from the given target string and
+   * default port. If the string cannot be parsed correctly, the
+   * configName parameter is used as part of the
+   * exception message, allowing the user to better diagnose
+   * the misconfiguration.
+   *
+   * @param target a string of either "host" or "host:port"
+   * @param defaultPort the default port if target does not
+   *                    include a port number
+   * @param configName the name of the configuration from which
+   *                   target was loaded. This is used in the
+   *                   exception message in the case that parsing fails.
+   * @param useCacheIfPresent whether to use the cache when creating the URI
+   */
+  public static InetSocketAddress createSocketAddr(String target,
+                                                   int defaultPort,
+                                                   String configName,
+                                                   boolean useCacheIfPresent) {
     String helpText = "";
     if (configName != null) {
       helpText = " (configuration property '" + configName + "')";
@@ -191,15 +217,8 @@ public static InetSocketAddress createSocketAddr(String target,
         helpText);
     }
     target = target.trim();
-    boolean hasScheme = target.contains("://");
-    URI uri = null;
-    try {
-      uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(
-          "Does not contain a valid host:port authority: " + target + helpText
-      );
-    }
+    boolean hasScheme = target.contains("://");
+    URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);
 
     String host = uri.getHost();
     int port = uri.getPort();
@@ -207,10 +226,9 @@ public static InetSocketAddress createSocketAddr(String target,
     port = defaultPort;
   }
   String path = uri.getPath();
-  
+
   if ((host == null) || (port < 0) ||
-      (!hasScheme && path != null && !path.isEmpty()))
-  {
+      (!hasScheme && path != null && !path.isEmpty())) {
     throw new IllegalArgumentException(
         "Does not contain a valid host:port authority: " + target + helpText
     );
@@ -218,6 +236,40 @@ public static InetSocketAddress createSocketAddr(String target,
     return createSocketAddrForHost(host, port);
   }
 
+  private static final long URI_CACHE_SIZE_DEFAULT = 1000;
+  private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
+  private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
+      .maximumSize(URI_CACHE_SIZE_DEFAULT)
+      .expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
+      .build();
+
+  private static URI createURI(String target,
+                               boolean hasScheme,
+                               String helpText,
+                               boolean useCacheIfPresent) {
+    URI uri;
+    if (useCacheIfPresent) {
+      uri = URI_CACHE.getIfPresent(target);
+      if (uri != null) {
+        return uri;
+      }
+    }
+
+    try {
+      uri = hasScheme ? URI.create(target) :
+          URI.create("dummyscheme://" + target);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target + helpText
+      );
+    }
+
+    if (useCacheIfPresent) {
+      URI_CACHE.put(target, uri);
+    }
+    return uri;
+  }
+
   /**
    * Create a socket address with the given host and port. The hostname
    * might be replaced with another host that was set via
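The URI_CACHE above is keyed by the raw target string, holds at most 1000 entries, and expires an entry 12 hours after it is written, so a cached parse can live at most that long. Below is a minimal usage sketch of the new four-argument overload; the UriCacheDemo driver class is hypothetical and not part of the patch. Repeated calls with useCacheIfPresent set to true reuse the parsed URI for an identical target string instead of re-parsing it.

```java
import java.net.InetSocketAddress;

import org.apache.hadoop.net.NetUtils;

// Hypothetical driver, not part of the patch: demonstrates the cached path.
public class UriCacheDemo {
  public static void main(String[] args) {
    // First call parses "127.0.0.1:12345" and stores the URI in URI_CACHE.
    InetSocketAddress first =
        NetUtils.createSocketAddr("127.0.0.1:12345", 1000, "myconfig", true);
    // Second call with the same target string skips URI.create() and reuses
    // the cached URI; only the InetSocketAddress is constructed again.
    InetSocketAddress second =
        NetUtils.createSocketAddr("127.0.0.1:12345", 1000, "myconfig", true);
    System.out.println(first.equals(second)); // expected: true
  }
}
```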
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
index 76284932c43de..cfffd85186b89 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
@@ -352,7 +352,7 @@ public void testCreateSocketAddress() throws Throwable {
     assertEquals(1000, addr.getPort());
 
     try {
-      addr = NetUtils.createSocketAddr(
+      NetUtils.createSocketAddr(
           "127.0.0.1:blahblah", 1000, "myconfig");
       fail("Should have failed to parse bad port");
     } catch (IllegalArgumentException iae) {
@@ -360,6 +360,49 @@ public void testCreateSocketAddress() throws Throwable {
     }
   }
 
+  @Test
+  public void testCreateSocketAddressWithURICache() throws Throwable {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    // ----------------------------------------------------
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    // ----------------------------------------------------
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+  }
+
   private void assertRemoteDetailsIncluded(IOException wrapped)
       throws Throwable {
     assertInException(wrapped, "desthost");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
index 016c589ae3a24..2e1ca6680f586 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
@@ -370,6 +370,16 @@ public void testSocketAddrWithNameToStaticIP() {
     verifyServiceAddr(staticHost, "255.255.255.255");
   }
 
+  @Test
+  public void testSocketAddrWithChangeIP() {
+    String staticHost = "host4";
+    NetUtils.addStaticResolution(staticHost, "255.255.255.255");
+    verifyServiceAddr(staticHost, "255.255.255.255");
+
+    NetUtils.addStaticResolution(staticHost, "255.255.255.254");
+    verifyServiceAddr(staticHost, "255.255.255.254");
+  }
+
   // this is a bizarre case, but it's if a test tries to remap an ip address
   @Test
   public void testSocketAddrWithIPToStaticIP() {
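The new TestSecurityUtil case pins down an important property of the cache: only URI parsing is memoized, keyed by the target string; host resolution still runs on every call inside createSocketAddrForHost, so a host whose address mapping changes is picked up immediately even when the URI cache is enabled. A hedged sketch of the same idea outside JUnit follows; the ResolutionNotCachedDemo class is hypothetical, while addStaticResolution is the same NetUtils facility the added test uses.

```java
import java.net.InetSocketAddress;

import org.apache.hadoop.net.NetUtils;

// Hypothetical sketch, not part of the patch: enabling the URI cache does
// not cache name resolution, only the parsing of the "host:port" string.
public class ResolutionNotCachedDemo {
  public static void main(String[] args) {
    NetUtils.addStaticResolution("host4", "255.255.255.255");
    InetSocketAddress a =
        NetUtils.createSocketAddr("host4:8020", -1, null, true);

    // Re-map the host. The cached URI for "host4:8020" is still reused,
    // but resolution runs per call, so the new mapping takes effect.
    NetUtils.addStaticResolution("host4", "255.255.255.254");
    InetSocketAddress b =
        NetUtils.createSocketAddr("host4:8020", -1, null, true);

    System.out.println(a.getAddress()); // resolved via the first mapping
    System.out.println(b.getAddress()); // resolved via the second mapping
  }
}
```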
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0676cf91a7d9e..402c3823c1187 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1086,7 +1086,9 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
-    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
+        -1, null, uriCacheEnabled);
     return new DNAddrPair(chosenNode, targetAddr, storageType, block);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e8b540286c280..3d0de4a15c23a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -414,6 +414,9 @@ interface Read {
     String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
 
+    String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
+    boolean URI_CACHE_DEFAULT = false;
+
     interface ShortCircuit {
       String PREFIX = Read.PREFIX + "shortcircuit.";
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index e41b608b5b5ef..07fe80c2ec0a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -129,6 +129,7 @@ public class DfsClientConf {
   private final int blockWriteLocateFollowingMaxDelayMs;
   private final long defaultBlockSize;
   private final long prefetchSize;
+  private final boolean uriCacheEnabled;
   private final short defaultReplication;
   private final String taskId;
   private final FsPermission uMask;
@@ -211,24 +212,7 @@ public DfsClientConf(Configuration conf) {
         Write.MAX_PACKETS_IN_FLIGHT_KEY,
         Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
 
-    final boolean byteArrayManagerEnabled = conf.getBoolean(
-        Write.ByteArrayManager.ENABLED_KEY,
-        Write.ByteArrayManager.ENABLED_DEFAULT);
-    if (!byteArrayManagerEnabled) {
-      writeByteArrayManagerConf = null;
-    } else {
-      final int countThreshold = conf.getInt(
-          Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
-          Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
-      final int countLimit = conf.getInt(
-          Write.ByteArrayManager.COUNT_LIMIT_KEY,
-          Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
-      final long countResetTimePeriodMs = conf.getLong(
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
-      writeByteArrayManagerConf = new ByteArrayManager.Conf(
-          countThreshold, countLimit, countResetTimePeriodMs);
-    }
+    writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);
 
     defaultBlockSize =
        conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
@@ -240,6 +224,10 @@ public DfsClientConf(Configuration conf) {
         Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
     prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
         10 * defaultBlockSize);
+
+    uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
+        Read.URI_CACHE_DEFAULT);
+
     numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
         DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
     numBlockWriteRetry = conf.getInt(
@@ -308,6 +296,27 @@ public DfsClientConf(Configuration conf) {
         "can't be more then 5.");
   }
 
+  private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
+      Configuration conf) {
+    final boolean byteArrayManagerEnabled = conf.getBoolean(
+        Write.ByteArrayManager.ENABLED_KEY,
+        Write.ByteArrayManager.ENABLED_DEFAULT);
+    if (!byteArrayManagerEnabled) {
+      return null;
+    }
+    final int countThreshold = conf.getInt(
+        Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
+        Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
+    final int countLimit = conf.getInt(
+        Write.ByteArrayManager.COUNT_LIMIT_KEY,
+        Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
+    final long countResetTimePeriodMs = conf.getLong(
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
+    return new ByteArrayManager.Conf(
+        countThreshold, countLimit, countResetTimePeriodMs);
+  }
+
   @SuppressWarnings("unchecked")
   private List<Class<? extends ReplicaAccessorBuilder>>
       loadReplicaAccessorBuilderClasses(Configuration conf) {
@@ -555,6 +564,13 @@ public long getPrefetchSize() {
     return prefetchSize;
   }
 
+  /**
+   * @return the uriCacheEnabled
+   */
+  public boolean isUriCacheEnabled() {
+    return uriCacheEnabled;
+  }
+
   /**
    * @return the defaultReplication
    */
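With the plumbing above in place, a client enables the feature by setting the new key before constructing DfsClientConf; DFSInputStream then passes isUriCacheEnabled() into createSocketAddr for each datanode connection. A minimal sketch, assuming a standalone driver class (UriCacheConfDemo is hypothetical, not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

// Hypothetical driver, not part of the patch.
public class UriCacheConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // HdfsClientConfigKeys.Read.URI_CACHE_KEY resolves to
    // "dfs.client.read.uri.cache.enabled"; the default is false.
    conf.setBoolean(HdfsClientConfigKeys.Read.URI_CACHE_KEY, true);

    DfsClientConf clientConf = new DfsClientConf(conf);
    System.out.println(clientConf.isUriCacheEnabled()); // expected: true
  }
}
```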
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index db152e60bd65a..4a0da7a059745 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4185,6 +4185,15 @@
   </property>
 
+  <property>
+    <name>dfs.client.read.uri.cache.enabled</name>
+    <value>false</value>
+    <description>
+      If true, the DFS client will use a cache when creating a URI based on
+      host:port to reduce the frequency of URI object creation.
+    </description>
+  </property>
+
   <property>
     <name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
     <value>1800000</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 63c6233d2336b..9d6e589cc8f91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -44,6 +44,7 @@ public void initializeMemberVariables() {
     HdfsClientConfigKeys.class, HdfsClientConfigKeys.Failover.class,
         HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
         HdfsClientConfigKeys.BlockWrite.class,
+        HdfsClientConfigKeys.Read.class,
         HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
 
     // Set error modes