Merged
24 changes: 24 additions & 0 deletions python/ray/tests/test_failure_3.py
@@ -47,6 +47,30 @@ def get_raylet_pid(self):
wait_for_pid_to_exit(worker_pid)


def test_plasma_store_operation_after_raylet_dies(ray_start_cluster):
"""
Test that the operation on the plasma store after the raylet dies will not fail the
task with an application level error (RayTaskError) but a system level error
(RayletDiedError).
"""
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()

ray.init(address=cluster.address)

@ray.remote
def get_after_raylet_dies():
raylet_pid = int(os.environ["RAY_RAYLET_PID"])
os.kill(raylet_pid, SIGKILL)
ray.put([0] * 100000)

try:
ray.get(get_after_raylet_dies.remote(), timeout=10)
except Exception as e:
assert isinstance(e, ray.exceptions.RayletDiedError)


@pytest.mark.parametrize(
"ray_start_cluster_head",
[
4 changes: 2 additions & 2 deletions src/ray/common/client_connection.h
@@ -99,7 +99,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
///
/// \param buffer The buffer.
/// \return Status.
Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);
virtual Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);

/// Write a buffer to this connection asynchronously.
///
@@ -113,7 +113,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
///
/// \param buffer The buffer.
/// \return Status.
Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);
virtual Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);

/// Read a buffer from this connection asynchronously.
///
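The virtual qualifiers added above are what let plasma's StoreConn intercept I/O failures by overriding these methods. Below is a minimal, self-contained illustration of the pattern, with simplified stand-ins for ray::Status and the connection classes (not Ray's actual types):

#include <iostream>
#include <string>

// Simplified stand-in for ray::Status.
struct Status {
  std::string msg;  // empty means OK
  bool ok() const { return msg.empty(); }
};

class ServerConnection {
 public:
  virtual ~ServerConnection() = default;
  // Virtual so a subclass can observe failures of the base implementation.
  virtual Status WriteBuffer() { return Status{"Disconnected"}; }
};

class StoreConn : public ServerConnection {
 public:
  explicit StoreConn(bool exit_on_failure) : exit_on_failure_(exit_on_failure) {}
  Status WriteBuffer() override {
    Status s = ServerConnection::WriteBuffer();
    if (!s.ok() && exit_on_failure_) {
      std::cerr << "plasma connection failed: " << s.msg << "\n";
      // The real StoreConn calls ray::QuickExit() here; omitted so this
      // sketch runs to completion.
    }
    return s;  // the status still propagates to the caller
  }

 private:
  bool exit_on_failure_;
};

int main() {
  StoreConn conn(/*exit_on_failure=*/true);
  conn.WriteBuffer();  // prints the failure message and returns the status
  return 0;
}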
@@ -65,7 +65,13 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
bool warmup,
std::function<std::string()> get_current_call_site)
: raylet_client_(raylet_client),
store_client_(std::make_shared<plasma::PlasmaClient>()),
// We turn on exit_on_connection_failure for the core worker's plasma client
// so the core worker exits early after the raylet's death. The raylet never
// proactively closes the plasma store connection, even during shutdown, so
// any error on this connection is a sign of raylet death.
store_client_(
std::make_shared<plasma::PlasmaClient>(/*exit_on_connection_failure*/ true)),
reference_counter_(reference_counter),
check_signals_(std::move(check_signals)) {
if (get_current_call_site != nullptr) {
14 changes: 12 additions & 2 deletions src/ray/object_manager/plasma/client.cc
@@ -97,6 +97,7 @@ struct ObjectInUseEntry {
class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
public:
Impl();
explicit Impl(bool exit_on_connection_failure);
~Impl();

// PlasmaClient method implementations
@@ -235,11 +236,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl
std::unordered_set<ObjectID> deletion_cache_;
/// A mutex which protects this class.
std::recursive_mutex client_mutex_;
/// Whether the current process should exit when a read from or write to the
/// connection fails.
/// Currently it is only turned on when the plasma client is in a core worker.
bool exit_on_connection_failure_;
Contributor:

Instead of modifying the low-level PlasmaClient, why can't we reuse the existing check_signals_ mechanism in CoreWorkerPlasmaStoreProvider, which is the abstraction layer between the Core Worker and the plasma store?

The get and wait paths in CoreWorkerPlasmaStoreProvider already proactively check for interrupts via check_signals_. We could do the same for other plasma operations.

CoreWorker::ExitIfParentRayletDies() already checks periodically whether the raylet process is alive. The CoreWorker could then decide what to do based on the error propagated from CoreWorkerPlasmaStoreProvider.
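
For reference, a minimal sketch of the alternative this comment proposes, assuming check_signals_ behaves like a std::function<Status()> callback as on the get/wait paths; the names and the Status type here are simplified stand-ins, not the actual Ray implementation:

#include <functional>
#include <iostream>
#include <string>

// Simplified stand-in for ray::Status.
struct Status {
  std::string msg;  // empty means OK
  bool ok() const { return msg.empty(); }
  static Status OK() { return {}; }
};

// Run a plasma operation, first polling the interrupt callback so that a
// detected raylet death is returned as a status for the caller to handle,
// rather than exiting the process inside the plasma client.
Status PlasmaOpWithSignalCheck(const std::function<Status()> &check_signals,
                               const std::function<Status()> &plasma_op) {
  if (check_signals) {
    Status s = check_signals();  // e.g. notices the raylet is gone
    if (!s.ok()) return s;       // propagate; the CoreWorker decides what to do
  }
  return plasma_op();
}

int main() {
  auto signals = [] { return Status{"RayletDied"}; };
  auto op = [] { return Status::OK(); };
  std::cout << PlasmaOpWithSignalCheck(signals, op).msg << "\n";  // RayletDied
}

The key difference in this design is that the error is returned to the caller and the decision to exit stays in the CoreWorker, instead of the plasma client terminating the process itself.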

};

PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); }

PlasmaClient::Impl::Impl() : store_capacity_(0) {}
PlasmaClient::Impl::Impl() : store_capacity_(0), exit_on_connection_failure_(false) {}

PlasmaClient::Impl::Impl(bool exit_on_connection_failure)
: store_capacity_(0), exit_on_connection_failure_(exit_on_connection_failure) {}

PlasmaClient::Impl::~Impl() {}

@@ -868,7 +875,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name,
/// The local stream socket that connects to store.
ray::local_stream_socket socket(main_service_);
RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name));
store_conn_.reset(new StoreConn(std::move(socket)));
store_conn_.reset(new StoreConn(std::move(socket), exit_on_connection_failure_));
// Send a ConnectRequest to the store to get its memory capacity.
RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
@@ -912,6 +919,9 @@ std::string PlasmaClient::Impl::DebugString() {

PlasmaClient::PlasmaClient() : impl_(std::make_shared<PlasmaClient::Impl>()) {}

PlasmaClient::PlasmaClient(bool exit_on_connection_failure)
: impl_(std::make_shared<PlasmaClient::Impl>(exit_on_connection_failure)) {}

Status PlasmaClient::Connect(const std::string &store_socket_name,
const std::string &manager_socket_name,
int num_retries) {
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/client.h
@@ -242,6 +242,8 @@ class PlasmaClient : public PlasmaClientInterface {
public:
PlasmaClient();

explicit PlasmaClient(bool exit_on_connection_failure);

Status Connect(const std::string &store_socket_name,
const std::string &manager_socket_name = "",
int num_retries = -1) override;
28 changes: 27 additions & 1 deletion src/ray/object_manager/plasma/connection.cc
@@ -170,7 +170,11 @@ Status Client::SendFd(MEMFD_TYPE fd) {
}

StoreConn::StoreConn(ray::local_stream_socket &&socket)
: ray::ServerConnection(std::move(socket)) {}
: ray::ServerConnection(std::move(socket)), exit_on_connection_failure_(false) {}

StoreConn::StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure)
: ray::ServerConnection(std::move(socket)),
exit_on_connection_failure_(exit_on_connection_failure) {}

Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
#ifdef _WIN32
@@ -192,4 +196,26 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
return Status::OK();
}

ray::Status StoreConn::WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) {
auto status = ray::ServerConnection::WriteBuffer(buffer);
ShutdownWorkerIfErrorStatus(status);
return status;
}

ray::Status StoreConn::ReadBuffer(
const std::vector<boost::asio::mutable_buffer> &buffer) {
auto status = ray::ServerConnection::ReadBuffer(buffer);
ShutdownWorkerIfErrorStatus(status);
return status;
}

void StoreConn::ShutdownWorkerIfErrorStatus(const ray::Status &status) {
if (!status.ok() && exit_on_connection_failure_) {
RAY_LOG(WARNING) << "The connection to the plasma store failed. Terminating the "
<< "process. Status: " << status;
ray::QuickExit();
RAY_LOG(FATAL) << "Accessing unreachable code. This line should never be reached "
<< "after quick process exit due to plasma store connection failure.";
}
}
} // namespace plasma
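
For context on the QuickExit call above: a quick-exit helper of this shape typically terminates without running atexit handlers or static destructors, so a process with a broken store connection does not touch shared state on the way out. A small sketch of that behavior (this is an assumption about ray::QuickExit's semantics, not its implementation):

#include <cstdlib>
#include <iostream>

// std::_Exit terminates immediately without running atexit callbacks or
// static destructors, unlike std::exit.
[[noreturn]] void QuickExitSketch() { std::_Exit(1); }

int main() {
  std::atexit([] { std::cout << "never printed\n"; });
  QuickExitSketch();  // process ends here; the atexit handler is skipped
}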
20 changes: 20 additions & 0 deletions src/ray/object_manager/plasma/connection.h
@@ -164,10 +164,30 @@ class StoreConn : public ray::ServerConnection {
public:
explicit StoreConn(ray::local_stream_socket &&socket);

explicit StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure);

/// Receive a file descriptor for the store.
///
/// \return A file descriptor.
ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd);

ray::Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) override;

ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;

private:
// Whether the current process should exit when WriteBuffer or ReadBuffer fails.
// Currently it is only turned on when the plasma client is in a core worker.
// TODO(myan): For better error handling, we should: (1) In the mid-term, evaluate if
// we should turn it on for the plasma client in other processes. (2) In the
// long-term, consolidate the shutdown path between core worker and raylet to make the
// shutdown procedure cleaner.
Contributor:

Based on my discussion with @edoakes, I think we need a short-to-medium-term followup to handle this outside of the PlasmaClient, in the CoreWorker calling code, and propagate the right exception to the user. Can you update the comment and create a Jira issue with the label oncall-followup for this, please?

bool exit_on_connection_failure_ = false;

// Shutdown the current process if the passed in status is not OK and the client is
// configured to exit on failure.
// @param status: The status to check.
void ShutdownWorkerIfErrorStatus(const ray::Status &status);
Contributor:

Can we remove Worker from this to make this not look like we're special-casing the PlasmaClient for CoreWorker? Maybe ShutdownProcessIfConnectionError

};

std::ostream &operator<<(std::ostream &os, const std::shared_ptr<StoreConn> &store_conn);