Merged
2 changes: 2 additions & 0 deletions .travis.yml
@@ -166,6 +166,8 @@ install:
# Raylet tests.
- ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh
- ./ci/suppress_output bazel test --build_tests_only --test_lang_filters=cc //:all
# Shutdown bazel to release the memory held by bazel.
- bazel shutdown

script:
- export PATH="$HOME/miniconda/bin:$PATH"
6 changes: 4 additions & 2 deletions python/ray/node.py
@@ -573,8 +573,10 @@ def kill_reporter(self, check_alive=True):
check_alive (bool): Raise an exception if the process was already
dead.
"""
self._kill_process_type(
ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)
# reporter is started only in PY3.
if PY3:
self._kill_process_type(
ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)

def kill_dashboard(self, check_alive=True):
"""Kill the dashboard.
2 changes: 1 addition & 1 deletion python/ray/tests/test_component_failures.py
@@ -329,7 +329,7 @@ def test_raylet_failed(ray_start_cluster):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4
"num_nodes": 2
}], indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
1 change: 1 addition & 0 deletions python/ray/tests/test_multi_node_2.py
@@ -84,6 +84,7 @@ def test_worker_plasma_store_failure(ray_start_cluster_head):
cluster.wait_for_nodes()
# Log monitor doesn't die for some reason
worker.kill_log_monitor()
worker.kill_reporter()
worker.kill_plasma_store()
worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait()
assert not worker.any_processes_alive(), worker.live_processes()
57 changes: 39 additions & 18 deletions src/ray/common/client_connection.cc
@@ -44,7 +44,8 @@ ServerConnection<T>::ServerConnection(boost::asio::basic_stream_socket<T> &&sock
: socket_(std::move(socket)),
async_write_max_messages_(1),
async_write_queue_(),
async_write_in_flight_(false) {}
async_write_in_flight_(false),
async_write_broken_pipe_(false) {}

template <class T>
ServerConnection<T>::~ServerConnection() {
@@ -167,27 +168,47 @@ void ServerConnection<T>::DoAsyncWrites() {
break;
}
}

// Helper function to call all handlers with the input status.
auto call_handlers = [this](const ray::Status &status, int num_messages) {
for (int i = 0; i < num_messages; i++) {
auto write_buffer = std::move(async_write_queue_.front());
write_buffer->handler(status);
async_write_queue_.pop_front();
}
// We finished writing, so mark that we're no longer doing an async write.
async_write_in_flight_ = false;
// If there is more to write, try to write the rest.
if (!async_write_queue_.empty()) {
DoAsyncWrites();
}
};

if (async_write_broken_pipe_) {
// Call the handlers directly. Because writing messages to a connection
// with broken-pipe status will result in the callbacks never being called.
call_handlers(ray::Status::IOError("Broken pipe"), num_messages);
return;
}
auto this_ptr = this->shared_from_this();
boost::asio::async_write(
ServerConnection<T>::socket_, message_buffers,
[this, this_ptr, num_messages](const boost::system::error_code &error,
size_t bytes_transferred) {
ray::Status status = ray::Status::OK();
if (error.value() != boost::system::errc::errc_t::success) {
status = boost_to_ray_status(error);
}
// Call the handlers for the written messages.
for (int i = 0; i < num_messages; i++) {
auto write_buffer = std::move(async_write_queue_.front());
write_buffer->handler(status);
async_write_queue_.pop_front();
}
// We finished writing, so mark that we're no longer doing an async write.
async_write_in_flight_ = false;
// If there is more to write, try to write the rest.
if (!async_write_queue_.empty()) {
DoAsyncWrites();
[this, this_ptr, num_messages, call_handlers](
const boost::system::error_code &error, size_t bytes_transferred) {
ray::Status status = boost_to_ray_status(error);
if (error.value() == boost::system::errc::errc_t::broken_pipe) {
RAY_LOG(ERROR) << "Broken Pipe happened during calling "
<< "ServerConnection<T>::DoAsyncWrites.";
// From now on, calling DoAsyncWrites will directly call the handler
// with this broken-pipe status.
async_write_broken_pipe_ = true;
} else if (!status.ok()) {
RAY_LOG(ERROR) << "Error encountered during calling "
<< "ServerConnection<T>::DoAsyncWrites, message: "
<< status.message()
<< ", error code: " << static_cast<int>(error.value());
}
call_handlers(status, num_messages);
});
}
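
Note on the change above: once a write fails with a broken pipe, later writes skip the socket and invoke their handlers directly with an error status. The following is only a rough, synchronous Python model of that idea, not the C++ implementation; `Connection`, `flaky_send`, and the status strings are hypothetical names chosen for illustration.

```python
from collections import deque


class Connection:
    """Toy model of a write queue; `send` stands in for the socket write."""

    def __init__(self, send):
        self._send = send
        self._queue = deque()
        self._broken_pipe = False  # set once, never cleared

    def write(self, message, handler):
        self._queue.append((message, handler))
        self._flush()

    def _flush(self):
        while self._queue:
            message, handler = self._queue.popleft()
            if self._broken_pipe:
                # Do not touch the socket again; fail the handler right away.
                handler("IOError: Broken pipe")
                continue
            try:
                self._send(message)
            except BrokenPipeError:
                self._broken_pipe = True
                handler("IOError: Broken pipe")
            else:
                handler("OK")


# Hypothetical peer that disappears after the first message.
calls = []


def flaky_send(message):
    calls.append(message)
    if len(calls) >= 2:
        raise BrokenPipeError()


statuses = []
conn = Connection(flaky_send)
for msg in ["a", "b", "c"]:
    conn.write(msg, statuses.append)
```

In this model the third message never reaches `flaky_send`, yet its handler still runs, which is the property the patch is after: every queued write gets a callback even when the peer is gone.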

3 changes: 3 additions & 0 deletions src/ray/common/client_connection.h
@@ -101,6 +101,9 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection<T>
/// Whether we are in the middle of an async write.
bool async_write_in_flight_;

/// Whether we've met a broken-pipe error during writing.
bool async_write_broken_pipe_;

/// Count of async messages sent total.
int64_t async_writes_ = 0;

9 changes: 6 additions & 3 deletions src/ray/raylet/scheduling_resources.cc
@@ -257,12 +257,15 @@ void ResourceIds::Release(const ResourceIds &resource_ids) {
if (fractional_pair_it == fractional_ids_.end()) {
fractional_ids_.push_back(fractional_pair_to_return);
} else {
RAY_CHECK(fractional_pair_it->second < 1);
fractional_pair_it->second += fractional_pair_to_return.second;
RAY_CHECK(fractional_pair_it->second <= 1);
Collaborator

This should probably be RAY_CHECK(fractional_pair_it->second < 1 + std::numeric_limits<double>::epsilon());

I believe @romilbhardwaj is making this change in #4555.

However, tolerating the imprecision with epsilons is only a temporary solution. There are too many places where we'd have to add an epsilon to make things correct. @williamma12 is working on a solution that makes the fractional resource bookkeeping exact (instead of approximate) by using integers instead of doubles (where one integer unit represents 1/1000th of a resource).
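
To illustrate the integer idea mentioned above, here is a minimal Python sketch, not the actual fix. It assumes a scale of 1000 units per resource, as suggested in the comment; `UNITS_PER_RESOURCE` and `to_units` are hypothetical names.

```python
UNITS_PER_RESOURCE = 1000  # assumption: one unit is 1/1000th of a resource


def to_units(quantity: float) -> int:
    """Convert a fractional quantity to an exact integer number of units."""
    return round(quantity * UNITS_PER_RESOURCE)


# Returning ten pieces of 0.1 with doubles does not land exactly on 1.0,
# so an equality check against 1 would not fire.
float_total = 0.0
for _ in range(10):
    float_total += 0.1

# The same bookkeeping in integer units is exact.
unit_total = 0
for _ in range(10):
    unit_total += to_units(0.1)

print(float_total == 1.0)                # False
print(unit_total == UNITS_PER_RESOURCE)  # True
```

With integer units, a check such as `== 1` becomes a plain integer comparison and no epsilon is needed.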

Contributor

The problem in this case isn't precision. The problem is that the remaining value and the returned value can add up to more than 1. E.g., returning 0.3 to an existing 0.8 gives 1.1 in total.

Contributor Author

Yes, @raulchen is right. In test_actor.py::test_resource_assignment, I observed a value of 1.1, which was produced by adding 0.2 to 0.9.

Collaborator

@raulchen @guoyuhong that shouldn't be possible. If that happens, then it is a bug.

Each fractional resource corresponds to a specific resource ID. Suppose there are two GPUs with IDs ID1 and ID2, and suppose there are two tasks that require 0.2 and 0.9 GPUs.

  • The raylet starts with (ID1, 1), (ID2, 1).
  • The first task gets scheduled, so the raylet has (ID1, 0.8), (ID2, 1), and the first worker has (ID1, 0.2).
  • The second task gets scheduled, so the raylet has (ID1, 0.8), (ID2, 0.1), the first worker has (ID1, 0.2), and the second worker has (ID2, 0.9).
  • The first task finishes, so the raylet has (ID1, 1), (ID2, 0.1), and the second worker has (ID2, 0.9)
  • The second task finishes, so the raylet has (ID1, 1), (ID2, 1)

We won't add the 0.2 and the 0.9 together because they correspond to different IDs. It isn't possible for one worker to have (ID1, 0.2) and another worker to have (ID1, 0.9) because the total quantity of a given ID is 1.

Does my notation make sense?
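
The walkthrough above can be replayed in a short Python sketch. This is an illustration under assumptions, not the raylet's code: `ResourcePool` is a hypothetical class, first-fit allocation is assumed, and `Fraction` is used so that floating-point noise does not obscure the point.

```python
from fractions import Fraction


class ResourcePool:
    """Tracks the free quantity of each resource ID; each ID totals 1."""

    def __init__(self, ids):
        self.available = {rid: Fraction(1) for rid in ids}

    def acquire(self, quantity):
        # Assumed policy: take the first ID with enough free capacity.
        for rid, free in self.available.items():
            if free >= quantity:
                self.available[rid] = free - quantity
                return rid, quantity
        raise RuntimeError("no single ID has enough free capacity")

    def release(self, rid, quantity):
        self.available[rid] += quantity
        # A single ID can never hold more than its total of 1.
        assert self.available[rid] <= 1


raylet = ResourcePool(["ID1", "ID2"])
first = raylet.acquire(Fraction("0.2"))   # taken from ID1, leaving 0.8
second = raylet.acquire(Fraction("0.9"))  # ID1 has only 0.8, so ID2 is used
raylet.release(*first)                    # ID1 is whole again
raylet.release(*second)                   # ID2 is whole again
```

Because the 0.2 and the 0.9 belong to different IDs, they are never added together, and the assertion in `release` holds throughout.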

Contributor Author

@robertnishihara Thanks for the information. I looked at the code again. The resource_id is an int64_t, not a string. I may have misunderstood the code. I ran the test several times and could not reproduce the case of adding 0.2 to 0.9, which is strange.
Sorry for this bad change. Should I revert it, or will this be fixed in #4555 by @romilbhardwaj?

Collaborator

Thanks @guoyuhong, we'll fix it in #4555.

// If this makes the ID whole, then return it to the list of whole IDs.
if (fractional_pair_it->second == 1) {
if (fractional_pair_it->second >= 1) {
whole_ids_.push_back(resource_id);
fractional_ids_.erase(fractional_pair_it);
fractional_pair_it->second -= 1;
if (fractional_pair_it->second < 1e-6) {
fractional_ids_.erase(fractional_pair_it);
}
Contributor

Not too sure if this is what we want. @robertnishihara could you also take a look at this part?

Contributor

After reading more about this, I think this fix is correct.

}
}
}