
Conversation

@Yicheng-Lu-llll (Member) commented Nov 19, 2025

Description

Previously, `RAY_num_server_call_thread` controlled the gRPC reply thread pool size for all processes (including CoreWorkers), and its default value was tied to the number of CPUs, which could oversubscribe threads in CoreWorkers on large instances. In this PR, we introduce `RAY_core_worker_num_server_call_thread` to separately control CoreWorkers, defaulting to `min(2, max(1, num cpu/4))`, and scope `RAY_num_server_call_thread` to system components (raylet, GCS, etc.) only.

This keeps per-worker reply pools tiny so we can run many workers on the same node without oversubscribing threads; the choice of “2” is based on the microbenchmarks in #58351.
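
For reference, a minimal sketch (not part of the PR) of overriding either knob; it assumes the usual behavior that `RAY_`-prefixed config environment variables are read by Ray's processes at startup, so they must be set before Ray launches anything, as the test script below also does. The values are illustrative only.

```python
import os

# Illustrative overrides (not recommendations). Both variables are read by
# Ray's processes at startup, so set them before ray.init() launches the
# local raylet/GCS and workers, which inherit this environment.
os.environ["RAY_core_worker_num_server_call_thread"] = "4"  # per-CoreWorker reply pool
os.environ["RAY_num_server_call_thread"] = "8"              # raylet / GCS reply pool

import ray

ray.init()
```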

Related issues

Closes #58351

Test

```python
#!/usr/bin/env python3
import os
import subprocess
import sys


def get_thread_count(config_value):
    subprocess.run(["ray", "stop", "-f"], capture_output=True)
    
    env = os.environ.copy()
    if config_value is not None:
        env["RAY_core_worker_num_server_call_thread"] = str(config_value)
    
    test_code = """
import ray
import psutil
import os
import time

@ray.remote
def count_threads():
    return len(psutil.Process(os.getpid()).threads())

ray.init()

# Warm up once to make sure thread pools are instantiated.
ray.get(count_threads.remote())
time.sleep(1)

print(ray.get(count_threads.remote()))
"""
    
    result = subprocess.run(
        [sys.executable, "-c", test_code],
        env=env,
        capture_output=True,
        text=True,
        check=True
    )
    return int(result.stdout.strip())


if __name__ == "__main__":
    default_threads = get_thread_count(None)
    with_config_10 = get_thread_count(10)
    subprocess.run(["ray", "stop", "-f"], capture_output=True)
    
    print(f"Default (RAY_core_worker_num_server_call_thread=2): {default_threads} threads")
    print(f"With RAY_core_worker_num_server_call_thread=10: {with_config_10} threads")
```
```shell
# Default (RAY_core_worker_num_server_call_thread=2): 52 threads
# With RAY_core_worker_num_server_call_thread=10: 60 threads
```

By default, this setting creates two threads. After changing it to ten, we typically observe eight additional threads.
(Because of #55215, the exact count may differ, but in most cases the delta is three.)

@Yicheng-Lu-llll force-pushed the set_num_server_call_thread_for_core_worker_to_two branch from 0463c5a to 3d080da on November 19, 2025 04:36
@Yicheng-Lu-llll force-pushed the set_num_server_call_thread_for_core_worker_to_two branch from 3d080da to 5b3bec0 on November 19, 2025 06:08

```diff
-/// The pool size for grpc server call.
+/// The pool size for grpc server call for system components (raylet, GCS, etc.).
 RAY_CONFIG(int64_t,
```
@Yicheng-Lu-llll (Member Author), Nov 19, 2025:

This change does alter the meaning of num_server_call_thread, but I believe that’s exactly what we want.

Contributor:

Let's nix "etc." There is no etc. after raylet and GCS, and this specifically doesn't affect workers, so let's not make it confusing.

Member Author:

Sure!

@Yicheng-Lu-llll marked this pull request as ready for review on November 19, 2025 22:48
@Yicheng-Lu-llll requested a review from a team as a code owner on November 19, 2025 22:48
@Yicheng-Lu-llll (Member Author):

@edoakes Let me know if the current implementation looks good to you. Thank you!

@Yicheng-Lu-llll force-pushed the set_num_server_call_thread_for_core_worker_to_two branch from 4f277aa to 127c325 on November 19, 2025 23:16
@edoakes (Collaborator) commented Nov 19, 2025:

@ZacAttack PTAL

@ray-gardener (bot) added the core (Issues that should be addressed in Ray Core) label on Nov 20, 2025
```cpp
/// reply path is light enough that 2 threads is sufficient.
RAY_CONFIG(int64_t,
           core_worker_num_server_call_thread,
           std::min((int64_t)2,
```
Contributor:

This is min(2, max(1, env/4))... which is a little verbose for a line that only ever resolves to 2 or 1.

Member Author:

I can simplify it to:

```cpp
RAY_CONFIG(int64_t, core_worker_num_server_call_thread,
           std::thread::hardware_concurrency() >= 8 ? 2 : 1);
```

I'll add a tiny comment to avoid the threshold looking "magic". Let me know if this works.

@dayshah (Contributor) left a comment:

One note here: during the benchmarking you measured 100 KB. That is the max for a single object, but a task can have many returns, and the max inlined size across all objects is actually 10 MB, so a single PushTaskReply or PushTaskRequest can be up to 10 MB.

```cpp
// Max number bytes of inlined objects in a task rpc request/response.
RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024)
```

None of our microbenchmarks really stress this either, afaik, and it's a little impractical. Tasks also have very high overhead just on launching, so streaming generators are generally going to be better for stressing this.

Also, while we're here reducing sender threads, there's one more interesting thing: when receiving requests, we actually do the copy into the proto request object on the main io_context thread, not the polling thread, so there's a lot more work the polling thread(s) could take on to lessen the burden on the io_context and remove bottlenecks. We should consider the number of request-receiver threads and sender threads in conjunction with each other.
#55904

```diff
 RAY_CONFIG(int64_t, health_check_failure_threshold, 5)

-/// The pool size for grpc server call.
+/// The pool size for grpc server call for system components (raylet, GCS, etc.).
```
Contributor:

Can you update this comment to mention that this is specifically for sending replies


```cpp
/// Set which config this process uses for the global reply thread pool.
/// Call before the first GetServerCallExecutor().
void SetServerCallThreadPoolMode(ServerCallThreadPoolMode mode);
```
Contributor:

Needing to set this at the right time is a little weird.

I guess you could avoid it by passing in the type of server from ServerCallImpl as an arg or template param but that would mean a new template param or arg all the way down the stack and idk if we want that just for this.

Contributor:

If it "helps" we already have this pattern for setting vars for GRPC elsewhere.... Though that is perhaps a weak justification.

Member Author:

Thank you! Let me rethink this.

The reason I went with this approach is that I saw a similar pattern in the core worker code lol, so I followed that and added SetServerCallThreadPoolMode right after it.

Member Author:

OK — I don’t really see a better option here besides (1) “set before using” or (2) “pass the parameter all the way down the stack”.

Given the size of this change and the current setup, we already require InitializeSystemConfig() to be called before GetServerCallExecutor(). We also have service_handler_.WaitUntilInitialized(), so I think it’s quite safe to call SetServerCallThreadPoolMode during core worker init.

Let me know what you think.

Contributor:

Ya that's ok for now, if we use this pattern more can think more about it

@Yicheng-Lu-llll (Member Author) commented Nov 20, 2025:

@dayshah Thank you for the detailed comment, this is super helpful!

Let me check that I fully understand the behavior:

  • By default, Ray treats the return value as a single object, so it must be < max_direct_call_object_size (100KB by default).

  • In contrast, if we use num_returns or streaming generators, multiple return objects can be inlined into the same RPC, and their sizes are accumulated up to task_rpc_inlined_bytes_limit (10MB):

    @ray.remote(num_returns=2)
    def f():
        return a, b
    
    x_ref, y_ref = f.remote()

For streaming generators, my current understanding is:
each yield produces one object that still needs to be < max_direct_call_object_size, and all yielded objects for the same task share the same PushTaskRequest/PushTaskReply, so the total inlined bytes are accumulated and capped by the 10MB task_rpc_inlined_bytes_limit. From the reply-thread perspective, we’re effectively sending one object’s payload at a time (each ≤ 100KB), but over the lifetime of the task the cumulative inline data on that RPC can grow up to ~10MB.

So using streaming essentially gives us high QPS, while each individual send still only handles a ≤100KB chunk at a time.
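
To put numbers on the accumulation, a small sketch using the two limits quoted above (assuming "100KB" means 100 * 1024 bytes, as in the worst-case task below):

```python
# How many ~100 KB inlined objects fit under the 10 MB per-RPC inline budget.
max_direct_call_object_size = 100 * 1024            # per-object inline limit (~100 KB)
task_rpc_inlined_bytes_limit = 10 * 1024 * 1024     # per-RPC inline budget (10 MB)
print(task_rpc_inlined_bytes_limit // max_direct_call_object_size)  # -> 102 objects
```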

I can re-measure the cost of ServerCall::Finish(...) using a multi-return case like:

```python
@ray.remote(num_returns=100)
def f():
    item = b"a" * (100 * 1024)
    return tuple(item for _ in range(100))
```
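
A rough driver loop to exercise this task could look like the sketch below (not from the PR; it assumes `ray.init()` has already run and `f` is defined as above, and the iteration count is arbitrary):

```python
# Sketch only: repeatedly invoke the worst-case task so replies keep flowing.
for _ in range(1_000):
    refs = f.remote()  # num_returns=100, so this is a list of 100 ObjectRefs
    ray.get(refs)      # fetch all 100 returned values (~10 MiB per task)
```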

Please let me know if this works!

@dayshah (Contributor) commented Nov 20, 2025:

> Please let me know if this works!

Yup, num_returns=100 with 100 KB objects would be the worst case. Also want to mention that this probably isn't that practical; most users will only have num_returns=1...

The streaming generator thing actually doesn't matter here, the more I think about it: each yield is a separate ReportGeneratorItemReturnsRequest, so it doesn't really apply here at all, since request writing and request receiving aren't dependent on this thread pool.

@Yicheng-Lu-llll (Member Author):

@dayshah @edoakes I re-ran the Finish() sync-slice timing with the worst-case inline payload (100 returns × 100 KiB ≈ 10 MiB total):

268.272 µs
277.566 µs
280.853 µs
284.206 µs
284.644 µs

Using the method here, even a contrived 10k QPS burst with this worst-case return just needs ~2.x threads to avoid tail latency (roughly time × 10,000). Since async actors default to DEFAULT_MAX_CONCURRENCY_ASYNC = 1000 (which would need ≪1 thread), I’m inclined to stick with 2 threads.
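
Spelling out the arithmetic behind that estimate (a back-of-envelope sketch using the measured slice times above; the 10k QPS burst is the hypothetical load from this thread, not a measured workload):

```python
# Reply work per second, expressed as thread-seconds, gives the number of
# fully busy reply threads needed to keep up with the burst.
per_reply_s = 285e-6       # ~285 microseconds per worst-case Finish() slice (measured above)
burst_qps = 10_000         # contrived burst rate discussed in this thread
print(per_reply_s * burst_qps)  # ~2.85 -> roughly 3 busy threads in the worst case
```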

@dayshah (Contributor) left a comment:

Nice investigation!



```diff
 std::unique_ptr<boost::asio::thread_pool> &_GetServerCallExecutor() {
   static auto thread_pool = std::make_unique<boost::asio::thread_pool>(
-      ::RayConfig::instance().num_server_call_thread());
+      ThreadPoolMode().load(std::memory_order_acquire) ==
```
Contributor:

imo the memory ordering is cool but adds unnecessary overhead when reading for a part that's not perf sensitive at all

@Yicheng-Lu-llll added the go (add ONLY when ready to merge, run all tests) label on Nov 24, 2025
@edoakes merged commit 50703af into ray-project:master on Nov 24, 2025
7 checks passed
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Dec 1, 2025
Reverts:
- 0752886 [core] enable open telemetry by default (ray-project#56432)
- 50703af [core] Limit core worker gRPC reply threads to 2 by default (ray-project#58771)

Testing if these changes caused the aggregator-to-GCS performance regression.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Signed-off-by: sampan <[email protected]>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Successfully merging this pull request may close these issues.

[core] Same actor/task has different number of threads in different environments
