Skip to content

Conversation

@eric-jj
Copy link
Collaborator

@eric-jj eric-jj commented May 16, 2018

Add lock_guard for pull_requests_, which may be modified in different threads.

Copy link

@raulchen raulchen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@imzhenyu
Copy link
Collaborator

So the current io manager becomes multi-threading?

@eric-jj eric-jj closed this May 19, 2018
@eric-jj eric-jj deleted the fix_multi_thread branch June 4, 2018 10:53
songqing referenced this pull request in songqing/ray Jun 10, 2018
raulchen pushed a commit that referenced this pull request Mar 30, 2019
ray-project#4504)

* Fixes Inconsistent weight assignment operations in DQNPolicyGraph (ray-project#4502)

* Update dqn_policy_graph.py
WangTaoTheTonic pushed a commit that referenced this pull request Jul 27, 2022
We encountered SIGSEGV when running Python test `python/ray/tests/test_failure_2.py::test_list_named_actors_timeout`. The stack is:

```
#0  0x00007fffed30f393 in std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&) ()
   from /lib64/libstdc++.so.6
#1  0x00007fffee707649 in ray::RayLog::GetLoggerName() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#2  0x00007fffee70aa90 in ray::SpdLogMessage::Flush() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#3  0x00007fffee70af28 in ray::RayLog::~RayLog() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#4  0x00007fffee2b570d in ray::asio::testing::(anonymous namespace)::DelayManager::Init() [clone .constprop.0] ()
   from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#5  0x00007fffedd0d95a in _GLOBAL__sub_I_asio_chaos.cc () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#6  0x00007ffff7fe282a in call_init.part () from /lib64/ld-linux-x86-64.so.2
#7  0x00007ffff7fe2931 in _dl_init () from /lib64/ld-linux-x86-64.so.2
#8  0x00007ffff7fe674c in dl_open_worker () from /lib64/ld-linux-x86-64.so.2
#9  0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#10 0x00007ffff7fe5ffe in _dl_open () from /lib64/ld-linux-x86-64.so.2
#11 0x00007ffff7d5f39c in dlopen_doit () from /lib64/libdl.so.2
#12 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#13 0x00007ffff7b82f13 in _dl_catch_error () from /lib64/libc.so.6
#14 0x00007ffff7d5fb09 in _dlerror_run () from /lib64/libdl.so.2
#15 0x00007ffff7d5f42a in dlopen@@GLIBC_2.2.5 () from /lib64/libdl.so.2
#16 0x00007fffef04d330 in py_dl_open (self=<optimized out>, args=<optimized out>)
    at /tmp/python-build.20220507135524.257789/Python-3.7.11/Modules/_ctypes/callproc.c:1369
```

The root cause is that when loading `_raylet.so`, `static DelayManager _delay_manager` is initialized and `RAY_LOG(ERROR) << "RAY_testing_asio_delay_us is set to " << delay_env;` is executed. However, the static variables declared in `logging.cc` are not initialized yet (in this case, `std::string RayLog::logger_name_ = "ray_log_sink"`).

It's better not to rely on the initialization order of static variables in different compilation units because it's not guaranteed. I propose to change all `RAY_LOG`s to `std::cerr` in `DelayManager::Init()`.

The crash happens in Ant's internal codebase. Not sure why this test case passes in the community version though.

BTW, I've tried different approaches:

1. Using a static local variable in `get_delay_us` and remove the global variable. This doesn't work because `init()` needs to access the variable as well.
2. Defining the global variable as type `std::unique_ptr<DelayManager>` and initialize it in `get_delay_us`. This works but it requires a lock to be thread-safe.
Chong-Li pushed a commit that referenced this pull request Mar 13, 2023
NKcqx pushed a commit that referenced this pull request Mar 17, 2024
SongGuyang pushed a commit that referenced this pull request May 16, 2024
github-actions bot pushed a commit that referenced this pull request Nov 18, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660)

## Summary

This PR fixes a heap corruption bug that causes the driver to crash with
SIGABRT. The issue is caused by a use-after-free when the `RayletClient`
object is destroyed while an asynchronous RPC callback is still pending.

## Problem Description

### Scenario

A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter ->
map_batches -> write` running for 4+ hours, where workers use elastic
resources with low job priority causing frequent worker deaths due to
pod preemption, crashes the driver with SIGABRT:
```
corrupted size vs. prev_size
*** SIGABRT received at time=1761916578 on cpu 30 ***
PC: @ 0x7f073569d9fc (unknown) pthread_kill
Aborted (core dumped)
```



### Trigger Conditions

After reproducing with an ASan image, Asan reveals the actual
use-after-free at:
```
 #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628
    #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377
    #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338
    #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61
    #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111
    #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590
    #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293
    #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61
    #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111
    #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290
    #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590
    #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109
    #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30
    #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
2025-11-14 16:15:05,608	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112
    #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110
    #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60
    #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88
    #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111
    #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70
    #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40
    #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492
    #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210
    #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63
2025-11-14 16:15:05,742	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193
    #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120
    #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179
    #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442
    #38 0x7ff28b0a58bf  (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf)

0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38)
freed by thread T68 here:
2025-11-14 16:15:05,876	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172
    #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145
    #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496
    #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74
    #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538
    #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184
    #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705
    #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154
    #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122
    #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211
    #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168
    #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535
    #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421
    #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578
    #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50
    #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183
    #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114
    #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69
    #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85
    #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45
```

This is a **non-deterministic race condition** that occurs under the
following sequence:

1. Worker A's pod is preempted → Worker A dies
2. Objects on Worker A are lost
3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated
4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects
this → `Disconnect` is called
5. The `RayletClient` corresponding to Worker B on the driver is
destroyed
6. RPC callback executes and accesses the already-freed `RayletClient` →
use-after-free triggers crash

Whether the use-after-free occurs depends on the relative timing of
steps 5 and 6. In scenarios with frequent pod preemptions, object
recovery frequently triggers `PinObjectIDs`, making this race condition
more likely to occur.

### Root Cause

In `RayletClient::PinObjectIDs`, the RPC callback lambda directly
captured the raw `this` pointer:

```cpp
auto rpc_callback = [this, callback = std::move(callback)](...) {
    pins_in_flight_--;  // Accessing member via 'this' pointer
    ...
};
```

If the `RayletClient` object is destroyed before the async RPC callback
executes, the callback will access freed memory through the dangling
`this` pointer, leading to heap corruption and SIGABRT with the error
message "corrupted size vs. prev_size".

## Solution

The fix ensures that the `RayletClient` object remains alive during the
asynchronous callback execution by:

1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The
class already inherits from this base class (line 43 in
`raylet_client.h`), which enables safe shared pointer management.

2. **Capturing `shared_from_this()` in the lambda**: Instead of
capturing the raw `this` pointer, the callback now captures a
`shared_ptr` to the object. The `shared_from_this()` is called before
incrementing `pins_in_flight_` to ensure proper lifetime management:

```cpp
// Capture shared_from_this() before incrementing to ensure object lifetime
// is extended for the async callback, preventing use-after-free.
auto self = shared_from_this();
pins_in_flight_++;
auto rpc_callback = [self, callback = std::move(callback)](
                        Status status, rpc::PinObjectIDsReply &&reply) {
  self->pins_in_flight_--;
  callback(status, std::move(reply));
};
```

This ensures that the `RayletClient` object's lifetime is extended until
the callback completes, preventing the use-after-free bug. By capturing
the shared pointer before incrementing the counter, we also ensure that
if `shared_from_this()` were to fail (though it shouldn't in normal
usage), we don't leave the counter in an inconsistent state.

## Code Changes

- **File**: `src/ray/raylet_rpc_client/raylet_client.cc`
- **Method**: `RayletClient::PinObjectIDs`
- **Change**: Replace `this` capture with `shared_from_this()` capture
in the RPC callback lambda

Signed-off-by: dragongu <[email protected]>
Co-authored-by: gulonglong <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants