Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
this.setRpcServerAddress(rpcServer.getRpcAddress());
}

checkRouterId();

if (conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
Expand Down Expand Up @@ -308,6 +310,21 @@ protected void serviceInit(Configuration configuration) throws Exception {
}
}

/**
* Set the router id if not set to prevent RouterHeartbeatService
* update state store with a null router id.
*/
private void checkRouterId() {
if (this.routerId == null) {
InetSocketAddress confRpcAddress = conf.getSocketAddr(
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT,
RBFConfigKeys.DFS_ROUTER_RPC_PORT_DEFAULT);
setRpcServerAddress(confRpcAddress);
}
}

private String getDisabledDependentServices() {
if (this.stateStore == null && this.adminServer == null) {
return StateStoreService.class.getSimpleName() + ","
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,14 @@ public void testRouterMetricsWhenDisabled() throws Exception {

@Test
public void testSwitchRouter() throws IOException {
assertRouterHeartbeater(true, true);
assertRouterHeartbeater(true, false);
assertRouterHeartbeater(false, true);
assertRouterHeartbeater(false, false);
assertRouterHeartbeater(true, true, true);
assertRouterHeartbeater(true, true, false);
assertRouterHeartbeater(true, false, true);
assertRouterHeartbeater(true, false, false);
assertRouterHeartbeater(false, true, true);
assertRouterHeartbeater(false, true, false);
assertRouterHeartbeater(false, false, true);
assertRouterHeartbeater(false, false, false);
}

/**
Expand All @@ -235,15 +239,19 @@ public void testSwitchRouter() throws IOException {
* @param expectedRouterHeartbeat expect the routerHeartbeat enable state.
* @param expectedNNHeartbeat expect the nnHeartbeat enable state.
*/
private void assertRouterHeartbeater(boolean expectedRouterHeartbeat,
private void assertRouterHeartbeater(boolean enableRpcServer, boolean expectedRouterHeartbeat,
boolean expectedNNHeartbeat) throws IOException {
final Router router = new Router();
Configuration baseCfg = new RouterConfigBuilder(conf).rpc().build();
Configuration baseCfg = new RouterConfigBuilder(conf).rpc(enableRpcServer).build();
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
expectedRouterHeartbeat);
baseCfg.setBoolean(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_ENABLE,
expectedNNHeartbeat);
router.init(baseCfg);

// RouterId can not be null , used by RouterHeartbeatService.updateStateStore()
assertNotNull(router.getRouterId());

RouterHeartbeatService routerHeartbeatService =
router.getRouterHeartbeatService();
if (expectedRouterHeartbeat) {
Expand Down