Commit 0463c5a

Author: yicheng (committed)

set num server call thread for core worker to two

Signed-off-by: yicheng <[email protected]>

1 parent: d1cce8c, commit: 0463c5a

File tree

4 files changed: +36 additions, −3 deletions

src/ray/common/ray_config_def.h

Lines changed: 9 additions & 1 deletion

@@ -898,11 +898,19 @@ RAY_CONFIG(int64_t, health_check_timeout_ms, 10000)
 /// The threshold to consider a node dead.
 RAY_CONFIG(int64_t, health_check_failure_threshold, 5)
 
-/// The pool size for grpc server call.
+/// The pool size for grpc server call for system components (raylet, GCS, etc.). All
+/// processes that run a CoreWorker (workers, drivers, io workers) have a dedicated config.
 RAY_CONFIG(int64_t,
            num_server_call_thread,
            std::max((int64_t)1, (int64_t)(std::thread::hardware_concurrency() / 4U)))
 
+/// The pool size for grpc server call inside CoreWorker processes (workers, drivers, io
+/// workers). https://github.com/ray-project/ray/issues/58351 shows the
+/// reply path is light enough that 2 threads is sufficient.
+RAY_CONFIG(int64_t,
+           core_worker_num_server_call_thread,
+           std::min<int64_t>(2, (int64_t)std::thread::hardware_concurrency()))
+
 /// Use madvise to prevent worker/raylet coredumps from including
 /// the mapped plasma pages.
 RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
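To see what the two defaults evaluate to, here is a standalone sketch (not Ray code; the helper names are made up) that computes the same expressions for a few hypothetical core counts:

```cpp
// Standalone sketch: evaluates the two RAY_CONFIG default expressions above
// for hypothetical values of std::thread::hardware_concurrency().
#include <algorithm>
#include <cstdint>
#include <iostream>

std::int64_t system_component_default(unsigned hw) {
  // Mirrors num_server_call_thread: at least 1, otherwise hw / 4.
  return std::max((std::int64_t)1, (std::int64_t)(hw / 4U));
}

std::int64_t core_worker_default(unsigned hw) {
  // Mirrors core_worker_num_server_call_thread: capped at 2 threads.
  return std::min<std::int64_t>(2, (std::int64_t)hw);
}

int main() {
  for (unsigned hw : {1U, 2U, 8U, 64U}) {
    std::cout << "cores=" << hw << " system=" << system_component_default(hw)
              << " core_worker=" << core_worker_default(hw) << "\n";
  }
}
```

On a 64-core host the shared default yields 16 reply threads per process, while a CoreWorker now gets at most 2; raylet and GCS keep the larger pool.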

src/ray/core_worker/core_worker_process.cc

Lines changed: 3 additions & 0 deletions

@@ -15,6 +15,7 @@
 #include "ray/core_worker/core_worker_process.h"
 
 #include <chrono>
+#include <cstdlib>
 #include <memory>
 #include <string>
 #include <thread>
@@ -34,6 +35,7 @@
 #include "ray/pubsub/subscriber.h"
 #include "ray/raylet_ipc_client/raylet_ipc_client.h"
 #include "ray/raylet_rpc_client/raylet_client.h"
+#include "ray/rpc/server_call.h"
 #include "ray/stats/stats.h"
 #include "ray/stats/tag_defs.h"
 #include "ray/util/env.h"
@@ -775,6 +777,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
   // NOTE(kfstorm): any initialization depending on RayConfig must happen after this
   // line.
   InitializeSystemConfig();
+  rpc::SetServerCallThreadPoolMode(rpc::ServerCallThreadPoolMode::CORE_WORKER);
 
   // Assume stats module will be initialized exactly once in once process.
   // So it must be called in CoreWorkerProcess constructor and will be reused
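The call is placed right after InitializeSystemConfig() and before any RPC reply can be processed, because the reply executor (see server_call.cc below) is a function-local static: whatever mode is visible at its first use determines the pool size, permanently. A minimal standalone sketch of that latch-on-first-use behavior (made-up names, not the Ray API):

```cpp
// Sketch only: a function-local static captures whatever `mode` holds the
// first time it is touched, so the mode must be set before first use.
#include <atomic>
#include <iostream>

std::atomic<int> mode{0};  // 0 = system component, 1 = core worker (hypothetical encoding)

int PoolSize() {
  // Initialized exactly once, on the first call; later mode changes are ignored.
  static const int size = (mode.load() == 1) ? 2 : 16;
  return size;
}

int main() {
  mode.store(1);                    // set the mode *before* first use...
  std::cout << PoolSize() << "\n";  // ...so the pool is sized for a core worker: 2
  mode.store(0);
  std::cout << PoolSize() << "\n";  // still 2: the static was already latched
}
```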

src/ray/rpc/server_call.cc

Lines changed: 18 additions & 2 deletions

@@ -14,6 +14,7 @@
 
 #include "ray/rpc/server_call.h"
 
+#include <atomic>
 #include <memory>
 
 #include "ray/common/ray_config.h"
@@ -22,9 +23,17 @@ namespace ray {
 namespace rpc {
 namespace {
 
+std::atomic<ServerCallThreadPoolMode> &ThreadPoolMode() {
+  static std::atomic<ServerCallThreadPoolMode> mode(ServerCallThreadPoolMode::SYSTEM_COMPONENT);
+  return mode;
+}
+
 std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
   static auto thread_pool = std::make_unique<boost::asio::thread_pool>(
-      ::RayConfig::instance().num_server_call_thread());
+      ThreadPoolMode().load(std::memory_order_acquire) ==
+              ServerCallThreadPoolMode::CORE_WORKER
+          ? ::RayConfig::instance().core_worker_num_server_call_thread()
+          : ::RayConfig::instance().num_server_call_thread());
   return thread_pool;
 }
 
@@ -36,7 +45,14 @@ void DrainServerCallExecutor() { GetServerCallExecutor().join(); }
 
 void ResetServerCallExecutor() {
   _GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
-      ::RayConfig::instance().num_server_call_thread());
+      ThreadPoolMode().load(std::memory_order_acquire) ==
+              ServerCallThreadPoolMode::CORE_WORKER
+          ? ::RayConfig::instance().core_worker_num_server_call_thread()
+          : ::RayConfig::instance().num_server_call_thread());
+}
+
+void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode) {
+  ThreadPoolMode().store(mode, std::memory_order_release);
 }
 
 } // namespace rpc
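The pattern used here, an atomic mode flag plus a lazily constructed pool whose size is chosen on first use, can be reproduced outside Ray. A self-contained sketch under the assumption that Boost.Asio is available; the names are simplified and are not the Ray API:

```cpp
// Sketch of the same pattern with simplified names (requires Boost.Asio).
#include <algorithm>
#include <atomic>
#include <iostream>
#include <memory>
#include <thread>

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

enum class PoolMode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };

// Atomic mode flag, defaulting to the system-component behavior.
std::atomic<PoolMode> &Mode() {
  static std::atomic<PoolMode> mode(PoolMode::SYSTEM_COMPONENT);
  return mode;
}

// The pool is built exactly once, on first use, with a size chosen from the
// mode that is visible at that moment.
std::unique_ptr<boost::asio::thread_pool> &Pool() {
  static auto pool = std::make_unique<boost::asio::thread_pool>(
      Mode().load(std::memory_order_acquire) == PoolMode::CORE_WORKER
          ? 2U
          : std::max(1U, std::thread::hardware_concurrency() / 4U));
  return pool;
}

int main() {
  // Flip the mode before the pool is ever touched, as the CoreWorker does.
  Mode().store(PoolMode::CORE_WORKER, std::memory_order_release);
  boost::asio::post(*Pool(), [] { std::cout << "reply handled\n"; });
  Pool()->join();
}
```

Processes that never call the setter (raylet, GCS) keep the SYSTEM_COMPONENT default, so their behavior is unchanged by this commit.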

src/ray/rpc/server_call.h

Lines changed: 6 additions & 0 deletions

@@ -50,6 +50,8 @@ enum class ClusterIdAuthType {
 /// This pool is shared across gRPC servers.
 boost::asio::thread_pool &GetServerCallExecutor();
 
+enum class ServerCallThreadPoolMode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };
+
 /// Drain the executor.
 void DrainServerCallExecutor();
 
@@ -59,6 +61,10 @@ void DrainServerCallExecutor();
 /// because they are global.
 void ResetServerCallExecutor();
 
+/// Set the executor mode for the current process. This should be called before any RPC
+/// replies are processed so the thread pool is created with the correct size.
+void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode);
+
 /// Represents state of a `ServerCall`.
 enum class ServerCallState {
   /// The call is created and waiting for an incoming request.
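Taken together, the intended call order is: choose the mode, then let replies reach the executor, then drain on shutdown. A hedged sketch of that ordering using the declared API (assumes a Ray build tree; the main() wrapper is hypothetical and only illustrates the contract in the doc comment above):

```cpp
// Sketch only: illustrates the ordering contract, not a real Ray entry point.
#include "ray/rpc/server_call.h"

#include <boost/asio/post.hpp>

int main() {
  // 1. Pick the mode before any reply is processed...
  ray::rpc::SetServerCallThreadPoolMode(ray::rpc::ServerCallThreadPoolMode::CORE_WORKER);

  // 2. ...so the first touch of the executor builds a pool of the right size.
  boost::asio::post(ray::rpc::GetServerCallExecutor(), [] { /* handle a reply */ });

  // 3. Drain before shutdown, as existing callers do.
  ray::rpc::DrainServerCallExecutor();
}
```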
