Skip to content

Conversation

@caopeng428
Copy link
Collaborator

  • implement the interface for reading configuration files with capability of rDSN
  • implement the logging of primitive types and some container types in std
  • include the downloading and building of gtest in the third party script

@caopeng428 caopeng428 requested a review from imzhenyu March 27, 2018 02:12
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may put this inside the root cmakefile of common runtime, so that later on when the path of the thirdparty libraries are changed, we only need to change one place

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can omit this as inside cmakefile projects, the link path is automatically resolved

@caopeng428 caopeng428 force-pushed the dev_cp_config_rdsn_provider branch from fdf9c8f to 74b4051 Compare March 27, 2018 09:08
(including interface and dummy implementation for logging and configuration)
@caopeng428 caopeng428 force-pushed the dev_cp_config_rdsn_provider branch from 71a67f5 to 9ead45f Compare March 27, 2018 09:12
@caopeng428 caopeng428 merged this pull request into dev_sys Mar 27, 2018
micafan pushed a commit that referenced this pull request Nov 21, 2019
* Precede ray.get with ray.wait.

* Trigger checkpoint deletes locally in Trainable

* Clean-up code.

* Minor changes.

* Track best checkpoint so far again

* Pulled checkpoint GC out of Trainable.

* Added comments, error logging.

* Immediate pull after checkpoint taken; rsync source delete on pull

* Minor doc fixes

* Fix checkpoint manager bug

* Fix bugs, tests, formatting

* Fix bugs, feature flag for force sync.

* Fix test.

* Fix minor bugs: clear proc and less verbose sync_on_checkpoint warnings.

* Fix bug: update IP of last_result.

* Fixed message.

* Added a lot of logging.

* Changes to ray trial executor.

* More bug fixes (logging after failure), better logging.

* Fix richards bug and logging

* Add comments.

* try-except

* Fix heapq bug.

* .

* Move handling of no available trials to ray_trial_executor (#1)

* Fix formatting bug, lint.

* Addressed Richard's comments

* Revert tests.

* fix rebase

* Fix trial location reporting.

* Fix test

* Fix lint

* Rebase, use ray.get w/ timeout, lint.

* lint

* fix rebase

* Address richard's comments
ffbin pushed a commit that referenced this pull request Nov 28, 2020
ffbin pushed a commit that referenced this pull request Dec 27, 2020
WangTaoTheTonic pushed a commit that referenced this pull request Dec 29, 2020
…unicator caching (ray-project#12935)

* other collectives all work

* auto-linting

* mannual linting #1

* mannual linting 2

* bugfix

* add send/recv point-to-point calls

* add some initial code for communicator caching

* auto linting

* optimize imports

* minor fix

* fix unpassed tests

* support more dtypes

* rerun some distributed tests for send/recv

* linting
WangTaoTheTonic pushed a commit that referenced this pull request Jul 27, 2022
We encountered SIGSEGV when running Python test `python/ray/tests/test_failure_2.py::test_list_named_actors_timeout`. The stack is:

```
#0  0x00007fffed30f393 in std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&) ()
   from /lib64/libstdc++.so.6
#1  0x00007fffee707649 in ray::RayLog::GetLoggerName() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#2  0x00007fffee70aa90 in ray::SpdLogMessage::Flush() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#3  0x00007fffee70af28 in ray::RayLog::~RayLog() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#4  0x00007fffee2b570d in ray::asio::testing::(anonymous namespace)::DelayManager::Init() [clone .constprop.0] ()
   from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#5  0x00007fffedd0d95a in _GLOBAL__sub_I_asio_chaos.cc () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#6  0x00007ffff7fe282a in call_init.part () from /lib64/ld-linux-x86-64.so.2
#7  0x00007ffff7fe2931 in _dl_init () from /lib64/ld-linux-x86-64.so.2
#8  0x00007ffff7fe674c in dl_open_worker () from /lib64/ld-linux-x86-64.so.2
#9  0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#10 0x00007ffff7fe5ffe in _dl_open () from /lib64/ld-linux-x86-64.so.2
#11 0x00007ffff7d5f39c in dlopen_doit () from /lib64/libdl.so.2
#12 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#13 0x00007ffff7b82f13 in _dl_catch_error () from /lib64/libc.so.6
#14 0x00007ffff7d5fb09 in _dlerror_run () from /lib64/libdl.so.2
#15 0x00007ffff7d5f42a in dlopen@@GLIBC_2.2.5 () from /lib64/libdl.so.2
#16 0x00007fffef04d330 in py_dl_open (self=<optimized out>, args=<optimized out>)
    at /tmp/python-build.20220507135524.257789/Python-3.7.11/Modules/_ctypes/callproc.c:1369
```

The root cause is that when loading `_raylet.so`, `static DelayManager _delay_manager` is initialized and `RAY_LOG(ERROR) << "RAY_testing_asio_delay_us is set to " << delay_env;` is executed. However, the static variables declared in `logging.cc` are not initialized yet (in this case, `std::string RayLog::logger_name_ = "ray_log_sink"`).

It's better not to rely on the initialization order of static variables in different compilation units because it's not guaranteed. I propose to change all `RAY_LOG`s to `std::cerr` in `DelayManager::Init()`.

The crash happens in Ant's internal codebase. Not sure why this test case passes in the community version though.

BTW, I've tried different approaches:

1. Using a static local variable in `get_delay_us` and remove the global variable. This doesn't work because `init()` needs to access the variable as well.
2. Defining the global variable as type `std::unique_ptr<DelayManager>` and initialize it in `get_delay_us`. This works but it requires a lock to be thread-safe.
NKcqx pushed a commit that referenced this pull request Mar 17, 2024
You need to define the following slack bot environment variables:

- SLACK_APP_TOKEN starts with "xapp-"
- SLACK_BOT_TOKEN starts with "xoxb-"

Once this gets more complex, the logic should live in the Ray Serve
Deployment and the slack bot should call the deployment via REST. Or we
structure it as a Serve application with two deployments (the main
logic, and the slack bot that calls the main logic).

---------

Signed-off-by: Philipp Moritz <[email protected]>
antfin-oss pushed a commit that referenced this pull request Sep 19, 2025
… condition (ray-project#55367)

## Why are these changes needed?

Workers crash with a fatal `RAY_CHECK` failure when the plasma store
connection is broken during shutdown, causing the following error:
```
RAY_CHECK failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe
```
Stacktrace:
```
core_worker.cc:720 C  Check failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe 
*** StackTrace Information ***
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x141789a) [0x7924dd2c689a] ray::operator<<()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x479) [0x7924dd2c9319] ray::RayLog::~RayLog()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x95cc8a) [0x7924dc80bc8a] ray::core::CoreWorker::CoreWorker()::{lambda()#13}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager27MarkTaskReturnObjectsFailedERKNS_17TaskSpecificationENS_3rpc9ErrorTypeEPKNS5_12RayErrorInfoERKN4absl12lts_2023080213flat_hash_setINS_8ObjectIDENSB_13hash_internal4HashISD_EESt8equal_toISD_ESaISD_EEE+0x679) [0x7924dc868f29] ray::core::TaskManager::MarkTaskReturnObjectsFailed()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager15FailPendingTaskERKNS_6TaskIDENS_3rpc9ErrorTypeEPKNS_6StatusEPKNS5_12RayErrorInfoE+0x416) [0x7924dc86f186] ray::core::TaskManager::FailPendingTask()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x9a90e6) [0x7924dc8580e6] ray::core::NormalTaskSubmitter::RequestNewWorkerIfNeeded()::{lambda()#1}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_23RequestWorkerLeaseReplyEE15OnReplyReceivedEv+0x68) [0x7924dc94aa48] ray::rpc::ClientCallImpl<>::OnReplyReceived()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7924dc79e285] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd9b4c8) [0x7924dcc4a4c8] EventTracker::RecordExecution()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd4648e) [0x7924dcbf548e] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd46906) [0x7924dcbf5906] boost::asio::detail::completion_handler<>::do_complete()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f417b) [0x7924dd2a317b] boost::asio::detail::scheduler::do_run_one()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f5af9) [0x7924dd2a4af9] boost::asio::detail::scheduler::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f6202) [0x7924dd2a5202] boost::asio::io_context::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker12RunIOServiceEv+0x91) [0x7924dc793a61] ray::core::CoreWorker::RunIOService()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xcba0b0) [0x7924dcb690b0] thread_proxy
/lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7924dde71ac3]
/lib/x86_64-linux-gnu/libc.so.6(+0x126850) [0x7924ddf03850]
```

Stack trace flow:
1. Task lease request fails ->
`NormalTaskSubmitter::RequestNewWorkerIfNeeded()` callback.
2. Triggers `TaskManager::FailPendingTask()` ->
`MarkTaskReturnObjectsFailed()`.
3. System attempts to store error objects in plasma via
`put_in_local_plasma_callback_`.
4. Plasma connection is broken (raylet/plasma store already shut down).
5. `RAY_CHECK_OK()` in the callback causes fatal crash instead of
graceful handling.

Root Cause:

This is a shutdown ordering race condition:
1. Raylet shuts down first: The raylet stops its IO context
([main_service_.stop()](https://github.com/ray-project/ray/blob/77c5475195e56a26891d88460973198391d20edf/src/ray/object_manager/plasma/store_runner.cc#L146))
which closes plasma store connections.
2. Worker still processes callbacks: Core worker continues processing
pending callbacks on separate threads.
3. Broken connection: When the callback tries to store error objects in
plasma, the connection is already closed.
4. Fatal crash: The `RAY_CHECK_OK()` treats this as an unexpected error
and crashes the process.

Fix:

1. Shutdown-aware plasma operations
- Add `CoreWorker::IsShuttingDown()` method to check shutdown state.
- Skip plasma operations entirely when shutdown is in progress.
- Prevents attempting operations on already-closed connections.

2. Targeted error handling for connection failures
- Replace blanket `RAY_CHECK_OK()` with specific error type checking.
- Handle connection errors (Broken pipe, Connection reset, Bad file
descriptor) as warnings during shutdown scenarios.
- Maintain `RAY_CHECK_OK()` for other error types to catch real issues.

---------

Signed-off-by: Sagar Sumit <[email protected]>
github-actions bot pushed a commit that referenced this pull request Nov 18, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660)

## Summary

This PR fixes a heap corruption bug that causes the driver to crash with
SIGABRT. The issue is caused by a use-after-free when the `RayletClient`
object is destroyed while an asynchronous RPC callback is still pending.

## Problem Description

### Scenario

A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter ->
map_batches -> write` running for 4+ hours, where workers use elastic
resources with low job priority causing frequent worker deaths due to
pod preemption, crashes the driver with SIGABRT:
```
corrupted size vs. prev_size
*** SIGABRT received at time=1761916578 on cpu 30 ***
PC: @ 0x7f073569d9fc (unknown) pthread_kill
Aborted (core dumped)
```



### Trigger Conditions

After reproducing with an ASan image, Asan reveals the actual
use-after-free at:
```
 #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628
    #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377
    #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338
    #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61
    #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111
    #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590
    #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293
    #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61
    #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111
    #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290
    #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590
    #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109
    #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30
    #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
2025-11-14 16:15:05,608	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112
    #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110
    #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60
    #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88
    #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111
    #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70
    #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40
    #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492
    #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210
    #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63
2025-11-14 16:15:05,742	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193
    #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120
    #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179
    #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442
    #38 0x7ff28b0a58bf  (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf)

0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38)
freed by thread T68 here:
2025-11-14 16:15:05,876	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172
    #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145
    #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496
    #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74
    #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538
    #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184
    #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705
    #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154
    #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122
    #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211
    #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168
    #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535
    #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421
    #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578
    #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50
    #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183
    #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114
    #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69
    #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85
    #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45
```

This is a **non-deterministic race condition** that occurs under the
following sequence:

1. Worker A's pod is preempted → Worker A dies
2. Objects on Worker A are lost
3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated
4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects
this → `Disconnect` is called
5. The `RayletClient` corresponding to Worker B on the driver is
destroyed
6. RPC callback executes and accesses the already-freed `RayletClient` →
use-after-free triggers crash

Whether the use-after-free occurs depends on the relative timing of
steps 5 and 6. In scenarios with frequent pod preemptions, object
recovery frequently triggers `PinObjectIDs`, making this race condition
more likely to occur.

### Root Cause

In `RayletClient::PinObjectIDs`, the RPC callback lambda directly
captured the raw `this` pointer:

```cpp
auto rpc_callback = [this, callback = std::move(callback)](...) {
    pins_in_flight_--;  // Accessing member via 'this' pointer
    ...
};
```

If the `RayletClient` object is destroyed before the async RPC callback
executes, the callback will access freed memory through the dangling
`this` pointer, leading to heap corruption and SIGABRT with the error
message "corrupted size vs. prev_size".

## Solution

The fix ensures that the `RayletClient` object remains alive during the
asynchronous callback execution by:

1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The
class already inherits from this base class (line 43 in
`raylet_client.h`), which enables safe shared pointer management.

2. **Capturing `shared_from_this()` in the lambda**: Instead of
capturing the raw `this` pointer, the callback now captures a
`shared_ptr` to the object. The `shared_from_this()` is called before
incrementing `pins_in_flight_` to ensure proper lifetime management:

```cpp
// Capture shared_from_this() before incrementing to ensure object lifetime
// is extended for the async callback, preventing use-after-free.
auto self = shared_from_this();
pins_in_flight_++;
auto rpc_callback = [self, callback = std::move(callback)](
                        Status status, rpc::PinObjectIDsReply &&reply) {
  self->pins_in_flight_--;
  callback(status, std::move(reply));
};
```

This ensures that the `RayletClient` object's lifetime is extended until
the callback completes, preventing the use-after-free bug. By capturing
the shared pointer before incrementing the counter, we also ensure that
if `shared_from_this()` were to fail (though it shouldn't in normal
usage), we don't leave the counter in an inconsistent state.

## Code Changes

- **File**: `src/ray/raylet_rpc_client/raylet_client.cc`
- **Method**: `RayletClient::PinObjectIDs`
- **Change**: Replace `this` capture with `shared_from_this()` capture
in the RPC callback lambda

Signed-off-by: dragongu <[email protected]>
Co-authored-by: gulonglong <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants