Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ synchronized Journal getOrCreateJournal(String jid,
return journal;
}

@VisibleForTesting
public JournalNodeSyncer getJournalSyncer(String jid) {
return journalSyncersById.get(jid);
}

@VisibleForTesting
public boolean getJournalSyncerStatus(String jid) {
if (journalSyncersById.get(jid) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* A Journal Sync thread runs through the lifetime of the JN. It periodically
Expand Down Expand Up @@ -153,6 +155,9 @@ private boolean getOtherJournalNodeProxies() {
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
}
}
// Check if there are any other JournalNodes before starting the sync. Although some proxies
// may be unresolved now, the act of attempting to sync will instigate resolution when the
// servers become available.
if (otherJNProxies.isEmpty()) {
LOG.error("Cannot sync as there is no other JN available for sync.");
return false;
Expand Down Expand Up @@ -310,12 +315,24 @@ private List<InetSocketAddress> getOtherJournalNodeAddrs() {
return null;
}

private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
@VisibleForTesting
protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
URISyntaxException,
IOException {
URI uri = new URI(uriStr);
return Util.getLoggerAddresses(uri,
Sets.newHashSet(jn.getBoundIpcAddress()));

InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded);

// Exclude the current JournalNode instance (a local address and the same port). If the address
// is bound to a local address on the same port, then remove it to handle scenarios where a
// wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses
// since we may be running multiple servers on the same host.
addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
&& boundIpcAddress.getPort() == addr.getPort());

return addrList;
}

private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;

import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -35,6 +37,7 @@
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -96,12 +99,45 @@ public void shutDownMiniCluster() throws IOException {
}
}

/**
* Test that the "self exclusion" works when there are multiple JournalNode instances running on
* the same server, but on different ports.
*/
@Test
public void testJournalNodeExcludesSelfMultilpePorts() throws URISyntaxException, IOException {
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");

// Test: Get the Journal address list for the default configuration
List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);

// Verify: One of the addresses should be excluded so that the node isn't syncing with itself
assertEquals(2, addrList.size());
}

/**
* Test that the "self exclusion" works when there a host uses a wildcard address.
*/
@Test
public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");

// Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly
// used as a bind host.
String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0");
List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);

// Verify: One of the address should be excluded so that the node isn't syncing with itself
assertEquals(2, boundHostAddrList.size());
}

@Test(timeout=30000)
public void testJournalNodeSync() throws Exception {

//As by default 3 journal nodes are started;
for(int i=0; i<3; i++) {
Assert.assertEquals(true,
assertEquals(true,
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
}

Expand Down Expand Up @@ -386,13 +422,13 @@ public void testSyncDuringRollingUpgrade() throws Exception {
HdfsConstants.RollingUpgradeAction.PREPARE);

//query rolling upgrade
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));

// Restart the Standby NN with rollingUpgrade option
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));

// Do some edits and delete some edit logs
Expand Down Expand Up @@ -420,7 +456,7 @@ public void testSyncDuringRollingUpgrade() throws Exception {
// Restart the current standby NN (previously active)
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
Assert.assertEquals(info, dfsActive.rollingUpgrade(
assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
dfsCluster.waitActive();

Expand Down