Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.journalnode.edits.dir.perm";
public static final String DFS_JOURNAL_EDITS_DIR_PERMISSION_DEFAULT =
"700";
public static final String DFS_JOURNALNODE_HANDLER_COUNT_KEY =
"dfs.journalnode.handler.count";
public static final int DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT = 5;


public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.net.InetSocketAddress;
import java.net.URL;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_RPC_BIND_HOST_KEY;


Expand All @@ -63,9 +65,9 @@
public class JournalNodeRpcServer implements QJournalProtocol,
InterQJournalProtocol {
private static final Logger LOG = JournalNode.LOG;
private static final int HANDLER_COUNT = 5;
private final JournalNode jn;
private Server server;
private final int handlerCount;

JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;
Expand All @@ -90,13 +92,25 @@ public class JournalNodeRpcServer implements QJournalProtocol,
new QJournalProtocolServerSideTranslatorPB(this);
BlockingService service = QJournalProtocolService
.newReflectiveBlockingService(translator);
int confHandlerCount = conf.getInt(DFS_JOURNALNODE_HANDLER_COUNT_KEY,
DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
if (confHandlerCount <= 0) {
LOG.warn("Invalid value for: {} = {}, Should be > 0,"
+ " will use default value of: {}.",
DFS_JOURNALNODE_HANDLER_COUNT_KEY, confHandlerCount,
DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
confHandlerCount = DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT;
}
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);

this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
.setBindAddress(bindHost)
.setPort(addr.getPort())
.setNumHandlers(HANDLER_COUNT)
.setNumHandlers(this.handlerCount)
.setVerbose(false)
.build();

Expand All @@ -121,6 +135,11 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.server.setTracer(jn.tracer);
}

@VisibleForTesting
protected int getHandlerCount() {
return this.handlerCount;
}

void start() {
this.server.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6253,6 +6253,15 @@
</description>
</property>

<property>
<name>dfs.journalnode.handler.count</name>
<value>5</value>
<description>
The number of JournalNode RPC server threads that listen to
requests from clients.
</description>
</property>

<property>
<name>dfs.namenode.lease-hard-limit-sec</name>
<value>1200</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ public void setup() throws Exception {
"qjournal://journalnode0:9900;journalnode1:9901/" + journalId);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY +".ns1" +".nn2",
"qjournal://journalnode0:9902;journalnode1:9903/" + journalId);
} else if (testName.getMethodName().equals("testConfAbnormalHandlerNumber")) {
conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, -1);
} else if (testName.getMethodName().equals("testConfNormalHandlerNumber")) {
conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY, 10);
}
jn = new JournalNode();
jn.setConf(conf);
Expand Down Expand Up @@ -672,4 +676,26 @@ private void setupStaticHostResolution(int journalNodeCount,
}
}

@Test
public void testConfNormalHandlerNumber() {
int confHandlerNumber = jn.getConf().getInt(
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
assertTrue(confHandlerNumber > 0);
int handlerCount = jn.getRpcServer().getHandlerCount();
assertEquals(confHandlerNumber, handlerCount);
assertEquals(10, handlerCount);
}

@Test
public void testConfAbnormalHandlerNumber() {
int confHandlerCount = jn.getConf().getInt(
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT);
assertTrue(confHandlerCount <= 0);
int handlerCount = jn.getRpcServer().getHandlerCount();
assertEquals(
DFSConfigKeys.DFS_JOURNALNODE_HANDLER_COUNT_DEFAULT,
handlerCount);
}
}