Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testRandomHealthAndDisconnects() throws Exception {
// Mockito errors if the HM calls the proxy in the middle of
// setting up the mock.
cluster.start();

long st = Time.now();
while (Time.now() - st < runFor) {
cluster.getTestContext().checkException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
public static final String DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY = "dfs.namenode.edits.dir.required";
public static final String DFS_NAMENODE_EDITS_DIR_DEFAULT = "file:///tmp/hadoop/dfs/name";

public static final String
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED =
"dfs.namenode.edits.qjournals.resolution-enabled";
public static final boolean
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED_DEFAULT = false;

public static final String
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL =
"dfs.namenode.edits.qjournals.resolver.impl";

public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public static Set<String> getJournalNodeAddresses(
" to append it with namenodeId");
URI uri = new URI(journalsUri);
List<InetSocketAddress> socketAddresses = Util.
getAddressesList(uri);
getAddressesList(uri, conf);
for (InetSocketAddress is : socketAddresses) {
journalNodeList.add(is.getHostName());
}
Expand All @@ -501,7 +501,7 @@ public static Set<String> getJournalNodeAddresses(
} else {
URI uri = new URI(journalsUri);
List<InetSocketAddress> socketAddresses = Util.
getAddressesList(uri);
getAddressesList(uri, conf);
for (InetSocketAddress is : socketAddresses) {
journalNodeList.add(is.getHostName());
}
Expand All @@ -512,7 +512,7 @@ public static Set<String> getJournalNodeAddresses(
return journalNodeList;
} else {
URI uri = new URI(journalsUri);
List<InetSocketAddress> socketAddresses = Util.getAddressesList(uri);
List<InetSocketAddress> socketAddresses = Util.getAddressesList(uri, conf);
for (InetSocketAddress is : socketAddresses) {
journalNodeList.add(is.getHostName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ static List<AsyncLogger> createLoggers(Configuration conf,
String nameServiceId)
throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = Util.getAddressesList(uri);
List<InetSocketAddress> addrs = Util.getAddressesList(uri, conf);
if (addrs.size() % 2 == 0) {
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
"of Journal Nodes specified. This is not recommended!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
IOException {
URI uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));
Sets.newHashSet(jn.getBoundIpcAddress()), conf);
}

private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.DomainNameResolver;
import org.apache.hadoop.net.DomainNameResolverFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
Expand Down Expand Up @@ -361,7 +363,7 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) {
return (header != null) ? new MD5Hash(header) : null;
}

public static List<InetSocketAddress> getAddressesList(URI uri)
public static List<InetSocketAddress> getAddressesList(URI uri, Configuration conf)
throws IOException{
String authority = uri.getAuthority();
Preconditions.checkArgument(authority != null && !authority.isEmpty(),
Expand All @@ -372,21 +374,49 @@ public static List<InetSocketAddress> getAddressesList(URI uri)
parts[i] = parts[i].trim();
}

boolean resolveNeeded = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED,
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED_DEFAULT);
DomainNameResolver dnr = DomainNameResolverFactory.newInstance(
conf,
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL);

List<InetSocketAddress> addrs = Lists.newArrayList();
for (String addr : parts) {
InetSocketAddress isa = NetUtils.createSocketAddr(
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
if (isa.isUnresolved()) {
throw new UnknownHostException(addr);
if (resolveNeeded) {
LOG.info("Resolving journal address: " + addr);
InetSocketAddress isa = NetUtils.createSocketAddr(
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
// Get multiple hostnames from domain name if needed,
// for example multiple hosts behind a DNS entry.
int port = isa.getPort();
// QJM should just use FQDN
String[] hostnames = dnr
.getAllResolvedHostnameByDomainName(isa.getHostName(), true);
if (hostnames.length == 0) {
throw new UnknownHostException(addr);
}
for (String h : hostnames) {
addrs.add(NetUtils.createSocketAddr(
h + ":" + port,
DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT)
);
}
} else {
InetSocketAddress isa = NetUtils.createSocketAddr(
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
if (isa.isUnresolved()) {
throw new UnknownHostException(addr);
}
addrs.add(isa);
}
addrs.add(isa);
}
return addrs;
}

public static List<InetSocketAddress> getLoggerAddresses(URI uri,
Set<InetSocketAddress> addrsToExclude) throws IOException {
List<InetSocketAddress> addrsList = getAddressesList(uri);
Set<InetSocketAddress> addrsToExclude, Configuration conf) throws IOException {
List<InetSocketAddress> addrsList = getAddressesList(uri, conf);
addrsList.removeAll(addrsToExclude);
return addrsList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,25 @@
<value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
</property>

<property>
<name>dfs.namenode.edits.qjournals.resolution-enabled</name>
<value>false</value>
<description>
Determines if the given qjournals address is a domain name which needs to
be resolved.
This is used by namenode to resolve qjournals.
</description>
</property>

<property>
<name>dfs.namenode.edits.qjournals.resolver.impl</name>
<value></value>
<description>
Qjournals resolver implementation used by namenode.
Effective with dfs.namenode.edits.qjournals.resolution-enabled on.
</description>
</property>

<property>
<name>dfs.permissions.enabled</name>
<value>true</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ The order in which you set these configurations is unimportant, but the values y
<value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
</property>

You can also configure journal nodes by setting up dns round-robin record to avoid hardcoded names:

<property>
<name>dfs.namenode.edits.qjournals.resolution-enabled</name>
<value>true</value>
</property>

This will require you to configure multiple IPs behind one dns record on the host level, for example round robin DNS.

* **dfs.client.failover.proxy.provider.[nameservice ID]** - the Java class that HDFS clients use to contact the Active NameNode

Configure the name of the Java class which will be used by the DFS Client to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.net.MockDomainNameResolver;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1123,6 +1127,32 @@ public void testSelectViaRpcAfterJNRestart() throws Exception {
Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
}
}

@Test
public void testGetJournalAddressListWithResolution() throws Exception {
Configuration configuration = new Configuration();
configuration.setBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED, true);
configuration.set(
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL,
MockDomainNameResolver.class.getName());

URI uriWithDomain = URI.create("qjournal://"
+ MockDomainNameResolver.DOMAIN + ":1234" + "/testns");
List<InetSocketAddress> result = Util.getAddressesList(uriWithDomain, configuration);
assertEquals(2, result.size());
assertEquals(new InetSocketAddress(MockDomainNameResolver.FQDN_1, 1234), result.get(0));
assertEquals(new InetSocketAddress(MockDomainNameResolver.FQDN_2, 1234), result.get(1));

uriWithDomain = URI.create("qjournal://"
+ MockDomainNameResolver.UNKNOW_DOMAIN + ":1234" + "/testns");
try{
Util.getAddressesList(uriWithDomain, configuration);
fail("Should throw unknown host exception.");
} catch (UnknownHostException e) {
// expected
}
}

private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
Expand Down