diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index af0b5867e744..cc88ee577d5d 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -256,6 +256,7 @@ def get_thread_count(self):
     # Set the environment variables used by the raylet and worker
     monkeypatch.setenv("RAY_worker_num_grpc_internal_threads", "1")
     monkeypatch.setenv("RAY_num_server_call_thread", "1")
+    monkeypatch.setenv("RAY_core_worker_num_server_call_thread", "1")
 
     # TODO(#55215): The for loop and the 'assert ... in {..,..}' complicates this
     # test unnecessarily. We should only need to call the assert after
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index ec4f61f2a969..bcabe0ee88a5 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -898,11 +898,18 @@ RAY_CONFIG(int64_t, health_check_timeout_ms, 10000)
 /// The threshold to consider a node dead.
 RAY_CONFIG(int64_t, health_check_failure_threshold, 5)
 
-/// The pool size for grpc server call.
+/// Thread pool size for sending replies in the gRPC server (system components: raylet, GCS).
 RAY_CONFIG(int64_t,
            num_server_call_thread,
            std::max((int64_t)1, (int64_t)(std::thread::hardware_concurrency() / 4U)))
 
+/// Thread pool size for sending replies in the gRPC server (CoreWorkers).
+/// https://github.com/ray-project/ray/issues/58351 shows that the reply path is
+/// light enough that two threads are sufficient.
+RAY_CONFIG(int64_t,
+           core_worker_num_server_call_thread,
+           std::thread::hardware_concurrency() >= 8 ? 2 : 1)
+
 /// Use madvise to prevent worker/raylet coredumps from including
 /// the mapped plasma pages.
 RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc
index 0c8fd8e45d0c..24b668678b94 100644
--- a/src/ray/core_worker/core_worker_process.cc
+++ b/src/ray/core_worker/core_worker_process.cc
@@ -34,6 +34,7 @@
 #include "ray/pubsub/subscriber.h"
 #include "ray/raylet_ipc_client/raylet_ipc_client.h"
 #include "ray/raylet_rpc_client/raylet_client.h"
+#include "ray/rpc/server_call.h"
 #include "ray/stats/stats.h"
 #include "ray/stats/tag_defs.h"
 #include "ray/util/env.h"
@@ -775,6 +776,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
   // NOTE(kfstorm): any initialization depending on RayConfig must happen after this
   // line.
   InitializeSystemConfig();
+  rpc::SetServerCallThreadPoolMode(rpc::ServerCallThreadPoolMode::CORE_WORKER);
 
   // Assume stats module will be initialized exactly once in once process.
   // So it must be called in CoreWorkerProcess constructor and will be reused
diff --git a/src/ray/rpc/server_call.cc b/src/ray/rpc/server_call.cc
index 7790750d9414..100108f633ba 100644
--- a/src/ray/rpc/server_call.cc
+++ b/src/ray/rpc/server_call.cc
@@ -14,6 +14,7 @@
 
 #include "ray/rpc/server_call.h"
 
+#include <atomic>
 #include <memory>
 
 #include "ray/common/ray_config.h"
@@ -22,9 +23,17 @@
 namespace ray {
 namespace rpc {
 namespace {
+std::atomic<ServerCallThreadPoolMode> &ThreadPoolMode() {
+  static std::atomic<ServerCallThreadPoolMode> mode(
+      ServerCallThreadPoolMode::SYSTEM_COMPONENT);
+  return mode;
+}
+
 std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
   static auto thread_pool = std::make_unique<boost::asio::thread_pool>(
-      ::RayConfig::instance().num_server_call_thread());
+      ThreadPoolMode().load() == ServerCallThreadPoolMode::CORE_WORKER
+          ? ::RayConfig::instance().core_worker_num_server_call_thread()
+          : ::RayConfig::instance().num_server_call_thread());
   return thread_pool;
 }
 
@@ -36,7 +45,13 @@ void DrainServerCallExecutor() { GetServerCallExecutor().join(); }
 
 void ResetServerCallExecutor() {
   _GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
-      ::RayConfig::instance().num_server_call_thread());
+      ThreadPoolMode().load() == ServerCallThreadPoolMode::CORE_WORKER
+          ? ::RayConfig::instance().core_worker_num_server_call_thread()
+          : ::RayConfig::instance().num_server_call_thread());
+}
+
+void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode) {
+  ThreadPoolMode().store(mode);
 }
 
 }  // namespace rpc
diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h
index 2da3ce00b064..1ce071f383e7 100644
--- a/src/ray/rpc/server_call.h
+++ b/src/ray/rpc/server_call.h
@@ -50,6 +50,8 @@ enum class ClusterIdAuthType {
 /// This pool is shared across gRPC servers.
 boost::asio::thread_pool &GetServerCallExecutor();
 
+enum class ServerCallThreadPoolMode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };
+
 /// Drain the executor.
 void DrainServerCallExecutor();
 
@@ -59,6 +61,10 @@ void DrainServerCallExecutor();
 /// because they are global.
 void ResetServerCallExecutor();
 
+/// Set which config this process uses for the global reply thread pool.
+/// Must be called before the first GetServerCallExecutor() call.
+void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode);
+
 /// Represents state of a `ServerCall`.
 enum class ServerCallState {
   /// The call is created and waiting for an incoming request.
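
Not part of the patch: a minimal standalone sketch of the pattern this change relies on. `Mode` and `PoolSize` are illustrative placeholders, not Ray APIs. The point is that a function-local static (the pool size here, the `boost::asio::thread_pool` in `_GetServerCallExecutor`) is constructed exactly once, on first use, so `SetServerCallThreadPoolMode` must run before the first `GetServerCallExecutor()` call or the `CORE_WORKER` setting is silently ignored.

```cpp
#include <atomic>
#include <iostream>

enum class Mode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };

// Process-wide mode flag, analogous to ThreadPoolMode() in server_call.cc.
std::atomic<Mode> &ThreadPoolMode() {
  static std::atomic<Mode> mode(Mode::SYSTEM_COMPONENT);
  return mode;
}

// Stand-in for GetServerCallExecutor(): the value is chosen once, lazily,
// based on whatever the mode is at the moment of first use.
int &PoolSize() {
  static int size = ThreadPoolMode().load() == Mode::CORE_WORKER ? 2 : 8;
  return size;
}

int main() {
  ThreadPoolMode().store(Mode::CORE_WORKER);   // must happen before first PoolSize() call
  std::cout << PoolSize() << "\n";             // prints 2
  ThreadPoolMode().store(Mode::SYSTEM_COMPONENT);
  std::cout << PoolSize() << "\n";             // still 2: the static was already constructed
  return 0;
}
```

This is why the patch calls `rpc::SetServerCallThreadPoolMode(...)` right after `InitializeSystemConfig()` in the `CoreWorkerProcessImpl` constructor, before any gRPC server call can touch the executor.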