Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f8150c0
Make CancelTask RPC Fault Tolerant
Sparks0219 Oct 22, 2025
0a630a7
Addressing comments
Sparks0219 Oct 22, 2025
8ae4e3a
clean up and cpp test failures
Sparks0219 Oct 22, 2025
a733422
Addressing comments
Sparks0219 Oct 23, 2025
8a2e428
Fix broken cpp tests
Sparks0219 Oct 23, 2025
901099d
Fix merge conflicts
Sparks0219 Nov 7, 2025
7d4ab2e
Clean up
Sparks0219 Nov 7, 2025
9070db5
lint
Sparks0219 Nov 7, 2025
dcec398
Addressing comments
Sparks0219 Nov 12, 2025
9df37aa
Fix cpp test failures
Sparks0219 Nov 12, 2025
d846b90
Addressing comments
Sparks0219 Nov 13, 2025
873a17c
Addressing comments
Sparks0219 Nov 13, 2025
430a4a6
Merge remote-tracking branch 'upstream/master' into joshlee/make-canc…
Sparks0219 Nov 13, 2025
a253c81
fix build error
Sparks0219 Nov 14, 2025
9d5cf6f
Addressing comments
Sparks0219 Nov 14, 2025
d0fddda
Merge conflicts
Sparks0219 Nov 18, 2025
c8e0ed6
Addressing comments
Sparks0219 Nov 18, 2025
49250fb
Bad merge conflict fix
Sparks0219 Nov 18, 2025
3dbcc22
Addressing comments
Sparks0219 Nov 18, 2025
73445ab
Fix cpp test
Sparks0219 Nov 18, 2025
f737eef
Addressing comments
Sparks0219 Nov 19, 2025
0429f79
Addressing comments
Sparks0219 Nov 20, 2025
2f9c24e
Addressing comments
Sparks0219 Nov 20, 2025
bed7884
Fix cpp test error
Sparks0219 Nov 21, 2025
2a66834
Addressing comments
Sparks0219 Nov 22, 2025
0fb240e
Merge remote-tracking branch 'upstream/master' into joshlee/make-canc…
Sparks0219 Nov 26, 2025
6bab852
Removing io context posts now that accessor node cache is thread safe
Sparks0219 Nov 26, 2025
c1f1e0f
Merge branch 'master' into joshlee/make-cancel-task-fault-tolerant
edoakes Nov 26, 2025
758ecd6
Merge branch 'master' into joshlee/make-cancel-task-fault-tolerant
jjyao Dec 2, 2025
22a53f5
Deflake serve test
Sparks0219 Dec 3, 2025
e053c2f
AI comment
Sparks0219 Dec 3, 2025
6b2674b
Addressing AI comments
Sparks0219 Dec 3, 2025
07da167
More AI comments
Sparks0219 Dec 3, 2025
41fa586
Addressing comments
Sparks0219 Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions python/ray/serve/tests/test_streaming_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import ray
from ray import serve
from ray._common.test_utils import SignalActor
from ray.serve._private.test_utils import get_application_url, get_application_urls
from ray.serve._private.test_utils import (
get_application_url,
get_application_urls,
send_signal_on_cancellation,
)
from ray.serve.handle import DeploymentHandle


Expand Down Expand Up @@ -326,12 +330,9 @@ def test_http_disconnect(serve_instance):
class SimpleGenerator:
def __call__(self, request: Request) -> StreamingResponse:
async def wait_for_disconnect():
try:
yield "hi"
await asyncio.sleep(100)
except asyncio.CancelledError:
print("Cancelled!")
signal_actor.send.remote()
yield "hi"
Copy link
Contributor Author

@Sparks0219 Sparks0219 Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edoakes It looks like this test got flakier from my changes.
What I observed before my changes was:
1.) Proxy Actor sends a CancelTask RPC to ServeReplica
2.) ServeReplica processes the CancelTask RPC
3.) SignalActor.send.remote() gets sent
4.) CancelChildren doesn't find any pending children tasks to cancel

With my changes 3/4 are flipped, and CancelChildren is cancelling the queued send.remote() task before it fires, so it's timing out. It looks like you ran into the same issue here: https://github.com/ray-project/ray/pull/43320/files#diff-463bbcf17174b07dd1780cae9d6b719b248a0245fa029f8d8f280bf092d4db45R336 and fixed it for the other serve cancellation tests, so I moved this one to also use send_signal_on_cancellation.

Copy link
Contributor Author

@Sparks0219 Sparks0219 Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still trying to figure out why it got more flaky, reverted back to the last time this PR passed CI but it still is flaky locally then for me. I'd expect the timing to change a bit due to my cancellation path changes, but I would've thought it would've slowed the cancellation path due to the node status cache access in actor/normal task submitter so 3/4 should've been less flaky 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it deflaked after using the context manager?

async with send_signal_on_cancellation(signal_actor):
pass

return StreamingResponse(wait_for_disconnect())

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_core_worker_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def inject_cancel_remote_task_rpc_failure(monkeypatch, request):
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.CancelRemoteTask=1:{failure}",
f"CoreWorkerService.grpc_client.RequestOwnerToCancelTask=1:{failure}",
)


Expand Down
55 changes: 54 additions & 1 deletion python/ray/tests/test_raylet_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import pytest

import ray
from ray._common.test_utils import SignalActor, wait_for_condition
from ray._private.test_utils import (
RPC_FAILURE_MAP,
RPC_FAILURE_TYPES,
wait_for_condition,
)
from ray.core.generated import autoscaler_pb2
from ray.exceptions import GetTimeoutError, TaskCancelledError
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
Expand Down Expand Up @@ -247,5 +248,57 @@ def verify_process_killed():
wait_for_condition(verify_process_killed, timeout=30)


@pytest.fixture
def inject_cancel_local_task_rpc_failure(monkeypatch, request):
failure = RPC_FAILURE_MAP[request.param]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"NodeManagerService.grpc_client.CancelLocalTask=1:{failure}",
)


@pytest.mark.parametrize(
"inject_cancel_local_task_rpc_failure", RPC_FAILURE_TYPES, indirect=True
)
@pytest.mark.parametrize("force_kill", [True, False])
def test_cancel_local_task_rpc_retry_and_idempotency(
inject_cancel_local_task_rpc_failure, force_kill, shutdown_only
):
"""Test that CancelLocalTask RPC retries work correctly.

Verify that the RPC is idempotent when network failures occur.
When force_kill=True, verify the worker process is actually killed using psutil.
"""
ray.init(num_cpus=1)
signaler = SignalActor.remote()

@ray.remote(num_cpus=1)
def get_pid():
return os.getpid()

@ray.remote(num_cpus=1)
def blocking_task():
return ray.get(signaler.wait.remote())

worker_pid = ray.get(get_pid.remote())

blocking_ref = blocking_task.remote()

with pytest.raises(GetTimeoutError):
ray.get(blocking_ref, timeout=1)

ray.cancel(blocking_ref, force=force_kill)

with pytest.raises(TaskCancelledError):
ray.get(blocking_ref, timeout=10)

if force_kill:

def verify_process_killed():
return not psutil.pid_exists(worker_pid)

wait_for_condition(verify_process_killed, timeout=30)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
6 changes: 3 additions & 3 deletions src/mock/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class MockCoreWorker : public CoreWorker {
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleCancelRemoteTask,
(rpc::CancelRemoteTaskRequest request,
rpc::CancelRemoteTaskReply *reply,
HandleRequestOwnerToCancelTask,
(rpc::RequestOwnerToCancelTaskRequest request,
rpc::RequestOwnerToCancelTaskReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
Expand Down
5 changes: 5 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class MockRayletClientInterface : public RayletClientInterface {
(const rpc::ClientCallback<rpc::GlobalGCReply> &callback),
(override));
MOCK_METHOD(int64_t, GetPinsInFlight, (), (const, override));
MOCK_METHOD(void,
CancelLocalTask,
(const rpc::CancelLocalTaskRequest &request,
const rpc::ClientCallback<rpc::CancelLocalTaskReply> &callback),
(override));
};

} // namespace ray
6 changes: 3 additions & 3 deletions src/mock/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class MockCoreWorkerClientInterface : public CoreWorkerClientInterface {
const ClientCallback<CancelTaskReply> &callback),
(override));
MOCK_METHOD(void,
CancelRemoteTask,
(CancelRemoteTaskRequest && request,
const ClientCallback<CancelRemoteTaskReply> &callback),
RequestOwnerToCancelTask,
(RequestOwnerToCancelTaskRequest && request,
const ClientCallback<RequestOwnerToCancelTaskReply> &callback),
(override));
MOCK_METHOD(void,
GetCoreWorkerStats,
Expand Down
12 changes: 7 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2460,12 +2460,13 @@ Status CoreWorker::CancelTask(const ObjectID &object_id,
}

if (obj_addr.SerializeAsString() != rpc_address_.SerializeAsString()) {
// We don't have CancelRemoteTask for actor_task_submitter_
// We don't have RequestOwnerToCancelTask for actor_task_submitter_
// because it requires the same implementation.
RAY_LOG(DEBUG).WithField(object_id)
<< "Request to cancel a task of object to an owner "
<< obj_addr.SerializeAsString();
normal_task_submitter_->CancelRemoteTask(object_id, obj_addr, force_kill, recursive);
normal_task_submitter_->RequestOwnerToCancelTask(
object_id, obj_addr, force_kill, recursive);
return Status::OK();
}

Expand Down Expand Up @@ -3910,9 +3911,10 @@ void CoreWorker::ProcessSubscribeForRefRemoved(
reference_counter_->SubscribeRefRemoved(object_id, contained_in_id, owner_address);
}

void CoreWorker::HandleCancelRemoteTask(rpc::CancelRemoteTaskRequest request,
rpc::CancelRemoteTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
void CoreWorker::HandleRequestOwnerToCancelTask(
rpc::RequestOwnerToCancelTaskRequest request,
rpc::RequestOwnerToCancelTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto status = CancelTask(ObjectID::FromBinary(request.remote_object_id()),
request.force_kill(),
request.recursive());
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1208,9 +1208,9 @@ class CoreWorker {
rpc::SendReplyCallback send_reply_callback);

/// Implements gRPC server handler.
void HandleCancelRemoteTask(rpc::CancelRemoteTaskRequest request,
rpc::CancelRemoteTaskReply *reply,
rpc::SendReplyCallback send_reply_callback);
void HandleRequestOwnerToCancelTask(rpc::RequestOwnerToCancelTaskRequest request,
rpc::RequestOwnerToCancelTaskReply *reply,
rpc::SendReplyCallback send_reply_callback);

/// Implements gRPC server handler.
void HandlePlasmaObjectReady(rpc::PlasmaObjectReadyRequest request,
Expand Down
5 changes: 4 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(

auto actor_task_submitter = std::make_unique<ActorTaskSubmitter>(
*core_worker_client_pool,
*raylet_client_pool,
gcs_client,
*memory_store,
*task_manager,
*actor_creator,
Expand Down Expand Up @@ -538,6 +540,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
local_raylet_rpc_client,
core_worker_client_pool,
raylet_client_pool,
gcs_client,
std::move(lease_policy),
memory_store,
*task_manager,
Expand All @@ -554,7 +557,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
// OBJECT_STORE.
return rpc::TensorTransport::OBJECT_STORE;
},
boost::asio::steady_timer(io_service_),
io_service_,
*scheduler_placement_time_ms_histogram_);

auto report_locality_data_callback = [this](
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker_rpc_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CoreWorkerServiceHandlerProxy : public rpc::CoreWorkerServiceHandler {
RAY_CORE_WORKER_RPC_PROXY(ReportGeneratorItemReturns)
RAY_CORE_WORKER_RPC_PROXY(KillActor)
RAY_CORE_WORKER_RPC_PROXY(CancelTask)
RAY_CORE_WORKER_RPC_PROXY(CancelRemoteTask)
RAY_CORE_WORKER_RPC_PROXY(RequestOwnerToCancelTask)
RAY_CORE_WORKER_RPC_PROXY(RegisterMutableObjectReader)
RAY_CORE_WORKER_RPC_PROXY(GetCoreWorkerStats)
RAY_CORE_WORKER_RPC_PROXY(LocalGC)
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/grpc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void CoreWorkerGrpcService::InitServerCallFactories(
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
CancelRemoteTask,
RequestOwnerToCancelTask,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/grpc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ class CoreWorkerServiceHandler : public DelayedServiceHandler {
CancelTaskReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleCancelRemoteTask(CancelRemoteTaskRequest request,
CancelRemoteTaskReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleRequestOwnerToCancelTask(RequestOwnerToCancelTaskRequest request,
RequestOwnerToCancelTaskReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleRegisterMutableObjectReader(
RegisterMutableObjectReaderRequest request,
Expand Down
18 changes: 18 additions & 0 deletions src/ray/core_worker/task_submission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ ray_cc_library(
],
)

ray_cc_library(
name = "task_submission_util",
hdrs = ["task_submission_util.h"],
visibility = [":__subpackages__"],
deps = [
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/gcs_rpc_client:gcs_client",
],
)

ray_cc_library(
name = "actor_task_submitter",
srcs = ["actor_task_submitter.cc"],
Expand All @@ -66,12 +77,16 @@ ray_cc_library(
":dependency_resolver",
":out_of_order_actor_submit_queue",
":sequential_actor_submit_queue",
":task_submission_util",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:protobuf_utils",
"//src/ray/core_worker:actor_creator",
"//src/ray/core_worker:reference_counter_interface",
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
"//src/ray/gcs_rpc_client:gcs_client",
"//src/ray/raylet_rpc_client:raylet_client_interface",
"//src/ray/raylet_rpc_client:raylet_client_pool",
"//src/ray/rpc:rpc_callback_types",
"//src/ray/util:time",
"@com_google_absl//absl/base:core_headers",
Expand All @@ -90,14 +105,17 @@ ray_cc_library(
],
deps = [
":dependency_resolver",
":task_submission_util",
"//src/ray/common:id",
"//src/ray/common:lease",
"//src/ray/common:protobuf_utils",
"//src/ray/core_worker:lease_policy",
"//src/ray/core_worker:memory_store",
"//src/ray/core_worker:task_manager_interface",
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
"//src/ray/gcs_rpc_client:gcs_client",
"//src/ray/raylet_rpc_client:raylet_client_interface",
"//src/ray/raylet_rpc_client:raylet_client_pool",
"//src/ray/util:time",
"@com_google_absl//absl/base:core_headers",
],
Expand Down
Loading