diff --git a/.travis.yml b/.travis.yml index 60c88509c6a6..d272fa753dc1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -166,6 +166,8 @@ install: # Raylet tests. - ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh - ./ci/suppress_output bazel test --build_tests_only --test_lang_filters=cc //:all + # Shutdown bazel to release the memory held by bazel. + - bazel shutdown script: - export PATH="$HOME/miniconda/bin:$PATH" diff --git a/python/ray/node.py b/python/ray/node.py index e5deec2324fe..4da4d6989df9 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -573,8 +573,10 @@ def kill_reporter(self, check_alive=True): check_alive (bool): Raise an exception if the process was already dead. """ - self._kill_process_type( - ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive) + # reporter is started only in PY3. + if PY3: + self._kill_process_type( + ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive) def kill_dashboard(self, check_alive=True): """Kill the dashboard. diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index a7a6eeb9c797..e0226cdd18fb 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -329,7 +329,7 @@ def test_raylet_failed(ray_start_cluster): @pytest.mark.parametrize( "ray_start_cluster", [{ "num_cpus": 8, - "num_nodes": 4 + "num_nodes": 2 }], indirect=True) def test_plasma_store_failed(ray_start_cluster): cluster = ray_start_cluster diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 5bfb21c6842d..e66a3799e25e 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -84,6 +84,7 @@ def test_worker_plasma_store_failure(ray_start_cluster_head): cluster.wait_for_nodes() # Log monitor doesn't die for some reason worker.kill_log_monitor() + worker.kill_reporter() worker.kill_plasma_store() worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait() assert not worker.any_processes_alive(), worker.live_processes() diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index de9e71f05e0a..4ad961008e91 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -44,7 +44,8 @@ ServerConnection::ServerConnection(boost::asio::basic_stream_socket &&sock : socket_(std::move(socket)), async_write_max_messages_(1), async_write_queue_(), - async_write_in_flight_(false) {} + async_write_in_flight_(false), + async_write_broken_pipe_(false) {} template ServerConnection::~ServerConnection() { @@ -167,27 +168,47 @@ void ServerConnection::DoAsyncWrites() { break; } } + + // Helper function to call all handlers with the input status. + auto call_handlers = [this](const ray::Status &status, int num_messages) { + for (int i = 0; i < num_messages; i++) { + auto write_buffer = std::move(async_write_queue_.front()); + write_buffer->handler(status); + async_write_queue_.pop_front(); + } + // We finished writing, so mark that we're no longer doing an async write. + async_write_in_flight_ = false; + // If there is more to write, try to write the rest. + if (!async_write_queue_.empty()) { + DoAsyncWrites(); + } + }; + + if (async_write_broken_pipe_) { + // Call the handlers directly. Because writing messages to a connection + // with broken-pipe status will result in the callbacks never being called. + call_handlers(ray::Status::IOError("Broken pipe"), num_messages); + return; + } auto this_ptr = this->shared_from_this(); boost::asio::async_write( ServerConnection::socket_, message_buffers, - [this, this_ptr, num_messages](const boost::system::error_code &error, - size_t bytes_transferred) { - ray::Status status = ray::Status::OK(); - if (error.value() != boost::system::errc::errc_t::success) { - status = boost_to_ray_status(error); - } - // Call the handlers for the written messages. - for (int i = 0; i < num_messages; i++) { - auto write_buffer = std::move(async_write_queue_.front()); - write_buffer->handler(status); - async_write_queue_.pop_front(); - } - // We finished writing, so mark that we're no longer doing an async write. - async_write_in_flight_ = false; - // If there is more to write, try to write the rest. - if (!async_write_queue_.empty()) { - DoAsyncWrites(); + [this, this_ptr, num_messages, call_handlers]( + const boost::system::error_code &error, size_t bytes_transferred) { + ray::Status status = boost_to_ray_status(error); + if (error.value() == boost::system::errc::errc_t::broken_pipe) { + RAY_LOG(ERROR) << "Broken Pipe happened during calling " + << "ServerConnection::DoAsyncWrites."; + // From now on, calling DoAsyncWrites will directly call the handler + // with this broken-pipe status. + async_write_broken_pipe_ = true; + } else if (!status.ok()) { + RAY_LOG(ERROR) << "Error encountered during calling " + << "ServerConnection::DoAsyncWrites, message: " + << status.message() + << ", error code: " << static_cast(error.value()); } + call_handlers(status, num_messages); }); } diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index d96b14fd38ce..fe895fafd4d4 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -101,6 +101,9 @@ class ServerConnection : public std::enable_shared_from_this /// Whether we are in the middle of an async write. bool async_write_in_flight_; + /// Whether we've met a broken-pipe error during writing. + bool async_write_broken_pipe_; + /// Count of async messages sent total. int64_t async_writes_ = 0; diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index 03c85dfbe7a5..cd4a80cd0102 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -257,12 +257,15 @@ void ResourceIds::Release(const ResourceIds &resource_ids) { if (fractional_pair_it == fractional_ids_.end()) { fractional_ids_.push_back(fractional_pair_to_return); } else { + RAY_CHECK(fractional_pair_it->second < 1); fractional_pair_it->second += fractional_pair_to_return.second; - RAY_CHECK(fractional_pair_it->second <= 1); // If this makes the ID whole, then return it to the list of whole IDs. - if (fractional_pair_it->second == 1) { + if (fractional_pair_it->second >= 1) { whole_ids_.push_back(resource_id); - fractional_ids_.erase(fractional_pair_it); + fractional_pair_it->second -= 1; + if (fractional_pair_it->second < 1e-6) { + fractional_ids_.erase(fractional_pair_it); + } } } }