Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY;
Expand Down Expand Up @@ -876,4 +878,10 @@ public void setConf(Configuration conf) {
this.conf = conf;
}

@VisibleForTesting
public boolean isAsync() {
return getRpcServer() != null ? getRpcServer().isAsync() :
getConfig() != null ? getConfig().getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT) : DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hfutatzhanghb Can this method be deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeeProMise Nice advise, have deleted it.

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,13 @@
} else {
workQueue = new LinkedBlockingQueue<>();
}
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);

Check failure on line 202 in hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java#L202

blanks: end of line
if (router.isAsync()) {
this.executorService = null;
} else {
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my perspective, I think we can add an initExecutorService() method in RouterRpcClient, and then have RouterAsyncRpcClient inherit this method. In RouterRpcClient, we can initialize the executorService normally, but in RouterAsyncRpcClient, this method can be left as an empty implementation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// in RouterRpcClient
public void initExecutorService(Conf conf) {
     int numThreads = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("RPC Router Client-%d")
        .build();
    BlockingQueue<Runnable> workQueue;
    if (conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
      workQueue = new ArrayBlockingQueue<>(numThreads);
    } else {
      workQueue = new LinkedBlockingQueue<>();
    }
    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}

// in RouteAsyncRpcClient
public void initExecutorService(Conf conf) {
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeeProMise Thanks very much for this nice suggestion. It looks better and i will optimize this PR base on your advise.


this.rpcMonitor = monitor;

Expand Down Expand Up @@ -364,9 +369,11 @@
*/
public String getAsyncCallerPoolJson() {
final Map<String, Integer> info = new LinkedHashMap<>();
info.put("active", executorService.getActiveCount());
info.put("total", executorService.getPoolSize());
info.put("max", executorService.getMaximumPoolSize());
if (executorService != null) {
info.put("active", executorService.getActiveCount());
info.put("total", executorService.getPoolSize());
info.put("max", executorService.getMaximumPoolSize());
}
return JSON.toString(info);
}

Expand Down Expand Up @@ -2027,7 +2034,6 @@
return isUnavailableException(ioe);
}


/**
* The {@link ExecutionStatus} class is a utility class used to track the status of
* execution operations performed by the {@link RouterRpcClient}.
Expand Down