Merged
Changes from 6 commits
1 change: 1 addition & 0 deletions python/ray/tests/test_basic.py
@@ -256,6 +256,7 @@ def get_thread_count(self):
# Set the environment variables used by the raylet and worker
monkeypatch.setenv("RAY_worker_num_grpc_internal_threads", "1")
monkeypatch.setenv("RAY_num_server_call_thread", "1")
monkeypatch.setenv("RAY_core_worker_num_server_call_thread", "1")

# TODO(#55215): The for loop and the 'assert ... in {..,..}' complicates this
# test unnecessarily. We should only need to call the assert after
9 changes: 8 additions & 1 deletion src/ray/common/ray_config_def.h
@@ -898,11 +898,18 @@ RAY_CONFIG(int64_t, health_check_timeout_ms, 10000)
/// The threshold to consider a node dead.
RAY_CONFIG(int64_t, health_check_failure_threshold, 5)

/// The pool size for grpc server call.
/// Thread pool size for sending replies in grpc server (system components: raylet, GCS).
RAY_CONFIG(int64_t,

[Review comment] @Yicheng-Lu-llll (Member, Author), Nov 19, 2025:
This change does alter the meaning of num_server_call_thread, but I believe that's exactly what we want.

[Review comment] Contributor:
Let's nix "etc." There is no "etc." after raylet and GCS, and this specifically doesn't affect workers, so let's not make it confusing.

[Review comment] @Yicheng-Lu-llll (Member, Author):
Sure!

num_server_call_thread,
std::max((int64_t)1, (int64_t)(std::thread::hardware_concurrency() / 4U)))

/// Thread pool size for sending replies in grpc server (CoreWorkers).
/// https://github.com/ray-project/ray/issues/58351 shows the
/// reply path is light enough that 2 threads is sufficient.
RAY_CONFIG(int64_t,
core_worker_num_server_call_thread,
std::thread::hardware_concurrency() >= 8 ? 2 : 1);

/// Use madvise to prevent worker/raylet coredumps from including
/// the mapped plasma pages.
RAY_CONFIG(bool, worker_core_dump_exclude_plasma_store, true)
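
For illustration, a minimal standalone sketch (not part of this PR) of how the two defaults above evaluate for a few hypothetical core counts. It simply mirrors the default expressions without the RAY_CONFIG machinery:

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical core counts; on a real machine this would come from
  // std::thread::hardware_concurrency().
  for (unsigned int cores : {2U, 4U, 8U, 16U, 64U}) {
    const int64_t system_default =
        std::max(static_cast<int64_t>(1), static_cast<int64_t>(cores / 4U));
    const int64_t core_worker_default = cores >= 8 ? 2 : 1;
    std::cout << cores << " cores -> num_server_call_thread=" << system_default
              << ", core_worker_num_server_call_thread=" << core_worker_default
              << "\n";
  }
  return 0;
}

So a 16-core machine gets 4 reply threads per raylet/GCS server but only 2 per core worker, and anything below 8 cores falls back to a single reply thread for core workers.
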
2 changes: 2 additions & 0 deletions src/ray/core_worker/core_worker_process.cc
@@ -34,6 +34,7 @@
#include "ray/pubsub/subscriber.h"
#include "ray/raylet_ipc_client/raylet_ipc_client.h"
#include "ray/raylet_rpc_client/raylet_client.h"
#include "ray/rpc/server_call.h"
#include "ray/stats/stats.h"
#include "ray/stats/tag_defs.h"
#include "ray/util/env.h"
@@ -775,6 +776,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// NOTE(kfstorm): any initialization depending on RayConfig must happen after this
// line.
InitializeSystemConfig();
rpc::SetServerCallThreadPoolMode(rpc::ServerCallThreadPoolMode::CORE_WORKER);

// Assume stats module will be initialized exactly once in once process.
// So it must be called in CoreWorkerProcess constructor and will be reused
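
The placement right after InitializeSystemConfig() matters because the reply thread pool in server_call.cc (below) is a lazily created function-local static: its size is captured on the first GetServerCallExecutor() call and never re-read. A dependency-free sketch of that pattern, with hypothetical names (Mode, FakePool, GetPool) and placeholder sizes, just to illustrate the set-before-first-use contract:

#include <atomic>
#include <iostream>
#include <memory>

enum class Mode { SYSTEM_COMPONENT, CORE_WORKER };

std::atomic<Mode> &ThreadPoolMode() {
  static std::atomic<Mode> mode(Mode::SYSTEM_COMPONENT);
  return mode;
}

// Stand-in for boost::asio::thread_pool: it only records its size.
struct FakePool {
  explicit FakePool(int n) : size(n) {}
  int size;
};

std::unique_ptr<FakePool> &GetPool() {
  // The size is evaluated exactly once, on the first call.
  // 2 and 4 are placeholders for the two RAY_CONFIG values.
  static auto pool = std::make_unique<FakePool>(
      ThreadPoolMode().load() == Mode::CORE_WORKER ? 2 : 4);
  return pool;
}

int main() {
  ThreadPoolMode().store(Mode::CORE_WORKER);  // set during "process init"
  std::cout << GetPool()->size << "\n";       // 2

  ThreadPoolMode().store(Mode::SYSTEM_COMPONENT);
  std::cout << GetPool()->size << "\n";       // still 2: setting after first use is a no-op
  return 0;
}
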
21 changes: 19 additions & 2 deletions src/ray/rpc/server_call.cc
@@ -14,6 +14,7 @@

#include "ray/rpc/server_call.h"

#include <atomic>
#include <memory>

#include "ray/common/ray_config.h"
@@ -22,9 +23,18 @@ namespace ray {
namespace rpc {
namespace {

std::atomic<ServerCallThreadPoolMode> &ThreadPoolMode() {
static std::atomic<ServerCallThreadPoolMode> mode(
ServerCallThreadPoolMode::SYSTEM_COMPONENT);
return mode;
}

std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
static auto thread_pool = std::make_unique<boost::asio::thread_pool>(
::RayConfig::instance().num_server_call_thread());
ThreadPoolMode().load(std::memory_order_acquire) ==

[Review comment] Contributor (on the memory_order_acquire load):
IMO the memory ordering is cool, but it adds unnecessary overhead when reading for a part that isn't perf-sensitive at all.

ServerCallThreadPoolMode::CORE_WORKER
? ::RayConfig::instance().core_worker_num_server_call_thread()
: ::RayConfig::instance().num_server_call_thread());
return thread_pool;
}

@@ -36,7 +46,14 @@ void DrainServerCallExecutor() { GetServerCallExecutor().join(); }

void ResetServerCallExecutor() {
_GetServerCallExecutor() = std::make_unique<boost::asio::thread_pool>(
::RayConfig::instance().num_server_call_thread());
ThreadPoolMode().load(std::memory_order_acquire) ==
ServerCallThreadPoolMode::CORE_WORKER
? ::RayConfig::instance().core_worker_num_server_call_thread()
: ::RayConfig::instance().num_server_call_thread());
}

void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode) {
ThreadPoolMode().store(mode, std::memory_order_release);
}

} // namespace rpc
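
On the memory-ordering comment above: because the mode is written once during single-threaded process initialization and only read afterwards, a relaxed load would also be correct here, as long as the single store happens before any thread that can read it is created (thread creation itself provides the needed happens-before edge). A hedged sketch of that variant, illustrative only and not what the PR does:

#include <atomic>

enum class ServerCallThreadPoolMode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };

std::atomic<ServerCallThreadPoolMode> &ThreadPoolMode() {
  static std::atomic<ServerCallThreadPoolMode> mode(
      ServerCallThreadPoolMode::SYSTEM_COMPONENT);
  return mode;
}

// Safe under the assumption that the single store happens before any other
// thread can call this; the act of spawning those threads synchronizes the
// earlier write, so no acquire fence is needed on the read side.
ServerCallThreadPoolMode CurrentMode() {
  return ThreadPoolMode().load(std::memory_order_relaxed);
}

int main() {
  ThreadPoolMode().store(ServerCallThreadPoolMode::CORE_WORKER,
                         std::memory_order_relaxed);
  return CurrentMode() == ServerCallThreadPoolMode::CORE_WORKER ? 0 : 1;
}
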
6 changes: 6 additions & 0 deletions src/ray/rpc/server_call.h
@@ -50,6 +50,8 @@ enum class ClusterIdAuthType {
/// This pool is shared across gRPC servers.
boost::asio::thread_pool &GetServerCallExecutor();

enum class ServerCallThreadPoolMode { SYSTEM_COMPONENT = 0, CORE_WORKER = 1 };

/// Drain the executor.
void DrainServerCallExecutor();

@@ -59,6 +61,10 @@ void DrainServerCallExecutor();
/// because they are global.
void ResetServerCallExecutor();

/// Set which config this process uses for the global reply thread pool.
/// Call before the first GetServerCallExecutor().
void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode);

[Review comment] Contributor:
Needing to set this at the right time is a little weird. I guess you could avoid it by passing the type of server in from ServerCallImpl as an arg or template param, but that would mean a new template param or arg all the way down the stack, and I don't know if we want that just for this.

[Review comment] Contributor:
If it "helps", we already have this pattern for setting vars for gRPC elsewhere... though that is perhaps a weak justification.

[Review comment] @Yicheng-Lu-llll (Member, Author):
Thank you! Let me rethink this. The reason I went with this approach is that I saw a similar pattern in the core worker code, so I followed it and added SetServerCallThreadPoolMode right after.

[Review comment] @Yicheng-Lu-llll (Member, Author):
OK, I don't really see a better option here besides (1) "set before using" or (2) "pass the parameter all the way down the stack". Given the size of this change and the current setup, we already require InitializeSystemConfig() to be called before GetServerCallExecutor(), and we also have service_handler_.WaitUntilInitialized(), so I think it's quite safe to call SetServerCallThreadPoolMode during core worker init. Let me know what you think.

[Review comment] Contributor:
Yeah, that's OK for now; if we use this pattern more, we can think about it further.

/// Represents state of a `ServerCall`.
enum class ServerCallState {
/// The call is created and waiting for an incoming request.
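
For context on the alternative discussed in the thread above (passing the server type down explicitly instead of using a process-global setter), a rough dependency-free sketch of what the explicit-parameter style could look like. All names here are hypothetical and this is not what the PR implements:

#include <iostream>

enum class PoolFlavor { kSystemComponent, kCoreWorker };

// The caller would pass the flavor explicitly wherever the reply pool is
// created, instead of relying on a mode set once during process init.
int ReplyPoolThreads(PoolFlavor flavor, unsigned int hw_threads) {
  if (flavor == PoolFlavor::kCoreWorker) {
    return hw_threads >= 8 ? 2 : 1;
  }
  const unsigned int quarter = hw_threads / 4;
  return quarter > 0 ? static_cast<int>(quarter) : 1;
}

int main() {
  std::cout << ReplyPoolThreads(PoolFlavor::kCoreWorker, 16) << "\n";       // 2
  std::cout << ReplyPoolThreads(PoolFlavor::kSystemComponent, 16) << "\n";  // 4
  return 0;
}

The downside the reviewers point out is that this parameter would have to be threaded through every layer between gRPC server setup and pool creation, which is a lot of churn for a single sizing decision.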