From 99dc93ffb77b5b83bddfcbfecbcf5b9b6ddf7994 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 12 Jun 2019 11:26:35 +0800 Subject: [PATCH 01/11] add task provider and store provider interface, with a default implementation --- BUILD.bazel | 4 + src/ray/core_worker/common.h | 46 ++++++ src/ray/core_worker/core_worker.cc | 41 +++--- src/ray/core_worker/core_worker.h | 27 ++-- src/ray/core_worker/object_interface.cc | 126 +++------------- src/ray/core_worker/object_interface.h | 6 + .../store_provider/plasma_store_provider.cc | 135 ++++++++++++++++++ .../store_provider/plasma_store_provider.h | 67 +++++++++ .../store_provider/store_provider.h | 60 ++++++++ src/ray/core_worker/task_execution.cc | 56 +++++--- src/ray/core_worker/task_execution.h | 7 +- src/ray/core_worker/task_interface.cc | 38 +++-- src/ray/core_worker/task_interface.h | 10 +- .../task_provider/raylet_task_provider.cc | 35 +++++ .../task_provider/raylet_task_provider.h | 42 ++++++ .../core_worker/task_provider/task_provider.h | 36 +++++ src/ray/test/run_core_worker_tests.sh | 3 - 17 files changed, 542 insertions(+), 197 deletions(-) create mode 100644 src/ray/core_worker/store_provider/plasma_store_provider.cc create mode 100644 src/ray/core_worker/store_provider/plasma_store_provider.h create mode 100644 src/ray/core_worker/store_provider/store_provider.h create mode 100644 src/ray/core_worker/task_provider/raylet_task_provider.cc create mode 100644 src/ray/core_worker/task_provider/raylet_task_provider.h create mode 100644 src/ray/core_worker/task_provider/task_provider.h diff --git a/BUILD.bazel b/BUILD.bazel index 27ab40ef74d1..3e919f8e3280 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -111,6 +111,8 @@ cc_library( srcs = glob( [ "src/ray/core_worker/*.cc", + "src/ray/core_worker/store_provider/*.cc", + "src/ray/core_worker/task_provider/*.cc", ], exclude = [ "src/ray/core_worker/*_test.cc", @@ -119,6 +121,8 @@ cc_library( ), hdrs = glob([ "src/ray/core_worker/*.h", + "src/ray/core_worker/store_provider/*.h", + "src/ray/core_worker/task_provider/*.h", ]), copts = COPTS, deps = [ diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index b11fabfe46f8..0934e2982eba 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -5,6 +5,8 @@ #include "ray/common/buffer.h" #include "ray/common/id.h" +#include "ray/raylet/task_spec.h" +#include "ray/raylet/raylet_client.h" namespace ray { @@ -66,6 +68,50 @@ class TaskArg { const std::shared_ptr data_; }; +/// Task specification, which includes the immutable information about the task +/// which are determined at the submission time. +class TaskSpec { + public: + TaskSpec(const raylet::TaskSpecification &task_spec, + const std::vector &dependencies) + : task_spec_(task_spec), + dependencies_(dependencies) {} + + TaskSpec(const raylet::TaskSpecification &&task_spec, + const std::vector &&dependencies) + : task_spec_(task_spec), + dependencies_(dependencies) {} + + const raylet::TaskSpecification &GetTaskSpecification() const { + return task_spec_; + } + + const std::vector &GetDependencies() const { + return dependencies_; + } + private: + /// Raylet task specification. + raylet::TaskSpecification task_spec_; + + /// Dependencies. + std::vector dependencies_; +}; + +enum class StoreProviderType { PLASMA }; + +enum class TaskProviderType { RAYLET }; + +struct RayClient { + /// Plasma store client. + plasma::PlasmaClient store_client_; + + /// Mutex to protect store_client_. + std::mutex store_client_mutex_; + + /// Raylet client. + std::unique_ptr raylet_client_; +}; + } // namespace ray #endif // RAY_CORE_WORKER_COMMON_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 033409196d9b..ec8ca5f50718 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -9,41 +9,42 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, DriverID driver_id) : worker_type_(worker_type), language_(language), - worker_context_(worker_type, driver_id), store_socket_(store_socket), raylet_socket_(raylet_socket), - is_initialized_(false), + worker_context_(worker_type, driver_id), task_interface_(*this), object_interface_(*this), - task_execution_interface_(*this) { - switch (language_) { - case ray::WorkerLanguage::JAVA: - task_language_ = ::Language::JAVA; - break; - case ray::WorkerLanguage::PYTHON: - task_language_ = ::Language::PYTHON; - break; - default: - RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast(language_); - break; - } -} + task_execution_interface_(*this) {} Status CoreWorker::Connect() { // connect to plasma. - RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_)); + RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Connect(store_socket_)); // connect to raylet. // TODO: currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this needs to be changed // so that the worker (java/python .etc) can retrieve and handle the error // instead of crashing. - raylet_client_ = std::unique_ptr( + ray_client_.raylet_client_ = std::unique_ptr( new RayletClient(raylet_socket_, worker_context_.GetWorkerID(), - (worker_type_ == ray::WorkerType::WORKER), - worker_context_.GetCurrentDriverID(), task_language_)); - is_initialized_ = true; + (worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentDriverID(), + ToTaskLanguage(language_))); + return Status::OK(); } +::Language CoreWorker::ToTaskLanguage(WorkerLanguage language) { + switch (language) { + case ray::WorkerLanguage::JAVA: + return ::Language::JAVA; + break; + case ray::WorkerLanguage::PYTHON: + return ::Language::PYTHON; + break; + default: + RAY_LOG(FATAL) << "invalid language specified: " << static_cast(language); + break; + } +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c038b76ce53f..c7d4b12bd7c5 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -46,32 +46,29 @@ class CoreWorker { CoreWorkerTaskExecutionInterface &Execution() { return task_execution_interface_; } private: + /// Translate from WorkLanguage to Language type (required by raylet client). + /// + /// \param[in] language Language for a task. + /// \return Translated task language. + ::Language ToTaskLanguage(WorkerLanguage language); + /// Type of this worker. const enum WorkerType worker_type_; /// Language of this worker. const enum WorkerLanguage language_; - /// Language of this worker as specified in flatbuf (used by task spec). - ::Language task_language_; - - /// Worker context per thread. - WorkerContext worker_context_; - /// Plasma store socket name. - std::string store_socket_; + const std::string store_socket_; /// raylet socket name. - std::string raylet_socket_; - - /// Plasma store client. - plasma::PlasmaClient store_client_; + const std::string raylet_socket_; - /// Mutex to protect store_client_. - std::mutex store_client_mutex_; + /// Worker context. + WorkerContext worker_context_; - /// Raylet client. - std::unique_ptr raylet_client_; + /// Ray client (this includes store client, raylet client and potentially gcs client later). + RayClient ray_client_; /// Whether this worker has been initialized. bool is_initialized_; diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 5ab5d33330d7..43d3e1c80748 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -2,11 +2,17 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/store_provider/plasma_store_provider.h" namespace ray { CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) - : core_worker_(core_worker) {} + : core_worker_(core_worker) { + store_providers_.emplace( + static_cast(StoreProviderType::PLASMA), + std::unique_ptr(new CoreWorkerPlasmaStoreProvider( + core_worker_.ray_client_))); +} Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(), @@ -16,127 +22,31 @@ Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) } Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) { - auto plasma_id = object_id.ToPlasmaId(); - std::shared_ptr data; - { - std::unique_lock guard(core_worker_.store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK( - core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); - } - - memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); - - { - std::unique_lock guard(core_worker_.store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id)); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id)); - } - return Status::OK(); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Put(buffer, object_id); } Status CoreWorkerObjectInterface::Get(const std::vector &ids, int64_t timeout_ms, std::vector> *results) { - (*results).resize(ids.size(), nullptr); - - bool was_blocked = false; - - std::unordered_map unready; - for (size_t i = 0; i < ids.size(); i++) { - unready.insert({ids[i], i}); - } - - int num_attempts = 0; - bool should_break = false; - int64_t remaining_timeout = timeout_ms; - // Repeat until we get all objects. - while (!unready.empty() && !should_break) { - std::vector unready_ids; - for (const auto &entry : unready) { - unready_ids.push_back(entry.first); - } - - // For the initial fetch, we only fetch the objects, do not reconstruct them. - bool fetch_only = num_attempts == 0; - if (!fetch_only) { - // If fetch_only is false, this worker will be blocked. - was_blocked = true; - } - - // TODO: can call `fetchOrReconstruct` in batches as an optimization. - RAY_CHECK_OK(core_worker_.raylet_client_->FetchOrReconstruct( - unready_ids, fetch_only, core_worker_.worker_context_.GetCurrentTaskID())); - - // Get the objects from the object store, and parse the result. - int64_t get_timeout; - if (remaining_timeout >= 0) { - get_timeout = - std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds()); - remaining_timeout -= get_timeout; - should_break = remaining_timeout <= 0; - } else { - get_timeout = RayConfig::instance().get_timeout_milliseconds(); - } - - std::vector plasma_ids; - for (const auto &id : unready_ids) { - plasma_ids.push_back(id.ToPlasmaId()); - } - - std::vector object_buffers; - { - std::unique_lock guard(core_worker_.store_client_mutex_); - auto status = - core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); - } - - for (size_t i = 0; i < object_buffers.size(); i++) { - if (object_buffers[i].data != nullptr) { - const auto &object_id = unready_ids[i]; - (*results)[unready[object_id]] = - std::make_shared(object_buffers[i].data); - unready.erase(object_id); - } - } - - num_attempts += 1; - // TODO: log a message if attempted too many times. - } - - if (was_blocked) { - RAY_CHECK_OK(core_worker_.raylet_client_->NotifyUnblocked( - core_worker_.worker_context_.GetCurrentTaskID())); - } - - return Status::OK(); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Get( + ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results); } Status CoreWorkerObjectInterface::Wait(const std::vector &object_ids, int num_objects, int64_t timeout_ms, std::vector *results) { - WaitResultPair result_pair; - auto status = core_worker_.raylet_client_->Wait( - object_ids, num_objects, timeout_ms, false, - core_worker_.worker_context_.GetCurrentTaskID(), &result_pair); - std::unordered_set ready_ids; - for (const auto &entry : result_pair.first) { - ready_ids.insert(entry); - } - - // TODO: change RayletClient::Wait() to return a bit set, so that we don't need - // to do this translation. - (*results).resize(object_ids.size()); - for (size_t i = 0; i < object_ids.size(); i++) { - (*results)[i] = ready_ids.count(object_ids[i]) > 0; - } - - return status; + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Wait(object_ids, num_objects, timeout_ms, + core_worker_.worker_context_.GetCurrentTaskID(), + results); } Status CoreWorkerObjectInterface::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return core_worker_.raylet_client_->FreeObjects(object_ids, local_only, - delete_creating_tasks); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Delete(object_ids, local_only, delete_creating_tasks); } } // namespace ray diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 431b3f825ac9..3bd36b8239eb 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -6,10 +6,12 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/store_provider/store_provider.h" namespace ray { class CoreWorker; +class CoreWorkerStoreProvider; /// The interface that contains all `CoreWorker` methods that are related to object store. class CoreWorkerObjectInterface { @@ -62,6 +64,10 @@ class CoreWorkerObjectInterface { private: /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; + + /// All the store providers supported. + std::unordered_map> + store_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc new file mode 100644 index 000000000000..5134515c0fb5 --- /dev/null +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -0,0 +1,135 @@ +#include "ray/core_worker/store_provider/plasma_store_provider.h" +#include "ray/common/ray_config.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" +#include "ray/core_worker/object_interface.h" + +namespace ray { + +CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( + RayClient &ray_client) + : ray_client_(ray_client) {} + +Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, + const ObjectID &object_id) { + auto plasma_id = object_id.ToPlasmaId(); + std::shared_ptr data; + { + std::unique_lock guard(ray_client_.store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK( + ray_client_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + } + + memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); + + { + std::unique_lock guard(ray_client_.store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Seal(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Release(plasma_id)); + } + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, + int64_t timeout_ms, const TaskID &task_id, + std::vector> *results) { + (*results).resize(ids.size(), nullptr); + + bool was_blocked = false; + + std::unordered_map unready; + for (size_t i = 0; i < ids.size(); i++) { + unready.insert({ids[i], i}); + } + + int num_attempts = 0; + bool should_break = false; + int64_t remaining_timeout = timeout_ms; + // Repeat until we get all objects. + while (!unready.empty() && !should_break) { + std::vector unready_ids; + for (const auto &entry : unready) { + unready_ids.push_back(entry.first); + } + + // For the initial fetch, we only fetch the objects, do not reconstruct them. + bool fetch_only = num_attempts == 0; + if (!fetch_only) { + // If fetch_only is false, this worker will be blocked. + was_blocked = true; + } + + // TODO: can call `fetchOrReconstruct` in batches as an optimization. + RAY_CHECK_OK(ray_client_.raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id)); + + // Get the objects from the object store, and parse the result. + int64_t get_timeout; + if (remaining_timeout >= 0) { + get_timeout = + std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds()); + remaining_timeout -= get_timeout; + should_break = remaining_timeout <= 0; + } else { + get_timeout = RayConfig::instance().get_timeout_milliseconds(); + } + + std::vector plasma_ids; + for (const auto &id : unready_ids) { + plasma_ids.push_back(id.ToPlasmaId()); + } + + std::vector object_buffers; + { + std::unique_lock guard(ray_client_.store_client_mutex_); + auto status = ray_client_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); + } + + for (size_t i = 0; i < object_buffers.size(); i++) { + if (object_buffers[i].data != nullptr) { + const auto &object_id = unready_ids[i]; + (*results)[unready[object_id]] = + std::make_shared(object_buffers[i].data); + unready.erase(object_id); + } + } + + num_attempts += 1; + // TODO: log a message if attempted too many times. + } + + if (was_blocked) { + RAY_CHECK_OK(ray_client_.raylet_client_->NotifyUnblocked(task_id)); + } + + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_ids, + int num_objects, int64_t timeout_ms, + const TaskID &task_id, + std::vector *results) { + WaitResultPair result_pair; + auto status = ray_client_.raylet_client_->Wait(object_ids, num_objects, timeout_ms, false, task_id, + &result_pair); + std::unordered_set ready_ids; + for (const auto &entry : result_pair.first) { + ready_ids.insert(entry); + } + + // TODO: change RayletClient::Wait() to return a bit set, so that we don't need + // to do this translation. + (*results).resize(object_ids.size()); + for (size_t i = 0; i < object_ids.size(); i++) { + (*results)[i] = ready_ids.count(object_ids[i]) > 0; + } + + return status; +} + +Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, + bool local_only, + bool delete_creating_tasks) { + return ray_client_.raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks); +} + +} // namespace ray \ No newline at end of file diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h new file mode 100644 index 000000000000..01a1c03b8629 --- /dev/null +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -0,0 +1,67 @@ +#ifndef RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H +#define RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H + +#include "plasma/client.h" +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" +#include "ray/core_worker/store_provider/store_provider.h" +#include "ray/raylet/raylet_client.h" + +namespace ray { + +class CoreWorker; + +/// The interface that contains all `CoreWorker` methods that are related to object store. +class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { + public: + CoreWorkerPlasmaStoreProvider(RayClient &ray_client); + + /// Put an object with specified ID into object store. + /// + /// \param[in] buffer Data buffer of the object. + /// \param[in] object_id Object ID specified by user. + /// \return Status. + Status Put(const Buffer &buffer, const ObjectID &object_id) override; + + /// Get a list of objects from the object store. + /// + /// \param[in] ids IDs of the objects to get. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results Result list of objects data. + /// \return Status. + Status Get(const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, + std::vector> *results) override; + + /// Wait for a list of objects to appear in the object store. + /// + /// \param[in] IDs of the objects to wait for. + /// \param[in] num_returns Number of objects that should appear. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + Status Wait(const std::vector &object_ids, int num_objects, + int64_t timeout_ms, const TaskID &task_id, + std::vector *results) override; + + /// Delete a list of objects from the object store. + /// + /// \param[in] object_ids IDs of the objects to delete. + /// \param[in] local_only Whether only delete the objects in local node, or all nodes in + /// the cluster. + /// \param[in] delete_creating_tasks Whether also delete the tasks that + /// created these objects. \return Status. + Status Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks) override; + + private: + /// Ray client. + RayClient &ray_client_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H \ No newline at end of file diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h new file mode 100644 index 000000000000..379733862194 --- /dev/null +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -0,0 +1,60 @@ +#ifndef RAY_CORE_WORKER_STORE_PROVIDER_H +#define RAY_CORE_WORKER_STORE_PROVIDER_H + +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" + +namespace ray { + +class CoreWorkerStoreProvider { + public: + CoreWorkerStoreProvider() {} + + virtual ~CoreWorkerStoreProvider() {} + + /// Put an object with specified ID into object store. + /// + /// \param[in] buffer Data buffer of the object. + /// \param[in] object_id Object ID specified by user. + /// \return Status. + virtual Status Put(const Buffer &buffer, const ObjectID &object_id) = 0; + + /// Get a list of objects from the object store. + /// + /// \param[in] ids IDs of the objects to get. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results Result list of objects data. + /// \return Status. + virtual Status Get(const std::vector &ids, int64_t timeout_ms, + const TaskID &task_id, + std::vector> *results) = 0; + + /// Wait for a list of objects to appear in the object store. + /// + /// \param[in] IDs of the objects to wait for. + /// \param[in] num_returns Number of objects that should appear. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + virtual Status Wait(const std::vector &object_ids, int num_objects, + int64_t timeout_ms, const TaskID &task_id, + std::vector *results) = 0; + + /// Delete a list of objects from the object store. + /// + /// \param[in] object_ids IDs of the objects to delete. + /// \param[in] local_only Whether only delete the objects in local node, or all nodes in + /// the cluster. + /// \param[in] delete_creating_tasks Whether also delete the tasks that + /// created these objects. \return Status. + virtual Status Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks) = 0; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_STORE_PROVIDER_H \ No newline at end of file diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index fc22fce96c97..9c8dfcd26763 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -1,43 +1,53 @@ #include "ray/core_worker/task_execution.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/task_provider/raylet_task_provider.h" namespace ray { -Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { - RAY_CHECK(core_worker_.is_initialized_); +CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(CoreWorker &core_worker) + : core_worker_(core_worker) { + task_execution_providers_.emplace( + static_cast(TaskProviderType::RAYLET), + std::unique_ptr(new CoreWorkerRayletTaskExecutionProvider( + core_worker_.ray_client_))); +} +Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { while (true) { - std::unique_ptr task_spec; - auto status = core_worker_.raylet_client_->GetTask(&task_spec); + std::vector tasks; + auto status = task_execution_providers_[static_cast(TaskProviderType::RAYLET)]->GetTasks(&tasks); if (!status.ok()) { RAY_LOG(ERROR) << "Get task failed with error: " - << ray::Status::IOError(status.message()); + << ray::Status::IOError(status.message()); return status; } - const auto &spec = *task_spec; - core_worker_.worker_context_.SetCurrentTask(spec); + for (const auto &task : tasks) { - WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA) - ? WorkerLanguage::JAVA - : WorkerLanguage::PYTHON; - RayFunction func{language, spec.FunctionDescriptor()}; + const auto &spec = task.GetTaskSpecification(); + core_worker_.worker_context_.SetCurrentTask(spec); - std::vector> args; - RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); + WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA) + ? WorkerLanguage::JAVA + : WorkerLanguage::PYTHON; + RayFunction func{language, spec.FunctionDescriptor()}; - auto num_returns = spec.NumReturns(); - if (spec.IsActorCreationTask() || spec.IsActorTask()) { - RAY_CHECK(num_returns > 0); - // Decrease to account for the dummy object id. - num_returns--; - } + std::vector> args; + RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); + + auto num_returns = spec.NumReturns(); + if (spec.IsActorCreationTask() || spec.IsActorTask()) { + RAY_CHECK(num_returns > 0); + // Decrease to account for the dummy object id. + num_returns--; + } - status = executor(func, args, spec.TaskId(), num_returns); - // TODO: - // 1. Check and handle failure. - // 2. Save or load checkpoint. + status = executor(func, args, spec.TaskId(), num_returns); + // TODO: + // 1. Check and handle failure. + // 2. Save or load checkpoint. + } } // should never reach here. diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index e2fe2148a3ab..dff2c973b5e1 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -4,6 +4,7 @@ #include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/task_provider/task_provider.h" namespace ray { @@ -17,8 +18,7 @@ class TaskSpecification; /// execution. class CoreWorkerTaskExecutionInterface { public: - CoreWorkerTaskExecutionInterface(CoreWorker &core_worker) : core_worker_(core_worker) {} - + CoreWorkerTaskExecutionInterface(CoreWorker &core_worker); /// The callback provided app-language workers that executes tasks. /// /// \param ray_function[in] Information about the function to execute. @@ -46,6 +46,9 @@ class CoreWorkerTaskExecutionInterface { /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; + + /// All the task submission providers supported. + std::unordered_map> task_execution_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index c19b1e23a7f9..cd5d62c2b8aa 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -2,9 +2,18 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" #include "ray/core_worker/task_interface.h" +#include "ray/core_worker/task_provider/raylet_task_provider.h" namespace ray { +CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) + : core_worker_(core_worker) { + task_submission_providers_.emplace( + static_cast(TaskProviderType::RAYLET), + std::unique_ptr(new CoreWorkerRayletTaskSubmissionProvider( + core_worker_.ray_client_))); +} + Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, const std::vector &args, const TaskOptions &task_options, @@ -21,7 +30,7 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, } auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); ray::raylet::TaskSpecification spec(context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index, @@ -29,7 +38,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, language, function.function_descriptor); std::vector execution_dependencies; - return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + return task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::CreateActor( @@ -51,7 +61,7 @@ Status CoreWorkerTaskInterface::CreateActor( (*actor_handle)->SetActorCursor(return_ids[0]); auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); // Note that the caller is supposed to specify required placement resources // correctly via actor_creation_options.resources. @@ -63,7 +73,8 @@ Status CoreWorkerTaskInterface::CreateActor( function.function_descriptor); std::vector execution_dependencies; - return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + return task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -87,7 +98,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, ObjectID::FromBinary(actor_handle.ActorID().Binary()); auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); std::vector new_actor_handles; ray::raylet::TaskSpecification spec( @@ -104,7 +115,8 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, actor_handle.SetActorCursor(actor_cursor); actor_handle.ClearNewActorHandles(); - auto status = core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + auto status = task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); // remove cursor from return ids. (*return_ids).pop_back(); @@ -128,18 +140,4 @@ CoreWorkerTaskInterface::BuildTaskArguments(const std::vector &args) { return task_arguments; } -::Language CoreWorkerTaskInterface::ToTaskLanguage(WorkerLanguage language) { - switch (language) { - case ray::WorkerLanguage::JAVA: - return ::Language::JAVA; - break; - case ray::WorkerLanguage::PYTHON: - return ::Language::PYTHON; - break; - default: - RAY_LOG(FATAL) << "invalid language specified: " << static_cast(language); - break; - } -} - } // namespace ray diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 06bd5409a8dd..2177c270a53a 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -7,6 +7,7 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/task_provider/task_provider.h" namespace ray { @@ -88,7 +89,7 @@ class ActorHandle { /// submission. class CoreWorkerTaskInterface { public: - CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) {} + CoreWorkerTaskInterface(CoreWorker &core_worker); /// Submit a normal task. /// @@ -136,11 +137,8 @@ class CoreWorkerTaskInterface { std::vector> BuildTaskArguments( const std::vector &args); - /// Translate from WorkLanguage to Language type (required by taks spec). - /// - /// \param[in] language Language for a task. - /// \return Translated task language. - ::Language ToTaskLanguage(WorkerLanguage language); + /// All the task submission providers supported. + std::unordered_map> task_submission_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.cc b/src/ray/core_worker/task_provider/raylet_task_provider.cc new file mode 100644 index 000000000000..dc79d37121d2 --- /dev/null +++ b/src/ray/core_worker/task_provider/raylet_task_provider.cc @@ -0,0 +1,35 @@ + +#include "ray/core_worker/task_provider/raylet_task_provider.h" + +namespace ray { + +CoreWorkerRayletTaskSubmissionProvider::CoreWorkerRayletTaskSubmissionProvider( + RayClient &ray_client) + : ray_client_(ray_client) {} + +Status CoreWorkerRayletTaskSubmissionProvider::SubmitTask(const TaskSpec &task) { + + return ray_client_.raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); +} + +CoreWorkerRayletTaskExecutionProvider::CoreWorkerRayletTaskExecutionProvider( + RayClient &ray_client) + : ray_client_(ray_client) {} + +Status CoreWorkerRayletTaskExecutionProvider::GetTasks(std::vector *tasks) { + std::unique_ptr task_spec; + auto status = ray_client_.raylet_client_->GetTask(&task_spec); + if (!status.ok()) { + RAY_LOG(ERROR) << "Get task from raylet failed with error: " + << ray::Status::IOError(status.message()); + return status; + } + + std::vector dependencies; + (*tasks).clear(); + (*tasks).emplace_back(*task_spec, dependencies); + + return Status::OK(); +} + +} // namespace ray diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.h b/src/ray/core_worker/task_provider/raylet_task_provider.h new file mode 100644 index 000000000000..d0cc7877a24d --- /dev/null +++ b/src/ray/core_worker/task_provider/raylet_task_provider.h @@ -0,0 +1,42 @@ +#ifndef RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H +#define RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H + +#include + +#include "ray/core_worker/task_provider/task_provider.h" +#include "ray/raylet/raylet_client.h" + +namespace ray { + +/// In raylet task submission and execution provider, a task is submitted to raylet, +/// and possibly gets forwarded to another raylet on which node the task should be +/// executed, and then a worker on that node gets this task and starts executing it. + +class CoreWorkerRayletTaskSubmissionProvider : public CoreWorkerTaskSubmissionProvider { + public: + CoreWorkerRayletTaskSubmissionProvider(RayClient &ray_client); + + /// Submit a task for execution to raylet. + /// + /// \param[in] task The task spec to submit. + /// \return Status. + virtual Status SubmitTask(const TaskSpec &task) override; + private: + /// ray client. + RayClient &ray_client_; +}; + +class CoreWorkerRayletTaskExecutionProvider : public CoreWorkerTaskExecutionProvider { + public: + CoreWorkerRayletTaskExecutionProvider(RayClient &ray_client); + + // Get tasks for execution from raylet. + virtual Status GetTasks(std::vector *tasks) override; + private: + /// ray client. + RayClient &ray_client_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H \ No newline at end of file diff --git a/src/ray/core_worker/task_provider/task_provider.h b/src/ray/core_worker/task_provider/task_provider.h new file mode 100644 index 000000000000..db5091338867 --- /dev/null +++ b/src/ray/core_worker/task_provider/task_provider.h @@ -0,0 +1,36 @@ +#ifndef RAY_CORE_WORKER_TASK_PROVIDER_H +#define RAY_CORE_WORKER_TASK_PROVIDER_H + +#include + +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" +#include "ray/raylet/task_spec.h" + +namespace ray { + + +class CoreWorkerTaskSubmissionProvider { + public: + CoreWorkerTaskSubmissionProvider() {} + + /// Submit a task for execution. + /// + /// \param[in] task The task spec to submit. + /// \return Status. + virtual Status SubmitTask(const TaskSpec &task) = 0; +}; + +class CoreWorkerTaskExecutionProvider { + public: + CoreWorkerTaskExecutionProvider() {} + + // Get tasks for execution. + virtual Status GetTasks(std::vector *tasks) = 0; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_TASK_PROVIDER_H \ No newline at end of file diff --git a/src/ray/test/run_core_worker_tests.sh b/src/ray/test/run_core_worker_tests.sh index 104b19ff19cb..7668b92ac272 100644 --- a/src/ray/test/run_core_worker_tests.sh +++ b/src/ray/test/run_core_worker_tests.sh @@ -43,6 +43,3 @@ sleep 1s bazel run //:redis-cli -- -p 6379 shutdown bazel run //:redis-cli -- -p 6380 shutdown sleep 1s - -# Include raylet integration test once it's ready. -# ./bazel-bin/object_manager_integration_test $STORE_EXEC From 4343a87ffc77b526d5a256fb2cfc9464857e03ec Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 12 Jun 2019 11:30:01 +0800 Subject: [PATCH 02/11] format --- src/ray/core_worker/common.h | 17 ++++++----------- src/ray/core_worker/core_worker.cc | 4 ++-- src/ray/core_worker/core_worker.h | 3 ++- src/ray/core_worker/object_interface.cc | 4 ++-- src/ray/core_worker/object_interface.h | 3 +-- .../store_provider/plasma_store_provider.cc | 16 +++++++++------- src/ray/core_worker/task_execution.cc | 14 ++++++++------ src/ray/core_worker/task_execution.h | 3 ++- src/ray/core_worker/task_interface.cc | 14 +++++++++----- src/ray/core_worker/task_interface.h | 3 ++- .../task_provider/raylet_task_provider.cc | 6 +++--- .../task_provider/raylet_task_provider.h | 4 +++- .../core_worker/task_provider/task_provider.h | 1 - 13 files changed, 49 insertions(+), 43 deletions(-) diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 0934e2982eba..c361171086b1 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -5,8 +5,8 @@ #include "ray/common/buffer.h" #include "ray/common/id.h" -#include "ray/raylet/task_spec.h" #include "ray/raylet/raylet_client.h" +#include "ray/raylet/task_spec.h" namespace ray { @@ -74,21 +74,16 @@ class TaskSpec { public: TaskSpec(const raylet::TaskSpecification &task_spec, const std::vector &dependencies) - : task_spec_(task_spec), - dependencies_(dependencies) {} + : task_spec_(task_spec), dependencies_(dependencies) {} TaskSpec(const raylet::TaskSpecification &&task_spec, const std::vector &&dependencies) - : task_spec_(task_spec), - dependencies_(dependencies) {} + : task_spec_(task_spec), dependencies_(dependencies) {} - const raylet::TaskSpecification &GetTaskSpecification() const { - return task_spec_; - } + const raylet::TaskSpecification &GetTaskSpecification() const { return task_spec_; } + + const std::vector &GetDependencies() const { return dependencies_; } - const std::vector &GetDependencies() const { - return dependencies_; - } private: /// Raylet task specification. raylet::TaskSpecification task_spec_; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ec8ca5f50718..7e10d0f65dec 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -27,8 +27,8 @@ Status CoreWorker::Connect() { // instead of crashing. ray_client_.raylet_client_ = std::unique_ptr( new RayletClient(raylet_socket_, worker_context_.GetWorkerID(), - (worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentDriverID(), - ToTaskLanguage(language_))); + (worker_type_ == ray::WorkerType::WORKER), + worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_))); return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c7d4b12bd7c5..499e38d59670 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -67,7 +67,8 @@ class CoreWorker { /// Worker context. WorkerContext worker_context_; - /// Ray client (this includes store client, raylet client and potentially gcs client later). + /// Ray client (this includes store client, raylet client and potentially gcs client + /// later). RayClient ray_client_; /// Whether this worker has been initialized. diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 43d3e1c80748..866cb70dfbe1 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -10,8 +10,8 @@ CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) : core_worker_(core_worker) { store_providers_.emplace( static_cast(StoreProviderType::PLASMA), - std::unique_ptr(new CoreWorkerPlasmaStoreProvider( - core_worker_.ray_client_))); + std::unique_ptr( + new CoreWorkerPlasmaStoreProvider(core_worker_.ray_client_))); } Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 3bd36b8239eb..35403675f164 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -66,8 +66,7 @@ class CoreWorkerObjectInterface { CoreWorker &core_worker_; /// All the store providers supported. - std::unordered_map> - store_providers_; + std::unordered_map> store_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 5134515c0fb5..1a6ca03de8f8 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -6,8 +6,7 @@ namespace ray { -CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( - RayClient &ray_client) +CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(RayClient &ray_client) : ray_client_(ray_client) {} Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, @@ -60,7 +59,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, } // TODO: can call `fetchOrReconstruct` in batches as an optimization. - RAY_CHECK_OK(ray_client_.raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id)); + RAY_CHECK_OK( + ray_client_.raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id)); // Get the objects from the object store, and parse the result. int64_t get_timeout; @@ -81,7 +81,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, std::vector object_buffers; { std::unique_lock guard(ray_client_.store_client_mutex_); - auto status = ray_client_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); + auto status = + ray_client_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); } for (size_t i = 0; i < object_buffers.size(); i++) { @@ -109,8 +110,8 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i const TaskID &task_id, std::vector *results) { WaitResultPair result_pair; - auto status = ray_client_.raylet_client_->Wait(object_ids, num_objects, timeout_ms, false, task_id, - &result_pair); + auto status = ray_client_.raylet_client_->Wait(object_ids, num_objects, timeout_ms, + false, task_id, &result_pair); std::unordered_set ready_ids; for (const auto &entry : result_pair.first) { ready_ids.insert(entry); @@ -129,7 +130,8 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return ray_client_.raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks); + return ray_client_.raylet_client_->FreeObjects(object_ids, local_only, + delete_creating_tasks); } } // namespace ray \ No newline at end of file diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 9c8dfcd26763..78dc3d770ad4 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -5,26 +5,28 @@ namespace ray { -CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(CoreWorker &core_worker) +CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( + CoreWorker &core_worker) : core_worker_(core_worker) { task_execution_providers_.emplace( static_cast(TaskProviderType::RAYLET), - std::unique_ptr(new CoreWorkerRayletTaskExecutionProvider( - core_worker_.ray_client_))); + std::unique_ptr( + new CoreWorkerRayletTaskExecutionProvider(core_worker_.ray_client_))); } Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { while (true) { std::vector tasks; - auto status = task_execution_providers_[static_cast(TaskProviderType::RAYLET)]->GetTasks(&tasks); + auto status = + task_execution_providers_[static_cast(TaskProviderType::RAYLET)]->GetTasks( + &tasks); if (!status.ok()) { RAY_LOG(ERROR) << "Get task failed with error: " - << ray::Status::IOError(status.message()); + << ray::Status::IOError(status.message()); return status; } for (const auto &task : tasks) { - const auto &spec = task.GetTaskSpecification(); core_worker_.worker_context_.SetCurrentTask(spec); diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index dff2c973b5e1..45681c443575 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -48,7 +48,8 @@ class CoreWorkerTaskExecutionInterface { CoreWorker &core_worker_; /// All the task submission providers supported. - std::unordered_map> task_execution_providers_; + std::unordered_map> + task_execution_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index cd5d62c2b8aa..74bb1abd6c54 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -10,8 +10,8 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) { task_submission_providers_.emplace( static_cast(TaskProviderType::RAYLET), - std::unique_ptr(new CoreWorkerRayletTaskSubmissionProvider( - core_worker_.ray_client_))); + std::unique_ptr( + new CoreWorkerRayletTaskSubmissionProvider(core_worker_.ray_client_))); } Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, @@ -39,7 +39,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); + return task_submission_providers_[static_cast(TaskProviderType::RAYLET)] + ->SubmitTask(task); } Status CoreWorkerTaskInterface::CreateActor( @@ -74,7 +75,8 @@ Status CoreWorkerTaskInterface::CreateActor( std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); + return task_submission_providers_[static_cast(TaskProviderType::RAYLET)] + ->SubmitTask(task); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -116,7 +118,9 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, actor_handle.ClearNewActorHandles(); TaskSpec task(std::move(spec), execution_dependencies); - auto status = task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask(task); + auto status = + task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask( + task); // remove cursor from return ids. (*return_ids).pop_back(); diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 2177c270a53a..e02726b1b476 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -138,7 +138,8 @@ class CoreWorkerTaskInterface { const std::vector &args); /// All the task submission providers supported. - std::unordered_map> task_submission_providers_; + std::unordered_map> + task_submission_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.cc b/src/ray/core_worker/task_provider/raylet_task_provider.cc index dc79d37121d2..1f053ae1a641 100644 --- a/src/ray/core_worker/task_provider/raylet_task_provider.cc +++ b/src/ray/core_worker/task_provider/raylet_task_provider.cc @@ -8,8 +8,8 @@ CoreWorkerRayletTaskSubmissionProvider::CoreWorkerRayletTaskSubmissionProvider( : ray_client_(ray_client) {} Status CoreWorkerRayletTaskSubmissionProvider::SubmitTask(const TaskSpec &task) { - - return ray_client_.raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); + return ray_client_.raylet_client_->SubmitTask(task.GetDependencies(), + task.GetTaskSpecification()); } CoreWorkerRayletTaskExecutionProvider::CoreWorkerRayletTaskExecutionProvider( @@ -21,7 +21,7 @@ Status CoreWorkerRayletTaskExecutionProvider::GetTasks(std::vector *ta auto status = ray_client_.raylet_client_->GetTask(&task_spec); if (!status.ok()) { RAY_LOG(ERROR) << "Get task from raylet failed with error: " - << ray::Status::IOError(status.message()); + << ray::Status::IOError(status.message()); return status; } diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.h b/src/ray/core_worker/task_provider/raylet_task_provider.h index d0cc7877a24d..98aa3de36f6b 100644 --- a/src/ray/core_worker/task_provider/raylet_task_provider.h +++ b/src/ray/core_worker/task_provider/raylet_task_provider.h @@ -21,6 +21,7 @@ class CoreWorkerRayletTaskSubmissionProvider : public CoreWorkerTaskSubmissionPr /// \param[in] task The task spec to submit. /// \return Status. virtual Status SubmitTask(const TaskSpec &task) override; + private: /// ray client. RayClient &ray_client_; @@ -32,9 +33,10 @@ class CoreWorkerRayletTaskExecutionProvider : public CoreWorkerTaskExecutionProv // Get tasks for execution from raylet. virtual Status GetTasks(std::vector *tasks) override; + private: /// ray client. - RayClient &ray_client_; + RayClient &ray_client_; }; } // namespace ray diff --git a/src/ray/core_worker/task_provider/task_provider.h b/src/ray/core_worker/task_provider/task_provider.h index db5091338867..c920b54e020b 100644 --- a/src/ray/core_worker/task_provider/task_provider.h +++ b/src/ray/core_worker/task_provider/task_provider.h @@ -11,7 +11,6 @@ namespace ray { - class CoreWorkerTaskSubmissionProvider { public: CoreWorkerTaskSubmissionProvider() {} From 66d8f9ecbd26d80aa9e75cbeb95a2d322e353e36 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 12 Jun 2019 11:44:04 +0800 Subject: [PATCH 03/11] comments and new line --- .../store_provider/plasma_store_provider.cc | 2 +- .../store_provider/plasma_store_provider.h | 2 +- src/ray/core_worker/store_provider/store_provider.h | 6 +++++- .../core_worker/task_provider/raylet_task_provider.h | 2 +- src/ray/core_worker/task_provider/task_provider.h | 11 ++++++++++- 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 1a6ca03de8f8..f9be2a89dcf0 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -134,4 +134,4 @@ Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object delete_creating_tasks); } -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 01a1c03b8629..d26275cb02a2 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -64,4 +64,4 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { } // namespace ray -#endif // RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H \ No newline at end of file +#endif // RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index 379733862194..d473eccb11bb 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -8,6 +8,10 @@ namespace ray { +/// Provider interface for store access. Store provider should inherit from this class and +/// provide implementions for the methods. The actual store provider a plasma store or +/// just process local memory store, and possibly other types of storage. + class CoreWorkerStoreProvider { public: CoreWorkerStoreProvider() {} @@ -57,4 +61,4 @@ class CoreWorkerStoreProvider { } // namespace ray -#endif // RAY_CORE_WORKER_STORE_PROVIDER_H \ No newline at end of file +#endif // RAY_CORE_WORKER_STORE_PROVIDER_H diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.h b/src/ray/core_worker/task_provider/raylet_task_provider.h index 98aa3de36f6b..db0cad2abc75 100644 --- a/src/ray/core_worker/task_provider/raylet_task_provider.h +++ b/src/ray/core_worker/task_provider/raylet_task_provider.h @@ -41,4 +41,4 @@ class CoreWorkerRayletTaskExecutionProvider : public CoreWorkerTaskExecutionProv } // namespace ray -#endif // RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H \ No newline at end of file +#endif // RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H diff --git a/src/ray/core_worker/task_provider/task_provider.h b/src/ray/core_worker/task_provider/task_provider.h index c920b54e020b..2c7a1bcc252a 100644 --- a/src/ray/core_worker/task_provider/task_provider.h +++ b/src/ray/core_worker/task_provider/task_provider.h @@ -11,6 +11,15 @@ namespace ray { +/// Provider interface for task submission and execution. They are separate classes +/// but should be used in pairs - one type of task submission provider should be used +/// together with task execution provider with the same type, so these classes are +/// put together in this same file. +/// +/// Task submission/execution provider should inherit from these classes and provide +/// implementions for the methods. The actual task provider can submit/get tasks via +/// raylet, or directly to/from another worker. + class CoreWorkerTaskSubmissionProvider { public: CoreWorkerTaskSubmissionProvider() {} @@ -32,4 +41,4 @@ class CoreWorkerTaskExecutionProvider { } // namespace ray -#endif // RAY_CORE_WORKER_TASK_PROVIDER_H \ No newline at end of file +#endif // RAY_CORE_WORKER_TASK_PROVIDER_H From ae85b9806c35fee55d0533a437aaf891ee92ad2a Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 12 Jun 2019 16:56:33 +0800 Subject: [PATCH 04/11] rename --- BUILD.bazel | 4 ++-- src/ray/core_worker/task_execution.cc | 6 +++--- src/ray/core_worker/task_execution.h | 4 ++-- src/ray/core_worker/task_interface.cc | 6 +++--- src/ray/core_worker/task_interface.h | 4 ++-- .../raylet_transport.cc} | 10 +++++----- .../raylet_transport.h} | 16 ++++++++-------- .../task_provider.h => transport/transport.h} | 16 ++++++++-------- 8 files changed, 33 insertions(+), 33 deletions(-) rename src/ray/core_worker/{task_provider/raylet_task_provider.cc => transport/raylet_transport.cc} (65%) rename src/ray/core_worker/{task_provider/raylet_task_provider.h => transport/raylet_transport.h} (60%) rename src/ray/core_worker/{task_provider/task_provider.h => transport/transport.h} (71%) diff --git a/BUILD.bazel b/BUILD.bazel index 3e919f8e3280..47e795011ab4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -112,7 +112,7 @@ cc_library( [ "src/ray/core_worker/*.cc", "src/ray/core_worker/store_provider/*.cc", - "src/ray/core_worker/task_provider/*.cc", + "src/ray/core_worker/transport/*.cc", ], exclude = [ "src/ray/core_worker/*_test.cc", @@ -122,7 +122,7 @@ cc_library( hdrs = glob([ "src/ray/core_worker/*.h", "src/ray/core_worker/store_provider/*.h", - "src/ray/core_worker/task_provider/*.h", + "src/ray/core_worker/transport/*.h", ]), copts = COPTS, deps = [ diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 78dc3d770ad4..503278e9d03b 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -1,7 +1,7 @@ #include "ray/core_worker/task_execution.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ray/core_worker/task_provider/raylet_task_provider.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace ray { @@ -10,8 +10,8 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( : core_worker_(core_worker) { task_execution_providers_.emplace( static_cast(TaskProviderType::RAYLET), - std::unique_ptr( - new CoreWorkerRayletTaskExecutionProvider(core_worker_.ray_client_))); + std::unique_ptr( + new CoreWorkerRayletTaskReceiver(core_worker_.ray_client_))); } Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index 45681c443575..40f45e29d3fb 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -4,7 +4,7 @@ #include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" -#include "ray/core_worker/task_provider/task_provider.h" +#include "ray/core_worker/transport/transport.h" namespace ray { @@ -48,7 +48,7 @@ class CoreWorkerTaskExecutionInterface { CoreWorker &core_worker_; /// All the task submission providers supported. - std::unordered_map> + std::unordered_map> task_execution_providers_; }; diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 74bb1abd6c54..92e927ec27bc 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -2,7 +2,7 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" #include "ray/core_worker/task_interface.h" -#include "ray/core_worker/task_provider/raylet_task_provider.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace ray { @@ -10,8 +10,8 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) { task_submission_providers_.emplace( static_cast(TaskProviderType::RAYLET), - std::unique_ptr( - new CoreWorkerRayletTaskSubmissionProvider(core_worker_.ray_client_))); + std::unique_ptr( + new CoreWorkerRayletTaskSubmitter(core_worker_.ray_client_))); } Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index e02726b1b476..0a39060b3126 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -7,7 +7,7 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" -#include "ray/core_worker/task_provider/task_provider.h" +#include "ray/core_worker/transport/transport.h" namespace ray { @@ -138,7 +138,7 @@ class CoreWorkerTaskInterface { const std::vector &args); /// All the task submission providers supported. - std::unordered_map> + std::unordered_map> task_submission_providers_; }; diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.cc b/src/ray/core_worker/transport/raylet_transport.cc similarity index 65% rename from src/ray/core_worker/task_provider/raylet_task_provider.cc rename to src/ray/core_worker/transport/raylet_transport.cc index 1f053ae1a641..ed78a41bf370 100644 --- a/src/ray/core_worker/task_provider/raylet_task_provider.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -1,22 +1,22 @@ -#include "ray/core_worker/task_provider/raylet_task_provider.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace ray { -CoreWorkerRayletTaskSubmissionProvider::CoreWorkerRayletTaskSubmissionProvider( +CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter( RayClient &ray_client) : ray_client_(ray_client) {} -Status CoreWorkerRayletTaskSubmissionProvider::SubmitTask(const TaskSpec &task) { +Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { return ray_client_.raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); } -CoreWorkerRayletTaskExecutionProvider::CoreWorkerRayletTaskExecutionProvider( +CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( RayClient &ray_client) : ray_client_(ray_client) {} -Status CoreWorkerRayletTaskExecutionProvider::GetTasks(std::vector *tasks) { +Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { std::unique_ptr task_spec; auto status = ray_client_.raylet_client_->GetTask(&task_spec); if (!status.ok()) { diff --git a/src/ray/core_worker/task_provider/raylet_task_provider.h b/src/ray/core_worker/transport/raylet_transport.h similarity index 60% rename from src/ray/core_worker/task_provider/raylet_task_provider.h rename to src/ray/core_worker/transport/raylet_transport.h index db0cad2abc75..6851136d7d4a 100644 --- a/src/ray/core_worker/task_provider/raylet_task_provider.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -1,9 +1,9 @@ -#ifndef RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H -#define RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H +#ifndef RAY_CORE_WORKER_RAYLET_TRANSPORT_H +#define RAY_CORE_WORKER_RAYLET_TRANSPORT_H #include -#include "ray/core_worker/task_provider/task_provider.h" +#include "ray/core_worker/transport/transport.h" #include "ray/raylet/raylet_client.h" namespace ray { @@ -12,9 +12,9 @@ namespace ray { /// and possibly gets forwarded to another raylet on which node the task should be /// executed, and then a worker on that node gets this task and starts executing it. -class CoreWorkerRayletTaskSubmissionProvider : public CoreWorkerTaskSubmissionProvider { +class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { public: - CoreWorkerRayletTaskSubmissionProvider(RayClient &ray_client); + CoreWorkerRayletTaskSubmitter(RayClient &ray_client); /// Submit a task for execution to raylet. /// @@ -27,9 +27,9 @@ class CoreWorkerRayletTaskSubmissionProvider : public CoreWorkerTaskSubmissionPr RayClient &ray_client_; }; -class CoreWorkerRayletTaskExecutionProvider : public CoreWorkerTaskExecutionProvider { +class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver { public: - CoreWorkerRayletTaskExecutionProvider(RayClient &ray_client); + CoreWorkerRayletTaskReceiver(RayClient &ray_client); // Get tasks for execution from raylet. virtual Status GetTasks(std::vector *tasks) override; @@ -41,4 +41,4 @@ class CoreWorkerRayletTaskExecutionProvider : public CoreWorkerTaskExecutionProv } // namespace ray -#endif // RAY_CORE_WORKER_RAYLET_TASK_PROVIDER_H +#endif // RAY_CORE_WORKER_RAYLET_TRANSPORT_H diff --git a/src/ray/core_worker/task_provider/task_provider.h b/src/ray/core_worker/transport/transport.h similarity index 71% rename from src/ray/core_worker/task_provider/task_provider.h rename to src/ray/core_worker/transport/transport.h index 2c7a1bcc252a..f26d932c9b19 100644 --- a/src/ray/core_worker/task_provider/task_provider.h +++ b/src/ray/core_worker/transport/transport.h @@ -1,5 +1,5 @@ -#ifndef RAY_CORE_WORKER_TASK_PROVIDER_H -#define RAY_CORE_WORKER_TASK_PROVIDER_H +#ifndef RAY_CORE_WORKER_TRANSPORT_H +#define RAY_CORE_WORKER_TRANSPORT_H #include @@ -11,7 +11,7 @@ namespace ray { -/// Provider interface for task submission and execution. They are separate classes +/// Interface for task submission and execution. They are separate classes /// but should be used in pairs - one type of task submission provider should be used /// together with task execution provider with the same type, so these classes are /// put together in this same file. @@ -20,9 +20,9 @@ namespace ray { /// implementions for the methods. The actual task provider can submit/get tasks via /// raylet, or directly to/from another worker. -class CoreWorkerTaskSubmissionProvider { +class CoreWorkerTaskSubmitter { public: - CoreWorkerTaskSubmissionProvider() {} + CoreWorkerTaskSubmitter() {} /// Submit a task for execution. /// @@ -31,9 +31,9 @@ class CoreWorkerTaskSubmissionProvider { virtual Status SubmitTask(const TaskSpec &task) = 0; }; -class CoreWorkerTaskExecutionProvider { +class CoreWorkerTaskReceiver { public: - CoreWorkerTaskExecutionProvider() {} + CoreWorkerTaskReceiver() {} // Get tasks for execution. virtual Status GetTasks(std::vector *tasks) = 0; @@ -41,4 +41,4 @@ class CoreWorkerTaskExecutionProvider { } // namespace ray -#endif // RAY_CORE_WORKER_TASK_PROVIDER_H +#endif // RAY_CORE_WORKER_TRANSPORT_H From 3786462693e300445d3631856075d99fd192c624 Mon Sep 17 00:00:00 2001 From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com> Date: Thu, 13 Jun 2019 15:21:04 +0800 Subject: [PATCH 05/11] Update src/ray/core_worker/task_execution.cc Co-Authored-By: Hao Chen --- src/ray/core_worker/task_execution.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 503278e9d03b..560f27edc9b3 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -21,7 +21,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { task_execution_providers_[static_cast(TaskProviderType::RAYLET)]->GetTasks( &tasks); if (!status.ok()) { - RAY_LOG(ERROR) << "Get task failed with error: " + RAY_LOG(ERROR) << "Getting task failed with error: " << ray::Status::IOError(status.message()); return status; } From 047b163cfcae56e9369f323f2ace375690c902bb Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Thu, 13 Jun 2019 15:29:38 +0800 Subject: [PATCH 06/11] address comments --- src/ray/core_worker/common.h | 2 +- .../store_provider/plasma_store_provider.h | 3 ++- src/ray/core_worker/store_provider/store_provider.h | 4 ++-- src/ray/core_worker/task_execution.cc | 6 +++--- src/ray/core_worker/task_execution.h | 4 ++-- src/ray/core_worker/task_interface.cc | 10 +++++----- src/ray/core_worker/task_interface.h | 4 ++-- src/ray/core_worker/transport/raylet_transport.cc | 2 +- src/ray/core_worker/transport/raylet_transport.h | 6 +++--- src/ray/core_worker/transport/transport.h | 13 ++++++------- 10 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index c361171086b1..75b1171b06fb 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -94,7 +94,7 @@ class TaskSpec { enum class StoreProviderType { PLASMA }; -enum class TaskProviderType { RAYLET }; +enum class TaskTransportType { RAYLET }; struct RayClient { /// Plasma store client. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index d26275cb02a2..cce02bbda201 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -13,7 +13,8 @@ namespace ray { class CoreWorker; -/// The interface that contains all `CoreWorker` methods that are related to object store. +/// The class provides implementations for accessing plasma store, which includes both +/// local and remote store, remote access is done via raylet. class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { public: CoreWorkerPlasmaStoreProvider(RayClient &ray_client); diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index d473eccb11bb..ee5d67d1a463 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -9,8 +9,8 @@ namespace ray { /// Provider interface for store access. Store provider should inherit from this class and -/// provide implementions for the methods. The actual store provider a plasma store or -/// just process local memory store, and possibly other types of storage. +/// provide implementions for the methods. The actual store provider may use a plasma store +/// or local memory store in worker process, or possibly other types of storage. class CoreWorkerStoreProvider { public: diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 503278e9d03b..e2387cc22727 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -8,8 +8,8 @@ namespace ray { CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( CoreWorker &core_worker) : core_worker_(core_worker) { - task_execution_providers_.emplace( - static_cast(TaskProviderType::RAYLET), + task_receivers.emplace( + static_cast(TaskTransportType::RAYLET), std::unique_ptr( new CoreWorkerRayletTaskReceiver(core_worker_.ray_client_))); } @@ -18,7 +18,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { while (true) { std::vector tasks; auto status = - task_execution_providers_[static_cast(TaskProviderType::RAYLET)]->GetTasks( + task_receivers[static_cast(TaskTransportType::RAYLET)]->GetTasks( &tasks); if (!status.ok()) { RAY_LOG(ERROR) << "Get task failed with error: " diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index 40f45e29d3fb..5f5e6322ed6c 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -47,9 +47,9 @@ class CoreWorkerTaskExecutionInterface { /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; - /// All the task submission providers supported. + /// All the task task receivers supported. std::unordered_map> - task_execution_providers_; + task_receivers; }; } // namespace ray diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 92e927ec27bc..c56960ad684a 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -8,8 +8,8 @@ namespace ray { CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) { - task_submission_providers_.emplace( - static_cast(TaskProviderType::RAYLET), + task_submitters_.emplace( + static_cast(TaskTransportType::RAYLET), std::unique_ptr( new CoreWorkerRayletTaskSubmitter(core_worker_.ray_client_))); } @@ -39,7 +39,7 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submission_providers_[static_cast(TaskProviderType::RAYLET)] + return task_submitters_[static_cast(TaskTransportType::RAYLET)] ->SubmitTask(task); } @@ -75,7 +75,7 @@ Status CoreWorkerTaskInterface::CreateActor( std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submission_providers_[static_cast(TaskProviderType::RAYLET)] + return task_submitters_[static_cast(TaskTransportType::RAYLET)] ->SubmitTask(task); } @@ -119,7 +119,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, TaskSpec task(std::move(spec), execution_dependencies); auto status = - task_submission_providers_[static_cast(TaskProviderType::RAYLET)]->SubmitTask( + task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask( task); // remove cursor from return ids. diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 0a39060b3126..98caf3f1ed86 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -137,9 +137,9 @@ class CoreWorkerTaskInterface { std::vector> BuildTaskArguments( const std::vector &args); - /// All the task submission providers supported. + /// All the task submitters supported. std::unordered_map> - task_submission_providers_; + task_submitters_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index ed78a41bf370..d6c475919127 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -26,7 +26,7 @@ Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { } std::vector dependencies; - (*tasks).clear(); + RAY_CHECK((*tasks).empty()); (*tasks).emplace_back(*task_spec, dependencies); return Status::OK(); diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 6851136d7d4a..6eac2af17584 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -8,9 +8,9 @@ namespace ray { -/// In raylet task submission and execution provider, a task is submitted to raylet, -/// and possibly gets forwarded to another raylet on which node the task should be -/// executed, and then a worker on that node gets this task and starts executing it. +/// In raylet task submitter and receiver, a task is submitted to raylet, and possibly +/// gets forwarded to another raylet on which node the task should be executed, and +/// then a worker on that node gets this task and starts executing it. class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { public: diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index f26d932c9b19..e54f3ece2393 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -11,14 +11,13 @@ namespace ray { -/// Interface for task submission and execution. They are separate classes -/// but should be used in pairs - one type of task submission provider should be used -/// together with task execution provider with the same type, so these classes are -/// put together in this same file. +/// Interface for task submitter and receiver. They are separate classes but should be +/// used in pairs - one type of task submitter should be used together with task +/// with the same type, so these classes are put together in this same file. /// -/// Task submission/execution provider should inherit from these classes and provide -/// implementions for the methods. The actual task provider can submit/get tasks via -/// raylet, or directly to/from another worker. +/// Task submitter/receiver should inherit from these classes and provide implementions +/// for the methods. The actual task submitter/receiver can submit/get tasks via raylet, +/// or directly to/from another worker. class CoreWorkerTaskSubmitter { public: From 400faa4fc8c5307a7e639a81db52f939d851bce2 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Thu, 13 Jun 2019 16:31:20 +0800 Subject: [PATCH 07/11] format --- src/ray/core_worker/store_provider/store_provider.h | 3 ++- src/ray/core_worker/task_execution.cc | 10 ++++------ src/ray/core_worker/task_execution.h | 3 +-- src/ray/core_worker/task_interface.cc | 9 +++------ src/ray/core_worker/task_interface.h | 3 +-- src/ray/core_worker/transport/raylet_transport.cc | 6 ++---- src/ray/core_worker/transport/transport.h | 2 +- 7 files changed, 14 insertions(+), 22 deletions(-) diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index ee5d67d1a463..cda9a92b3aa0 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -9,7 +9,8 @@ namespace ray { /// Provider interface for store access. Store provider should inherit from this class and -/// provide implementions for the methods. The actual store provider may use a plasma store +/// provide implementions for the methods. The actual store provider may use a plasma +/// store /// or local memory store in worker process, or possibly other types of storage. class CoreWorkerStoreProvider { diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index fcfb01e886c5..8a4b455ba1a4 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -8,18 +8,16 @@ namespace ray { CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( CoreWorker &core_worker) : core_worker_(core_worker) { - task_receivers.emplace( - static_cast(TaskTransportType::RAYLET), - std::unique_ptr( - new CoreWorkerRayletTaskReceiver(core_worker_.ray_client_))); + task_receivers.emplace(static_cast(TaskTransportType::RAYLET), + std::unique_ptr( + new CoreWorkerRayletTaskReceiver(core_worker_.ray_client_))); } Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { while (true) { std::vector tasks; auto status = - task_receivers[static_cast(TaskTransportType::RAYLET)]->GetTasks( - &tasks); + task_receivers[static_cast(TaskTransportType::RAYLET)]->GetTasks(&tasks); if (!status.ok()) { RAY_LOG(ERROR) << "Getting task failed with error: " << ray::Status::IOError(status.message()); diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index 5f5e6322ed6c..f4b44b9e131d 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -48,8 +48,7 @@ class CoreWorkerTaskExecutionInterface { CoreWorker &core_worker_; /// All the task task receivers supported. - std::unordered_map> - task_receivers; + std::unordered_map> task_receivers; }; } // namespace ray diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index c56960ad684a..3032a390f85b 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -39,8 +39,7 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submitters_[static_cast(TaskTransportType::RAYLET)] - ->SubmitTask(task); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::CreateActor( @@ -75,8 +74,7 @@ Status CoreWorkerTaskInterface::CreateActor( std::vector execution_dependencies; TaskSpec task(std::move(spec), execution_dependencies); - return task_submitters_[static_cast(TaskTransportType::RAYLET)] - ->SubmitTask(task); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -119,8 +117,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, TaskSpec task(std::move(spec), execution_dependencies); auto status = - task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask( - task); + task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); // remove cursor from return ids. (*return_ids).pop_back(); diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 98caf3f1ed86..98850408f230 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -138,8 +138,7 @@ class CoreWorkerTaskInterface { const std::vector &args); /// All the task submitters supported. - std::unordered_map> - task_submitters_; + std::unordered_map> task_submitters_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index d6c475919127..0e69d8ba0c48 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -3,8 +3,7 @@ namespace ray { -CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter( - RayClient &ray_client) +CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayClient &ray_client) : ray_client_(ray_client) {} Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { @@ -12,8 +11,7 @@ Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { task.GetTaskSpecification()); } -CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( - RayClient &ray_client) +CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayClient &ray_client) : ray_client_(ray_client) {} Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index e54f3ece2393..da97fd32a9ea 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -12,7 +12,7 @@ namespace ray { /// Interface for task submitter and receiver. They are separate classes but should be -/// used in pairs - one type of task submitter should be used together with task +/// used in pairs - one type of task submitter should be used together with task /// with the same type, so these classes are put together in this same file. /// /// Task submitter/receiver should inherit from these classes and provide implementions From 8b9c869310e470bf8d6e66c42b493385b980e2a9 Mon Sep 17 00:00:00 2001 From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com> Date: Fri, 14 Jun 2019 11:22:58 +0800 Subject: [PATCH 08/11] Update src/ray/core_worker/transport/transport.h Co-Authored-By: Hao Chen --- src/ray/core_worker/transport/transport.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index da97fd32a9ea..cbe29fe54c39 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -11,7 +11,7 @@ namespace ray { -/// Interface for task submitter and receiver. They are separate classes but should be +/// Interfaces for task submitter and receiver. They are separate classes but should be /// used in pairs - one type of task submitter should be used together with task /// with the same type, so these classes are put together in this same file. /// From 0585236a63a404255c8cad44102ce3900d9bc781 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Fri, 14 Jun 2019 12:20:33 +0800 Subject: [PATCH 09/11] refactor CoreWorker constructor to throw exception on error, and address comments --- src/ray/core_worker/common.h | 12 +------ src/ray/core_worker/core_worker.cc | 22 ++++++------- src/ray/core_worker/core_worker.h | 17 +++++----- src/ray/core_worker/core_worker_test.cc | 23 ++++++-------- src/ray/core_worker/mock_worker.cc | 4 +-- src/ray/core_worker/object_interface.cc | 3 +- .../store_provider/plasma_store_provider.cc | 31 +++++++++++-------- .../store_provider/plasma_store_provider.h | 13 ++++++-- .../store_provider/store_provider.h | 3 +- src/ray/core_worker/task_execution.cc | 2 +- src/ray/core_worker/task_interface.cc | 2 +- .../core_worker/transport/raylet_transport.cc | 12 +++---- .../core_worker/transport/raylet_transport.h | 12 +++---- src/ray/core_worker/transport/transport.h | 6 ++-- 14 files changed, 78 insertions(+), 84 deletions(-) diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 75b1171b06fb..e5c9ebdccdc7 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -70,6 +70,7 @@ class TaskArg { /// Task specification, which includes the immutable information about the task /// which are determined at the submission time. +/// TODO: this can be removed after everything is moved to protobuf. class TaskSpec { public: TaskSpec(const raylet::TaskSpecification &task_spec, @@ -96,17 +97,6 @@ enum class StoreProviderType { PLASMA }; enum class TaskTransportType { RAYLET }; -struct RayClient { - /// Plasma store client. - plasma::PlasmaClient store_client_; - - /// Mutex to protect store_client_. - std::mutex store_client_mutex_; - - /// Raylet client. - std::unique_ptr raylet_client_; -}; - } // namespace ray #endif // RAY_CORE_WORKER_COMMON_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7e10d0f65dec..d0f2cc2e0f01 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -12,25 +12,23 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, store_socket_(store_socket), raylet_socket_(raylet_socket), worker_context_(worker_type, driver_id), + raylet_client_(raylet_socket_, worker_context_.GetWorkerID(), + (worker_type_ == ray::WorkerType::WORKER), + worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_)), task_interface_(*this), object_interface_(*this), - task_execution_interface_(*this) {} + task_execution_interface_(*this) { -Status CoreWorker::Connect() { - // connect to plasma. - RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Connect(store_socket_)); - - // connect to raylet. // TODO: currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this needs to be changed // so that the worker (java/python .etc) can retrieve and handle the error // instead of crashing. - ray_client_.raylet_client_ = std::unique_ptr( - new RayletClient(raylet_socket_, worker_context_.GetWorkerID(), - (worker_type_ == ray::WorkerType::WORKER), - worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_))); - - return Status::OK(); + auto status = store_client_.Connect(store_socket_); + if (!status.ok()) { + RAY_LOG(ERROR) << "Connecting plasma store failed when trying to construct" + << " core worker: " << status.message(); + throw std::runtime_error(status.message()); + } } ::Language CoreWorker::ToTaskLanguage(WorkerLanguage language) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 499e38d59670..030dc9c8ad7e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -14,6 +14,8 @@ namespace ray { /// The root class that contains all the core and language-independent functionalities /// of the worker. This class is supposed to be used to implement app-language (Java, /// Python, etc) workers. +/// +/// Note: the constructor of CoreWorker would throw if a failure happens. class CoreWorker { public: /// Construct a CoreWorker instance. @@ -24,9 +26,6 @@ class CoreWorker { const std::string &store_socket, const std::string &raylet_socket, DriverID driver_id = DriverID::Nil()); - /// Connect to raylet. - Status Connect(); - /// Type of this worker. enum WorkerType WorkerType() const { return worker_type_; } @@ -67,12 +66,14 @@ class CoreWorker { /// Worker context. WorkerContext worker_context_; - /// Ray client (this includes store client, raylet client and potentially gcs client - /// later). - RayClient ray_client_; + /// Plasma store client. + plasma::PlasmaClient store_client_; + + /// Mutex to protect store_client_. + std::mutex store_client_mutex_; - /// Whether this worker has been initialized. - bool is_initialized_; + /// Raylet client. + RayletClient raylet_client_; /// The `CoreWorkerTaskInterface` instance. CoreWorkerTaskInterface task_interface_; diff --git a/src/ray/core_worker/core_worker_test.cc b/src/ray/core_worker/core_worker_test.cc index fedfb9c2356b..075608a5649b 100644 --- a/src/ray/core_worker/core_worker_test.cc +++ b/src/ray/core_worker/core_worker_test.cc @@ -128,8 +128,6 @@ class CoreWorkerTest : public ::testing::Test { raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(driver.Connect()); - // Test pass by value. { uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; @@ -187,7 +185,6 @@ class CoreWorkerTest : public ::testing::Test { CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(driver.Connect()); std::unique_ptr actor_handle; @@ -277,13 +274,6 @@ TEST_F(ZeroNodeTest, TestTaskArg) { ASSERT_EQ(*data, *buffer); } -TEST_F(ZeroNodeTest, TestAttributeGetters) { - CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", "", - DriverID::FromRandom()); - ASSERT_EQ(core_worker.WorkerType(), WorkerType::DRIVER); - ASSERT_EQ(core_worker.Language(), WorkerLanguage::PYTHON); -} - TEST_F(ZeroNodeTest, TestWorkerContext) { auto driver_id = DriverID::FromRandom(); @@ -313,7 +303,6 @@ TEST_F(SingleNodeTest, TestObjectInterface) { CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(core_worker.Connect()); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -370,12 +359,10 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { CoreWorker worker1(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(worker1.Connect()); CoreWorker worker2(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[1], raylet_socket_names_[1], DriverID::FromRandom()); - RAY_CHECK_OK(worker2.Connect()); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -456,6 +443,16 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) { TestActorTask(resources); } +TEST_F(SingleNodeTest, TestCoreWorkerConstructorFailure) { + try { + CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, + "", raylet_socket_names_[0], + DriverID::FromRandom()); + } catch (const std::exception& e) { + std::cout << "Caught exception when constructing core worker: " << e.what(); + } +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/mock_worker.cc b/src/ray/core_worker/mock_worker.cc index 205fcfce961d..89459430405e 100644 --- a/src/ray/core_worker/mock_worker.cc +++ b/src/ray/core_worker/mock_worker.cc @@ -17,9 +17,7 @@ class MockWorker { public: MockWorker(const std::string &store_socket, const std::string &raylet_socket) : worker_(WorkerType::WORKER, WorkerLanguage::PYTHON, store_socket, raylet_socket, - DriverID::FromRandom()) { - RAY_CHECK_OK(worker_.Connect()); - } + DriverID::FromRandom()) {} void Run() { auto executor_func = [this](const RayFunction &ray_function, diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 866cb70dfbe1..1b2ce894056e 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -11,7 +11,8 @@ CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) store_providers_.emplace( static_cast(StoreProviderType::PLASMA), std::unique_ptr( - new CoreWorkerPlasmaStoreProvider(core_worker_.ray_client_))); + new CoreWorkerPlasmaStoreProvider(core_worker_.store_client_, + core_worker_.store_client_mutex_, core_worker_.raylet_client_))); } Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index f9be2a89dcf0..354bb053019d 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -6,25 +6,30 @@ namespace ray { -CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(RayClient &ray_client) - : ray_client_(ray_client) {} +CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( + plasma::PlasmaClient &store_client, + std::mutex &store_client_mutex, + RayletClient &raylet_client) + : store_client_(store_client), + store_client_mutex_(store_client_mutex), + raylet_client_(raylet_client) {} Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, const ObjectID &object_id) { auto plasma_id = object_id.ToPlasmaId(); std::shared_ptr data; { - std::unique_lock guard(ray_client_.store_client_mutex_); + std::unique_lock guard(store_client_mutex_); RAY_ARROW_RETURN_NOT_OK( - ray_client_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); } memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); { - std::unique_lock guard(ray_client_.store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Seal(plasma_id)); - RAY_ARROW_RETURN_NOT_OK(ray_client_.store_client_.Release(plasma_id)); + std::unique_lock guard(store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); } return Status::OK(); } @@ -60,7 +65,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, // TODO: can call `fetchOrReconstruct` in batches as an optimization. RAY_CHECK_OK( - ray_client_.raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id)); + raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id)); // Get the objects from the object store, and parse the result. int64_t get_timeout; @@ -80,9 +85,9 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, std::vector object_buffers; { - std::unique_lock guard(ray_client_.store_client_mutex_); + std::unique_lock guard(store_client_mutex_); auto status = - ray_client_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); + store_client_.Get(plasma_ids, get_timeout, &object_buffers); } for (size_t i = 0; i < object_buffers.size(); i++) { @@ -99,7 +104,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, } if (was_blocked) { - RAY_CHECK_OK(ray_client_.raylet_client_->NotifyUnblocked(task_id)); + RAY_CHECK_OK(raylet_client_.NotifyUnblocked(task_id)); } return Status::OK(); @@ -110,7 +115,7 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i const TaskID &task_id, std::vector *results) { WaitResultPair result_pair; - auto status = ray_client_.raylet_client_->Wait(object_ids, num_objects, timeout_ms, + auto status = raylet_client_.Wait(object_ids, num_objects, timeout_ms, false, task_id, &result_pair); std::unordered_set ready_ids; for (const auto &entry : result_pair.first) { @@ -130,7 +135,7 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return ray_client_.raylet_client_->FreeObjects(object_ids, local_only, + return raylet_client_.FreeObjects(object_ids, local_only, delete_creating_tasks); } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index cce02bbda201..284ee8e34a5a 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -17,7 +17,8 @@ class CoreWorker; /// local and remote store, remote access is done via raylet. class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { public: - CoreWorkerPlasmaStoreProvider(RayClient &ray_client); + CoreWorkerPlasmaStoreProvider(plasma::PlasmaClient &store_client, + std::mutex &store_client_mutex, RayletClient &raylet_client); /// Put an object with specified ID into object store. /// @@ -59,8 +60,14 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { bool delete_creating_tasks) override; private: - /// Ray client. - RayClient &ray_client_; + /// Plasma store client. + plasma::PlasmaClient &store_client_; + + /// Mutex to protect store_client_. + std::mutex &store_client_mutex_; + + /// Raylet client. + RayletClient &raylet_client_; }; } // namespace ray diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index cda9a92b3aa0..f1521edf1626 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -10,8 +10,7 @@ namespace ray { /// Provider interface for store access. Store provider should inherit from this class and /// provide implementions for the methods. The actual store provider may use a plasma -/// store -/// or local memory store in worker process, or possibly other types of storage. +/// store or local memory store in worker process, or possibly other types of storage. class CoreWorkerStoreProvider { public: diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 8a4b455ba1a4..0e0513181317 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -10,7 +10,7 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( : core_worker_(core_worker) { task_receivers.emplace(static_cast(TaskTransportType::RAYLET), std::unique_ptr( - new CoreWorkerRayletTaskReceiver(core_worker_.ray_client_))); + new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_))); } Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 3032a390f85b..77a0aa51d49f 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -11,7 +11,7 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) task_submitters_.emplace( static_cast(TaskTransportType::RAYLET), std::unique_ptr( - new CoreWorkerRayletTaskSubmitter(core_worker_.ray_client_))); + new CoreWorkerRayletTaskSubmitter(core_worker_.raylet_client_))); } Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index 0e69d8ba0c48..345948e93dde 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -3,20 +3,20 @@ namespace ray { -CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayClient &ray_client) - : ray_client_(ray_client) {} +CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client) + : raylet_client_(raylet_client) {} Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { - return ray_client_.raylet_client_->SubmitTask(task.GetDependencies(), + return raylet_client_.SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); } -CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayClient &ray_client) - : ray_client_(ray_client) {} +CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayletClient &raylet_client) + : raylet_client_(raylet_client) {} Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { std::unique_ptr task_spec; - auto status = ray_client_.raylet_client_->GetTask(&task_spec); + auto status = raylet_client_.GetTask(&task_spec); if (!status.ok()) { RAY_LOG(ERROR) << "Get task from raylet failed with error: " << ray::Status::IOError(status.message()); diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 6eac2af17584..03bf82f29886 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -14,7 +14,7 @@ namespace ray { class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { public: - CoreWorkerRayletTaskSubmitter(RayClient &ray_client); + CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client); /// Submit a task for execution to raylet. /// @@ -23,20 +23,20 @@ class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { virtual Status SubmitTask(const TaskSpec &task) override; private: - /// ray client. - RayClient &ray_client_; + /// Raylet client. + RayletClient &raylet_client_; }; class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver { public: - CoreWorkerRayletTaskReceiver(RayClient &ray_client); + CoreWorkerRayletTaskReceiver(RayletClient &raylet_client); // Get tasks for execution from raylet. virtual Status GetTasks(std::vector *tasks) override; private: - /// ray client. - RayClient &ray_client_; + /// Raylet client. + RayletClient &raylet_client_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index da97fd32a9ea..d3af2b8d2d87 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -19,10 +19,9 @@ namespace ray { /// for the methods. The actual task submitter/receiver can submit/get tasks via raylet, /// or directly to/from another worker. +/// This class is responsible to submit tasks. class CoreWorkerTaskSubmitter { public: - CoreWorkerTaskSubmitter() {} - /// Submit a task for execution. /// /// \param[in] task The task spec to submit. @@ -30,10 +29,9 @@ class CoreWorkerTaskSubmitter { virtual Status SubmitTask(const TaskSpec &task) = 0; }; +/// This class receives tasks for execution. class CoreWorkerTaskReceiver { public: - CoreWorkerTaskReceiver() {} - // Get tasks for execution. virtual Status GetTasks(std::vector *tasks) = 0; }; From f8bf96abf8c01b3f4094d22c186d1c80e60b6a21 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Fri, 14 Jun 2019 13:02:23 +0800 Subject: [PATCH 10/11] update TODO and NOTE in comments --- src/ray/core_worker/common.h | 2 +- src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/core_worker.h | 4 ++-- src/ray/core_worker/store_provider/plasma_store_provider.cc | 6 +++--- src/ray/core_worker/task_execution.cc | 2 +- src/ray/core_worker/task_interface.h | 4 ++-- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index e5c9ebdccdc7..3fda406613ef 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -70,7 +70,7 @@ class TaskArg { /// Task specification, which includes the immutable information about the task /// which are determined at the submission time. -/// TODO: this can be removed after everything is moved to protobuf. +/// TODO(zhijunfu): this can be removed after everything is moved to protobuf. class TaskSpec { public: TaskSpec(const raylet::TaskSpecification &task_spec, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d0f2cc2e0f01..4b1a1192df9a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -19,7 +19,7 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, object_interface_(*this), task_execution_interface_(*this) { - // TODO: currently RayletClient would crash in its constructor if it cannot + // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this needs to be changed // so that the worker (java/python .etc) can retrieve and handle the error // instead of crashing. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 030dc9c8ad7e..e03a8700be81 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -14,14 +14,14 @@ namespace ray { /// The root class that contains all the core and language-independent functionalities /// of the worker. This class is supposed to be used to implement app-language (Java, /// Python, etc) workers. -/// -/// Note: the constructor of CoreWorker would throw if a failure happens. class CoreWorker { public: /// Construct a CoreWorker instance. /// /// \param[in] worker_type Type of this worker. /// \param[in] langauge Language of this worker. + /// + /// NOTE(zhijunfu): the constructor would throw if a failure happens. CoreWorker(const WorkerType worker_type, const WorkerLanguage language, const std::string &store_socket, const std::string &raylet_socket, DriverID driver_id = DriverID::Nil()); diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 354bb053019d..c987ae4aaea2 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -63,7 +63,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, was_blocked = true; } - // TODO: can call `fetchOrReconstruct` in batches as an optimization. + // TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization. RAY_CHECK_OK( raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id)); @@ -100,7 +100,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, } num_attempts += 1; - // TODO: log a message if attempted too many times. + // TODO(zhijunfu): log a message if attempted too many times. } if (was_blocked) { @@ -122,7 +122,7 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i ready_ids.insert(entry); } - // TODO: change RayletClient::Wait() to return a bit set, so that we don't need + // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't need // to do this translation. (*results).resize(object_ids.size()); for (size_t i = 0; i < object_ids.size(); i++) { diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 0e0513181317..e9c86dd00f97 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -44,7 +44,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { } status = executor(func, args, spec.TaskId(), num_returns); - // TODO: + // TODO(zhijunfu): // 1. Check and handle failure. // 2. Save or load checkpoint. } diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 98850408f230..71547d2ac3bf 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -65,11 +65,11 @@ class ActorHandle { int IncreaseTaskCounter() { return task_counter_++; } std::list GetNewActorHandle() { - // TODO: implement this. + // TODO(zhijunfu): implement this. return std::list(); } - void ClearNewActorHandles() { /* TODO: implement this. */ + void ClearNewActorHandles() { /* TODO(zhijunfu): implement this. */ } private: From 7845639dc44f18715a9df6069e7cd54a63d1a783 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Fri, 14 Jun 2019 14:27:51 +0800 Subject: [PATCH 11/11] format --- src/ray/core_worker/core_worker.cc | 5 ++--- src/ray/core_worker/core_worker_test.cc | 7 +++---- src/ray/core_worker/object_interface.cc | 6 +++--- .../store_provider/plasma_store_provider.cc | 19 ++++++++----------- .../store_provider/plasma_store_provider.h | 3 ++- src/ray/core_worker/task_execution.cc | 7 ++++--- .../core_worker/transport/raylet_transport.cc | 3 +-- 7 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 4b1a1192df9a..bcc1bdd963db 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -13,12 +13,11 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, raylet_socket_(raylet_socket), worker_context_(worker_type, driver_id), raylet_client_(raylet_socket_, worker_context_.GetWorkerID(), - (worker_type_ == ray::WorkerType::WORKER), - worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_)), + (worker_type_ == ray::WorkerType::WORKER), + worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_)), task_interface_(*this), object_interface_(*this), task_execution_interface_(*this) { - // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this needs to be changed // so that the worker (java/python .etc) can retrieve and handle the error diff --git a/src/ray/core_worker/core_worker_test.cc b/src/ray/core_worker/core_worker_test.cc index 075608a5649b..6e4ecc161fb4 100644 --- a/src/ray/core_worker/core_worker_test.cc +++ b/src/ray/core_worker/core_worker_test.cc @@ -445,10 +445,9 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) { TEST_F(SingleNodeTest, TestCoreWorkerConstructorFailure) { try { - CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, - "", raylet_socket_names_[0], - DriverID::FromRandom()); - } catch (const std::exception& e) { + CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", + raylet_socket_names_[0], DriverID::FromRandom()); + } catch (const std::exception &e) { std::cout << "Caught exception when constructing core worker: " << e.what(); } } diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 1b2ce894056e..81777117cd14 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -10,9 +10,9 @@ CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) : core_worker_(core_worker) { store_providers_.emplace( static_cast(StoreProviderType::PLASMA), - std::unique_ptr( - new CoreWorkerPlasmaStoreProvider(core_worker_.store_client_, - core_worker_.store_client_mutex_, core_worker_.raylet_client_))); + std::unique_ptr(new CoreWorkerPlasmaStoreProvider( + core_worker_.store_client_, core_worker_.store_client_mutex_, + core_worker_.raylet_client_))); } Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index c987ae4aaea2..b5dd91d82881 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -7,8 +7,7 @@ namespace ray { CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( - plasma::PlasmaClient &store_client, - std::mutex &store_client_mutex, + plasma::PlasmaClient &store_client, std::mutex &store_client_mutex, RayletClient &raylet_client) : store_client_(store_client), store_client_mutex_(store_client_mutex), @@ -64,8 +63,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, } // TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization. - RAY_CHECK_OK( - raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id)); + RAY_CHECK_OK(raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id)); // Get the objects from the object store, and parse the result. int64_t get_timeout; @@ -86,8 +84,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, std::vector object_buffers; { std::unique_lock guard(store_client_mutex_); - auto status = - store_client_.Get(plasma_ids, get_timeout, &object_buffers); + auto status = store_client_.Get(plasma_ids, get_timeout, &object_buffers); } for (size_t i = 0; i < object_buffers.size(); i++) { @@ -115,14 +112,15 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i const TaskID &task_id, std::vector *results) { WaitResultPair result_pair; - auto status = raylet_client_.Wait(object_ids, num_objects, timeout_ms, - false, task_id, &result_pair); + auto status = raylet_client_.Wait(object_ids, num_objects, timeout_ms, false, task_id, + &result_pair); std::unordered_set ready_ids; for (const auto &entry : result_pair.first) { ready_ids.insert(entry); } - // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't need + // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't + // need // to do this translation. (*results).resize(object_ids.size()); for (size_t i = 0; i < object_ids.size(); i++) { @@ -135,8 +133,7 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_i Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return raylet_client_.FreeObjects(object_ids, local_only, - delete_creating_tasks); + return raylet_client_.FreeObjects(object_ids, local_only, delete_creating_tasks); } } // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 284ee8e34a5a..0dfce1eb1e45 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -18,7 +18,8 @@ class CoreWorker; class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { public: CoreWorkerPlasmaStoreProvider(plasma::PlasmaClient &store_client, - std::mutex &store_client_mutex, RayletClient &raylet_client); + std::mutex &store_client_mutex, + RayletClient &raylet_client); /// Put an object with specified ID into object store. /// diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index e9c86dd00f97..701ae3124c97 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -8,9 +8,10 @@ namespace ray { CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( CoreWorker &core_worker) : core_worker_(core_worker) { - task_receivers.emplace(static_cast(TaskTransportType::RAYLET), - std::unique_ptr( - new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_))); + task_receivers.emplace( + static_cast(TaskTransportType::RAYLET), + std::unique_ptr( + new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_))); } Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index 345948e93dde..14906acfe0bf 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -7,8 +7,7 @@ CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayletClient &rayle : raylet_client_(raylet_client) {} Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { - return raylet_client_.SubmitTask(task.GetDependencies(), - task.GetTaskSpecification()); + return raylet_client_.SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); } CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayletClient &raylet_client)