forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
[data] Cleanup: Use SortKey instead of mixed typing for Aggregates #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64d0714 to
01aa3a1
Compare
…in CI and translate to new API stack. (ray-project#48687)
…orV2 built-in classes. (ray-project#48698)
Clean up viz_str() methods
…t#48634) Signed-off-by: dentiny <[email protected]>
Signed-off-by: dentiny <[email protected]>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> The TFRecords release tests typically takes around 1680-1750s to complete. Because the timeout is set to 1800s, if there's minor variation in the job runtime, the job can timeout. To avoid flakiness, this PR relaxes the timeout. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: dentiny <[email protected]>
Users (including Ray Data developers!) are often confused about how to choose between `read_parquet` and `read_parquet_bulk`. To avoid confusion, this PR deprecates `read_parquet_bulk`. Signed-off-by: Balaji Veeramani <[email protected]>
…`iter_rows` (ray-project#48704) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> The `prefetch_blocks` and `prefetch_batches` parameters of `iter_rows` have been deprecated for more than 6 months. In accordance with our API policy, this PR removes them. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Balaji Veeramani <[email protected]>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> We recommend `to_tf` over `iter_tf_batches`. To avoid confusion, we shouldn’t have two similar APIs, especially if we always prefer one. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Balaji Veeramani <[email protected]>
…48690) Metadata providers are specific to current implementation of read APIs. To enable us to change the implementation, this PR deprecates the `meta_provider` parameter and decouples metadata providers from the user-facing interface. --------- Signed-off-by: Balaji Veeramani <[email protected]> Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: dentiny <[email protected]>
We recommend `iter_torch_batches` over `to_torch`. To avoid confusion, we shouldn’t have two similar APIs, especially if we always prefer one. --------- Signed-off-by: Balaji Veeramani <[email protected]> Signed-off-by: Balaji Veeramani <[email protected]>
As titled, I think having `MB` explicitly called out is more readable than `1024 * 1024` or `1<<20` Intended use case: ray-project#48262 (comment) Signed-off-by: dentiny <[email protected]>
…s multiple times (ray-project#48509) Currently, we're serializing first row in every block twice when adding it t/h `DelegatingBlockBuilder`, carrying tangible overhead and impact on latency for large enough rows. Provided that `ArrowBlockBuilder` is now able to handle arbitrary Python object we can just deprecate `DelegatingBlockBuilder` altogether. --------- Signed-off-by: Alexey Kudinkin <[email protected]>
We don't actively maintain the `RandomAccessDataset` API, and we haven't documented it for a long time. So, this PR removes the release test. Signed-off-by: Balaji Veeramani <[email protected]>
This fixes a jemalloc profiling deadlock, as hit by clickhouse ClickHouse/ClickHouse#66346 Signed-off-by: Ruiyang Wang <[email protected]>
Some minor improvements for session implementation. Signed-off-by: dentiny <[email protected]>
## Why are these changes needed? - Combine the endpoint router and the longest prefix router. The two differed in that one used `match_route` to get a handle, and one used `get_handle_for_endpoint` to get a handle (for HTTP and gRPC respectively). We can just have a single proxy router class that have both of these methods. - This is nice in that we only have one cached set of handles. - Add a default `get_proxy_handle` function in `default_impl.py`, which can be used to control how a handle is fetched and how it is initialized. - Infer request protocol from request context, which should deterministically indicate whether the request is an http request or a gRPC request. This then removes the need to set request protocol in the handle, which makes sense since request protocol is more metadata than handle configuration. Signed-off-by: Cindy Zhang <[email protected]>
…)" (ray-project#48612)" (ray-project#48686) This reverts commit 4a9c424. --------- Signed-off-by: Balaji Veeramani <[email protected]>
https://github.com/ray-project/ray/pull/48693/files#diff-81d7373cd5567e997c002b244c491e6b3498d206b12c093f4dc4d30e9b5848af added a test that uses tensorflow. We currently need to skip all tensorflow-related tests for python 3.12 since we don't support tensorflow for python 3.12. Also this is a test for the deprecation of tensorflow ;) Test: - CI Signed-off-by: can <[email protected]>
…tensors > 2Gb) (ray-project#48629) Enabling V2 Arrow Tensor extension type by default (allowing tensors > 2Gb) --------- Signed-off-by: Alexey Kudinkin <[email protected]>
…Error (ray-project#48636) In a recent investigation, we found that when we call the `ray._private.internal_api.free()` from a task the same time as a Raylet is gracefully shutting down, the task might fail with application level Broken pipe IOError. This resulted in job failure without any task retries. However, as the Broken pipe happens because the unhealthiness of the local Raylet, the error should be a system level error and should be retried automatically. Updated changes in commit [01f5f11](ray-project@01f5f11): This PR add the logic for the above bahvior: * When IOError is received in the `CoreWorker::Delete`, throw a system error exception so that the task can retry Why not add the exception check in the `free_objects` function? * It is better to add the logic in the `CoreWorker::Delete` because it can cover the case for other languages as well. * The `CoreWorker::Delete` function is intended to be open to all languages to call and is not called in other ray internal code paths. Why not crash the worker when IOError is encountered in the `WriteMessage` function? * `QuickExit()` function will directly exit the process without executing any shutdown logic for the worker. Directly calling the function in the task execution might potentially causing resource leak * At the same time, the write message function is called also on the graceful shutdown scenario and it is possible during the graceful shutdown process that the local Raylet is unreachable. Therefore, in the graceful shutdown scenario, we shouldn't exit early but let the shutdown logic finish. * At the same time, it is not clear in the code regarding the behavior of the graceful vs force shutdown. We might need some effort to make them clear. The todo is added in the PR. Updated changes in commit [2029d36](ray-project@2029d36): > This PR add the logic for the above behavior: > * When IOError is received in the `free_objects()` function, throw a system error exception so that the task can retry Changes in commit ([9d57b29](ray-project@9d57b29)) : > This PR add the logic for the above behavior: > * Today, the internal `free` API deletes the objects from the local Raylet object store by writing a message through a socket > * When the write failed because the local Raylet is terminated, there is already logic to quick exit the task > * However, the current termination check didn't cover the case where the local Raylet process is a Zombie process and IOError happens during write messages. > * This fix update the check criteria and fail the task when the Raylet process is terminated or the write message function returns an IOError~ Signed-off-by: Mengjin Yan <[email protected]>
## Why are these changes needed? While we calling `xxx.map_groups(..., batch_format="...")`, we may invoke sort function and creating empty blocks which still uses pyarrow by default. And, when we invoke another sort call on top of it, we will hit `AttributeError: 'DataFrame' object has no attribute 'num_rows'` since we uses first block type. (However, we may have different blocks). See more details in ray-project#46748 ## Related issue number Close ray-project#46748 ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Xingyu Long <[email protected]> Co-authored-by: Scott Lee <[email protected]>
…oject#48729) Created by release automation bot. Update with commit e393a71 Signed-off-by: Jiajun Yao <[email protected]> Co-authored-by: Jiajun Yao <[email protected]>
…es (ray-project#48478) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? When writing blocks to parquet, there might be blocks with fields that differ ONLY in nullability - by default, this would be rejected since some blocks might have a different schema than the ParquetWriter. However, we could potentially allow it to happen by tweaking the schema. This PR goes through all blocks before writing them to parquet, and merge schemas that differ only in nullability of the fields. It also casts the table to the newly merged schema so that the write could happen. <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Closes ray-project#48102 --------- Signed-off-by: rickyx <[email protected]>
…ay-project#48697) ## Why are these changes needed? This makes SortAggregate more consistent by unifying the API on the SortKey object, similar to how SortTaskSpec is implemented. ## Related issue number This is related to ray-project#42776 and ray-project#42142 Signed-off-by: Richard Liaw <[email protected]>
richardliaw
pushed a commit
that referenced
this pull request
Dec 9, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 ray-project#22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 ray-project#23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 ray-project#24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 ray-project#25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 ray-project#26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 ray-project#27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 ray-project#28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 ray-project#29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 ray-project#30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 ray-project#31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 ray-project#32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 ray-project#33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} ray-project#34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 ray-project#35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 ray-project#36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 ray-project#37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 ray-project#38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 ray-project#22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 ray-project#23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <[email protected]> Co-authored-by: gulonglong <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Staging PR to break things up
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.