-
Notifications
You must be signed in to change notification settings - Fork 0
[tune](deps): Bump tensorflow-probability from 0.11.1 to 0.12.1 in /python/requirements #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Dependabot tried to update this pull request, but something went wrong. We're looking into it, but in the meantime you can retry the update by commenting |
4 similar comments
|
Dependabot tried to update this pull request, but something went wrong. We're looking into it, but in the meantime you can retry the update by commenting |
|
Dependabot tried to update this pull request, but something went wrong. We're looking into it, but in the meantime you can retry the update by commenting |
|
Dependabot tried to update this pull request, but something went wrong. We're looking into it, but in the meantime you can retry the update by commenting |
|
Dependabot tried to update this pull request, but something went wrong. We're looking into it, but in the meantime you can retry the update by commenting |
Bumps [tensorflow-probability](https://github.com/tensorflow/probability) from 0.11.1 to 0.12.1. - [Release notes](https://github.com/tensorflow/probability/releases) - [Commits](tensorflow/probability@v0.11.1...v0.12.1) Signed-off-by: dependabot[bot] <[email protected]>
bfb10cb to
d832cfd
Compare
|
Superseded by #12. |
…om concurrent chunk receive - #2 (ray-project#19216)
… and `MultiAgentEnvs` (ray-project#21063)
…roject#22317) Improve observability for general objects and lineage reconstruction by adding a "Status" field to `ray memory`. The value of the field can be: ``` // The task is waiting for its dependencies to be created. WAITING_FOR_DEPENDENCIES = 1; // All dependencies have been created and the task is scheduled to execute. SCHEDULED = 2; // The task finished successfully. FINISHED = 3; ``` In addition, tasks that failed or that needed to be re-executed due to lineage reconstruction will have a field listing the attempt number. Example output: ``` IP Address | PID | Type | Call Site | Status | Size | Reference Type | Object Ref 192.168.4.22 | 279475 | Driver | (task call) ... | Attempt #2: FINISHED | 10000254.0 B | LOCAL_REFERENCE | c2668a65bda616c1ffffffffffffffffffffffff0100000001000000 ```
…ray-project#23821) This PR refactors `LazyBlockList` in service of out-of-band serialization (see [mono-PR](ray-project#22616)) and is a precursor to an execution plan refactor (PR #2) and adding the actual out-of-band serialization APIs (PR #3). The following is included in this refactor: 1. `ReadTask`s are now a first-class concept, replacing calls; 2. read stage progress tracking is consolidated into `LazyBlockList._get_blocks_with_metadta()` and more of the read task complexity, e.g. the read remote function, was pushed into `LazyBlockList` to make `ray.data.read_datasource()` simpler; 3. we are a bit smarter with how we progressively launch tasks and fetch and cache metadata, including fetching the metadata for read tasks in `.iter_blocks_with_metadata()` instead of relying on the pre-read task metadata (which will be less accurate), and we also fix some small bugs in the lazy ramp-up around progressive metadata fetching. (1) is the most important item for supporting out-of-band serialization and fundamentally changes the `LazyBlockList` data model. This is required since we need to be able to reference the underlying read tasks when rewriting read stages during optimization and when serializing the lineage of the Dataset. See the [mono-PR](ray-project#22616) for more context. Other changes: 1. Changed stats actor to a global named actor singleton in order to obviate the need for serializing the actor handle with the Dataset stats; without this, we were encountering serialization failures.
We encountered SIGSEGV when running Python test `python/ray/tests/test_failure_2.py::test_list_named_actors_timeout`. The stack is: ``` #0 0x00007fffed30f393 in std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&) () from /lib64/libstdc++.so.6 #1 0x00007fffee707649 in ray::RayLog::GetLoggerName() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so #2 0x00007fffee70aa90 in ray::SpdLogMessage::Flush() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so #3 0x00007fffee70af28 in ray::RayLog::~RayLog() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so #4 0x00007fffee2b570d in ray::asio::testing::(anonymous namespace)::DelayManager::Init() [clone .constprop.0] () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so #5 0x00007fffedd0d95a in _GLOBAL__sub_I_asio_chaos.cc () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so #6 0x00007ffff7fe282a in call_init.part () from /lib64/ld-linux-x86-64.so.2 #7 0x00007ffff7fe2931 in _dl_init () from /lib64/ld-linux-x86-64.so.2 #8 0x00007ffff7fe674c in dl_open_worker () from /lib64/ld-linux-x86-64.so.2 #9 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6 #10 0x00007ffff7fe5ffe in _dl_open () from /lib64/ld-linux-x86-64.so.2 #11 0x00007ffff7d5f39c in dlopen_doit () from /lib64/libdl.so.2 #12 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6 #13 0x00007ffff7b82f13 in _dl_catch_error () from /lib64/libc.so.6 #14 0x00007ffff7d5fb09 in _dlerror_run () from /lib64/libdl.so.2 #15 0x00007ffff7d5f42a in dlopen@@GLIBC_2.2.5 () from /lib64/libdl.so.2 #16 0x00007fffef04d330 in py_dl_open (self=<optimized out>, args=<optimized out>) at /tmp/python-build.20220507135524.257789/Python-3.7.11/Modules/_ctypes/callproc.c:1369 ``` The root cause is that when loading `_raylet.so`, `static DelayManager _delay_manager` is initialized and `RAY_LOG(ERROR) << "RAY_testing_asio_delay_us is set to " << delay_env;` is executed. However, the static variables declared in `logging.cc` are not initialized yet (in this case, `std::string RayLog::logger_name_ = "ray_log_sink"`). It's better not to rely on the initialization order of static variables in different compilation units because it's not guaranteed. I propose to change all `RAY_LOG`s to `std::cerr` in `DelayManager::Init()`. The crash happens in Ant's internal codebase. Not sure why this test case passes in the community version though. BTW, I've tried different approaches: 1. Using a static local variable in `get_delay_us` and remove the global variable. This doesn't work because `init()` needs to access the variable as well. 2. Defining the global variable as type `std::unique_ptr<DelayManager>` and initialize it in `get_delay_us`. This works but it requires a lock to be thread-safe.
Signed-off-by: Edward Oakes <[email protected]>
Why are these changes needed? Right now the theory is as follow. pubsub io service is created and run inside the GcsServer. That means if pubsub io service is accessed after GCSServer GC'ed, it will segfault. Right now, upon teardown, when we call rpc::DrainAndResetExecutor, this will recreate the Executor thread pool. Upon teardown, If DrainAndResetExecutor -> GcsServer's internal pubsub posts new SendReply to the newly created threadpool -> GcsServer.reset -> pubsub io service GC'ed -> SendReply invoked from the newly created thread pool, it will segfault. NOTE: the segfault is from pubsub service if you see the failure #2 0x7f92034d9129 in ray::rpc::ServerCallImpl<ray::rpc::InternalPubSubGcsServiceHandler, ray::rpc::GcsSubscriberPollRequest, ray::rpc::GcsSubscriberPollReply>::HandleRequestImpl()::'lambda'(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>)::operator()(ray::Status, std::__1::function<void ()>, std::__1::function<void ()>) const::'lambda'()::operator()() const /proc/self/cwd/bazel-out/k8-opt/bin/_virtual_includes/grpc_common_lib/ray/rpc/server_call.h:212:48 As a fix, I only drain the thread pool. And then reset it after all operations are fully cleaned up (only from tests). I think there's no need to reset for regular proc termination like raylet, gcs, core workers. Related issue number Closes ray-project#34344 Signed-off-by: SangBin Cho <[email protected]>
missing comma related to ray-project#48564 Had to try again because @can-anyscale suspects a rare ReadTheDocs failure and there's no easy way to kick off a rebuild from the browser. Signed-off-by: angelinalg <[email protected]>
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 #38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <[email protected]> Co-authored-by: gulonglong <[email protected]>
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 #38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <[email protected]> Co-authored-by: gulonglong <[email protected]>
Bumps tensorflow-probability from 0.11.1 to 0.12.1.
Release notes
Sourced from tensorflow-probability's releases.
... (truncated)
Commits
43a9d6cMerge pull request #1205 from jburnim/r0.12c1818a8Set the version for the TFP 0.12.1 release.0c282efAvoid cache errors when user-written Bijectors don't passparameters.dcd59edMerge pull request #1199 from jburnim/r0.1290fa3f0Set the version for the TFP 0.12.0 release.ead8501internal change1cc5c39Allow 0 concentration in gamma samplers.5f0dbecAdd float64 support to PHMC.d3bf5a0Rewrite experimental sample_chain in terms of run_kernel.e7d64b7Add a default bijector to the Deterministic distributions.Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting
@dependabot rebase.Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR:
@dependabot rebasewill rebase this PR@dependabot recreatewill recreate this PR, overwriting any edits that have been made to it@dependabot mergewill merge this PR after your CI passes on it@dependabot squash and mergewill squash and merge this PR after your CI passes on it@dependabot cancel mergewill cancel a previously requested merge and block automerging@dependabot reopenwill reopen this PR if it is closed@dependabot closewill close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually@dependabot ignore this major versionwill close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)@dependabot ignore this minor versionwill close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)@dependabot ignore this dependencywill close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)