diff --git a/BUILD.bazel b/BUILD.bazel index 27ab40ef74d1..47e795011ab4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -111,6 +111,8 @@ cc_library( srcs = glob( [ "src/ray/core_worker/*.cc", + "src/ray/core_worker/store_provider/*.cc", + "src/ray/core_worker/transport/*.cc", ], exclude = [ "src/ray/core_worker/*_test.cc", @@ -119,6 +121,8 @@ cc_library( ), hdrs = glob([ "src/ray/core_worker/*.h", + "src/ray/core_worker/store_provider/*.h", + "src/ray/core_worker/transport/*.h", ]), copts = COPTS, deps = [ diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index b11fabfe46f8..3fda406613ef 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -5,6 +5,8 @@ #include "ray/common/buffer.h" #include "ray/common/id.h" +#include "ray/raylet/raylet_client.h" +#include "ray/raylet/task_spec.h" namespace ray { @@ -66,6 +68,35 @@ class TaskArg { const std::shared_ptr data_; }; +/// Task specification, which includes the immutable information about the task +/// which are determined at the submission time. +/// TODO(zhijunfu): this can be removed after everything is moved to protobuf. +class TaskSpec { + public: + TaskSpec(const raylet::TaskSpecification &task_spec, + const std::vector &dependencies) + : task_spec_(task_spec), dependencies_(dependencies) {} + + TaskSpec(const raylet::TaskSpecification &&task_spec, + const std::vector &&dependencies) + : task_spec_(task_spec), dependencies_(dependencies) {} + + const raylet::TaskSpecification &GetTaskSpecification() const { return task_spec_; } + + const std::vector &GetDependencies() const { return dependencies_; } + + private: + /// Raylet task specification. + raylet::TaskSpecification task_spec_; + + /// Dependencies. 
+ std::vector dependencies_; +}; + +enum class StoreProviderType { PLASMA }; + +enum class TaskTransportType { RAYLET }; + } // namespace ray #endif // RAY_CORE_WORKER_COMMON_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 033409196d9b..bcc1bdd963db 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -9,41 +9,39 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, DriverID driver_id) : worker_type_(worker_type), language_(language), - worker_context_(worker_type, driver_id), store_socket_(store_socket), raylet_socket_(raylet_socket), - is_initialized_(false), + worker_context_(worker_type, driver_id), + raylet_client_(raylet_socket_, worker_context_.GetWorkerID(), + (worker_type_ == ray::WorkerType::WORKER), + worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_)), task_interface_(*this), object_interface_(*this), task_execution_interface_(*this) { - switch (language_) { + // TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot + // connect to Raylet after a number of retries, this needs to be changed + // so that the worker (java/python .etc) can retrieve and handle the error + // instead of crashing. 
+ auto status = store_client_.Connect(store_socket_); + if (!status.ok()) { + RAY_LOG(ERROR) << "Connecting plasma store failed when trying to construct" + << " core worker: " << status.message(); + throw std::runtime_error(status.message()); + } +} + +::Language CoreWorker::ToTaskLanguage(WorkerLanguage language) { + switch (language) { case ray::WorkerLanguage::JAVA: - task_language_ = ::Language::JAVA; + return ::Language::JAVA; break; case ray::WorkerLanguage::PYTHON: - task_language_ = ::Language::PYTHON; + return ::Language::PYTHON; break; default: - RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast(language_); + RAY_LOG(FATAL) << "invalid language specified: " << static_cast(language); break; } } -Status CoreWorker::Connect() { - // connect to plasma. - RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_)); - - // connect to raylet. - // TODO: currently RayletClient would crash in its constructor if it cannot - // connect to Raylet after a number of retries, this needs to be changed - // so that the worker (java/python .etc) can retrieve and handle the error - // instead of crashing. - raylet_client_ = std::unique_ptr( - new RayletClient(raylet_socket_, worker_context_.GetWorkerID(), - (worker_type_ == ray::WorkerType::WORKER), - worker_context_.GetCurrentDriverID(), task_language_)); - is_initialized_ = true; - return Status::OK(); -} - } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c038b76ce53f..e03a8700be81 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -20,13 +20,12 @@ class CoreWorker { /// /// \param[in] worker_type Type of this worker. /// \param[in] langauge Language of this worker. + /// + /// NOTE(zhijunfu): the constructor would throw if a failure happens. 
CoreWorker(const WorkerType worker_type, const WorkerLanguage language, const std::string &store_socket, const std::string &raylet_socket, DriverID driver_id = DriverID::Nil()); - /// Connect to raylet. - Status Connect(); - /// Type of this worker. enum WorkerType WorkerType() const { return worker_type_; } @@ -46,23 +45,26 @@ class CoreWorker { CoreWorkerTaskExecutionInterface &Execution() { return task_execution_interface_; } private: + /// Translate from WorkerLanguage to Language type (required by raylet client). + /// + /// \param[in] language Language for a task. + /// \return Translated task language. + ::Language ToTaskLanguage(WorkerLanguage language); + /// Type of this worker. const enum WorkerType worker_type_; /// Language of this worker. const enum WorkerLanguage language_; - /// Language of this worker as specified in flatbuf (used by task spec). - ::Language task_language_; - - /// Worker context per thread. - WorkerContext worker_context_; - /// Plasma store socket name. - std::string store_socket_; + const std::string store_socket_; /// raylet socket name. - std::string raylet_socket_; + const std::string raylet_socket_; + + /// Worker context. + WorkerContext worker_context_; /// Plasma store client. plasma::PlasmaClient store_client_; @@ -71,10 +73,7 @@ class CoreWorker { std::mutex store_client_mutex_; /// Raylet client. - std::unique_ptr raylet_client_; - - /// Whether this worker has been initialized. - bool is_initialized_; + RayletClient raylet_client_; /// The `CoreWorkerTaskInterface` instance. 
CoreWorkerTaskInterface task_interface_; diff --git a/src/ray/core_worker/core_worker_test.cc b/src/ray/core_worker/core_worker_test.cc index fedfb9c2356b..6e4ecc161fb4 100644 --- a/src/ray/core_worker/core_worker_test.cc +++ b/src/ray/core_worker/core_worker_test.cc @@ -128,8 +128,6 @@ class CoreWorkerTest : public ::testing::Test { raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(driver.Connect()); - // Test pass by value. { uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; @@ -187,7 +185,6 @@ class CoreWorkerTest : public ::testing::Test { CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(driver.Connect()); std::unique_ptr actor_handle; @@ -277,13 +274,6 @@ TEST_F(ZeroNodeTest, TestTaskArg) { ASSERT_EQ(*data, *buffer); } -TEST_F(ZeroNodeTest, TestAttributeGetters) { - CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", "", - DriverID::FromRandom()); - ASSERT_EQ(core_worker.WorkerType(), WorkerType::DRIVER); - ASSERT_EQ(core_worker.Language(), WorkerLanguage::PYTHON); -} - TEST_F(ZeroNodeTest, TestWorkerContext) { auto driver_id = DriverID::FromRandom(); @@ -313,7 +303,6 @@ TEST_F(SingleNodeTest, TestObjectInterface) { CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(core_worker.Connect()); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -370,12 +359,10 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { CoreWorker worker1(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); - RAY_CHECK_OK(worker1.Connect()); CoreWorker worker2(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[1], raylet_socket_names_[1], DriverID::FromRandom()); - 
RAY_CHECK_OK(worker2.Connect()); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -456,6 +443,15 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) { TestActorTask(resources); } +TEST_F(SingleNodeTest, TestCoreWorkerConstructorFailure) { + try { + CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", + raylet_socket_names_[0], DriverID::FromRandom()); + } catch (const std::exception &e) { + std::cout << "Caught exception when constructing core worker: " << e.what(); + } +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/mock_worker.cc b/src/ray/core_worker/mock_worker.cc index 95d11bb259a8..a331a0b6ae12 100644 --- a/src/ray/core_worker/mock_worker.cc +++ b/src/ray/core_worker/mock_worker.cc @@ -17,9 +17,7 @@ class MockWorker { public: MockWorker(const std::string &store_socket, const std::string &raylet_socket) : worker_(WorkerType::WORKER, WorkerLanguage::PYTHON, store_socket, raylet_socket, - DriverID::FromRandom()) { - RAY_CHECK_OK(worker_.Connect()); - } + DriverID::FromRandom()) {} void Run() { auto executor_func = [this](const RayFunction &ray_function, diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 5ab5d33330d7..81777117cd14 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -2,11 +2,18 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/store_provider/plasma_store_provider.h" namespace ray { CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) - : core_worker_(core_worker) {} + : core_worker_(core_worker) { + store_providers_.emplace( + static_cast(StoreProviderType::PLASMA), + std::unique_ptr(new CoreWorkerPlasmaStoreProvider( + core_worker_.store_client_, core_worker_.store_client_mutex_, + core_worker_.raylet_client_))); +} Status 
CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(), @@ -16,127 +23,31 @@ Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) } Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) { - auto plasma_id = object_id.ToPlasmaId(); - std::shared_ptr data; - { - std::unique_lock guard(core_worker_.store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK( - core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); - } - - memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); - - { - std::unique_lock guard(core_worker_.store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id)); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id)); - } - return Status::OK(); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Put(buffer, object_id); } Status CoreWorkerObjectInterface::Get(const std::vector &ids, int64_t timeout_ms, std::vector> *results) { - (*results).resize(ids.size(), nullptr); - - bool was_blocked = false; - - std::unordered_map unready; - for (size_t i = 0; i < ids.size(); i++) { - unready.insert({ids[i], i}); - } - - int num_attempts = 0; - bool should_break = false; - int64_t remaining_timeout = timeout_ms; - // Repeat until we get all objects. - while (!unready.empty() && !should_break) { - std::vector unready_ids; - for (const auto &entry : unready) { - unready_ids.push_back(entry.first); - } - - // For the initial fetch, we only fetch the objects, do not reconstruct them. - bool fetch_only = num_attempts == 0; - if (!fetch_only) { - // If fetch_only is false, this worker will be blocked. - was_blocked = true; - } - - // TODO: can call `fetchOrReconstruct` in batches as an optimization. 
- RAY_CHECK_OK(core_worker_.raylet_client_->FetchOrReconstruct( - unready_ids, fetch_only, core_worker_.worker_context_.GetCurrentTaskID())); - - // Get the objects from the object store, and parse the result. - int64_t get_timeout; - if (remaining_timeout >= 0) { - get_timeout = - std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds()); - remaining_timeout -= get_timeout; - should_break = remaining_timeout <= 0; - } else { - get_timeout = RayConfig::instance().get_timeout_milliseconds(); - } - - std::vector plasma_ids; - for (const auto &id : unready_ids) { - plasma_ids.push_back(id.ToPlasmaId()); - } - - std::vector object_buffers; - { - std::unique_lock guard(core_worker_.store_client_mutex_); - auto status = - core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); - } - - for (size_t i = 0; i < object_buffers.size(); i++) { - if (object_buffers[i].data != nullptr) { - const auto &object_id = unready_ids[i]; - (*results)[unready[object_id]] = - std::make_shared(object_buffers[i].data); - unready.erase(object_id); - } - } - - num_attempts += 1; - // TODO: log a message if attempted too many times. 
- } - - if (was_blocked) { - RAY_CHECK_OK(core_worker_.raylet_client_->NotifyUnblocked( - core_worker_.worker_context_.GetCurrentTaskID())); - } - - return Status::OK(); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Get( + ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results); } Status CoreWorkerObjectInterface::Wait(const std::vector &object_ids, int num_objects, int64_t timeout_ms, std::vector *results) { - WaitResultPair result_pair; - auto status = core_worker_.raylet_client_->Wait( - object_ids, num_objects, timeout_ms, false, - core_worker_.worker_context_.GetCurrentTaskID(), &result_pair); - std::unordered_set ready_ids; - for (const auto &entry : result_pair.first) { - ready_ids.insert(entry); - } - - // TODO: change RayletClient::Wait() to return a bit set, so that we don't need - // to do this translation. - (*results).resize(object_ids.size()); - for (size_t i = 0; i < object_ids.size(); i++) { - (*results)[i] = ready_ids.count(object_ids[i]) > 0; - } - - return status; + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Wait(object_ids, num_objects, timeout_ms, + core_worker_.worker_context_.GetCurrentTaskID(), + results); } Status CoreWorkerObjectInterface::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return core_worker_.raylet_client_->FreeObjects(object_ids, local_only, - delete_creating_tasks); + auto type = static_cast(StoreProviderType::PLASMA); + return store_providers_[type]->Delete(object_ids, local_only, delete_creating_tasks); } } // namespace ray diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 431b3f825ac9..35403675f164 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -6,10 +6,12 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include 
"ray/core_worker/store_provider/store_provider.h" namespace ray { class CoreWorker; +class CoreWorkerStoreProvider; /// The interface that contains all `CoreWorker` methods that are related to object store. class CoreWorkerObjectInterface { @@ -62,6 +64,9 @@ class CoreWorkerObjectInterface { private: /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; + + /// All the store providers supported. + std::unordered_map> store_providers_; }; } // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc new file mode 100644 index 000000000000..b5dd91d82881 --- /dev/null +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -0,0 +1,139 @@ +#include "ray/core_worker/store_provider/plasma_store_provider.h" +#include "ray/common/ray_config.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" +#include "ray/core_worker/object_interface.h" + +namespace ray { + +CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( + plasma::PlasmaClient &store_client, std::mutex &store_client_mutex, + RayletClient &raylet_client) + : store_client_(store_client), + store_client_mutex_(store_client_mutex), + raylet_client_(raylet_client) {} + +Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, + const ObjectID &object_id) { + auto plasma_id = object_id.ToPlasmaId(); + std::shared_ptr data; + { + std::unique_lock guard(store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK( + store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + } + + memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); + + { + std::unique_lock guard(store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); + } + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, + int64_t timeout_ms, const TaskID &task_id, + 
std::vector> *results) { + (*results).resize(ids.size(), nullptr); + + bool was_blocked = false; + + std::unordered_map unready; + for (size_t i = 0; i < ids.size(); i++) { + unready.insert({ids[i], i}); + } + + int num_attempts = 0; + bool should_break = false; + int64_t remaining_timeout = timeout_ms; + // Repeat until we get all objects. + while (!unready.empty() && !should_break) { + std::vector unready_ids; + for (const auto &entry : unready) { + unready_ids.push_back(entry.first); + } + + // For the initial fetch, we only fetch the objects, do not reconstruct them. + bool fetch_only = num_attempts == 0; + if (!fetch_only) { + // If fetch_only is false, this worker will be blocked. + was_blocked = true; + } + + // TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization. + RAY_CHECK_OK(raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id)); + + // Get the objects from the object store, and parse the result. + int64_t get_timeout; + if (remaining_timeout >= 0) { + get_timeout = + std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds()); + remaining_timeout -= get_timeout; + should_break = remaining_timeout <= 0; + } else { + get_timeout = RayConfig::instance().get_timeout_milliseconds(); + } + + std::vector plasma_ids; + for (const auto &id : unready_ids) { + plasma_ids.push_back(id.ToPlasmaId()); + } + + std::vector object_buffers; + { + std::unique_lock guard(store_client_mutex_); + auto status = store_client_.Get(plasma_ids, get_timeout, &object_buffers); + } + + for (size_t i = 0; i < object_buffers.size(); i++) { + if (object_buffers[i].data != nullptr) { + const auto &object_id = unready_ids[i]; + (*results)[unready[object_id]] = + std::make_shared(object_buffers[i].data); + unready.erase(object_id); + } + } + + num_attempts += 1; + // TODO(zhijunfu): log a message if attempted too many times. 
+ } + + if (was_blocked) { + RAY_CHECK_OK(raylet_client_.NotifyUnblocked(task_id)); + } + + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_ids, + int num_objects, int64_t timeout_ms, + const TaskID &task_id, + std::vector *results) { + WaitResultPair result_pair; + auto status = raylet_client_.Wait(object_ids, num_objects, timeout_ms, false, task_id, + &result_pair); + std::unordered_set ready_ids; + for (const auto &entry : result_pair.first) { + ready_ids.insert(entry); + } + + // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't + // need + // to do this translation. + (*results).resize(object_ids.size()); + for (size_t i = 0; i < object_ids.size(); i++) { + (*results)[i] = ready_ids.count(object_ids[i]) > 0; + } + + return status; +} + +Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, + bool local_only, + bool delete_creating_tasks) { + return raylet_client_.FreeObjects(object_ids, local_only, delete_creating_tasks); +} + +} // namespace ray diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h new file mode 100644 index 000000000000..0dfce1eb1e45 --- /dev/null +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -0,0 +1,76 @@ +#ifndef RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H +#define RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H + +#include "plasma/client.h" +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" +#include "ray/core_worker/store_provider/store_provider.h" +#include "ray/raylet/raylet_client.h" + +namespace ray { + +class CoreWorker; + +/// The class provides implementations for accessing plasma store, which includes both +/// local and remote store, remote access is done via raylet. 
+class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { + public: + CoreWorkerPlasmaStoreProvider(plasma::PlasmaClient &store_client, + std::mutex &store_client_mutex, + RayletClient &raylet_client); + + /// Put an object with specified ID into object store. + /// + /// \param[in] buffer Data buffer of the object. + /// \param[in] object_id Object ID specified by user. + /// \return Status. + Status Put(const Buffer &buffer, const ObjectID &object_id) override; + + /// Get a list of objects from the object store. + /// + /// \param[in] ids IDs of the objects to get. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results Result list of objects data. + /// \return Status. + Status Get(const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, + std::vector> *results) override; + + /// Wait for a list of objects to appear in the object store. + /// + /// \param[in] object_ids IDs of the objects to wait for. + /// \param[in] num_objects Number of objects that should appear. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + Status Wait(const std::vector &object_ids, int num_objects, + int64_t timeout_ms, const TaskID &task_id, + std::vector *results) override; + + /// Delete a list of objects from the object store. + /// + /// \param[in] object_ids IDs of the objects to delete. + /// \param[in] local_only Whether only delete the objects in local node, or all nodes in + /// the cluster. + /// \param[in] delete_creating_tasks Whether also delete the tasks that + /// created these objects. \return Status. + Status Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks) override; + + private: + /// Plasma store client. 
+ plasma::PlasmaClient &store_client_; + + /// Mutex to protect store_client_. + std::mutex &store_client_mutex_; + + /// Raylet client. + RayletClient &raylet_client_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h new file mode 100644 index 000000000000..f1521edf1626 --- /dev/null +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -0,0 +1,64 @@ +#ifndef RAY_CORE_WORKER_STORE_PROVIDER_H +#define RAY_CORE_WORKER_STORE_PROVIDER_H + +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" + +namespace ray { + +/// Provider interface for store access. Store provider should inherit from this class and +/// provide implementations for the methods. The actual store provider may use a plasma +/// store or local memory store in worker process, or possibly other types of storage. + +class CoreWorkerStoreProvider { + public: + CoreWorkerStoreProvider() {} + + virtual ~CoreWorkerStoreProvider() {} + + /// Put an object with specified ID into object store. + /// + /// \param[in] buffer Data buffer of the object. + /// \param[in] object_id Object ID specified by user. + /// \return Status. + virtual Status Put(const Buffer &buffer, const ObjectID &object_id) = 0; + + /// Get a list of objects from the object store. + /// + /// \param[in] ids IDs of the objects to get. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results Result list of objects data. + /// \return Status. + virtual Status Get(const std::vector &ids, int64_t timeout_ms, + const TaskID &task_id, + std::vector> *results) = 0; + + /// Wait for a list of objects to appear in the object store. + /// + /// \param[in] object_ids IDs of the objects to wait for. 
+ /// \param[in] num_objects Number of objects that should appear. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[in] task_id ID for the current task. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + virtual Status Wait(const std::vector &object_ids, int num_objects, + int64_t timeout_ms, const TaskID &task_id, + std::vector *results) = 0; + + /// Delete a list of objects from the object store. + /// + /// \param[in] object_ids IDs of the objects to delete. + /// \param[in] local_only Whether only delete the objects in local node, or all nodes in + /// the cluster. + /// \param[in] delete_creating_tasks Whether also delete the tasks that + /// created these objects. \return Status. + virtual Status Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks) = 0; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_STORE_PROVIDER_H diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index fc22fce96c97..701ae3124c97 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -1,43 +1,54 @@ #include "ray/core_worker/task_execution.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace ray { -Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { - RAY_CHECK(core_worker_.is_initialized_); +CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( + CoreWorker &core_worker) + : core_worker_(core_worker) { + task_receivers.emplace( + static_cast(TaskTransportType::RAYLET), + std::unique_ptr( + new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_))); +} + Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { while (true) { - std::unique_ptr task_spec; - auto status = 
core_worker_.raylet_client_->GetTask(&task_spec); + std::vector tasks; + auto status = + task_receivers[static_cast(TaskTransportType::RAYLET)]->GetTasks(&tasks); if (!status.ok()) { - RAY_LOG(ERROR) << "Get task failed with error: " + RAY_LOG(ERROR) << "Getting task failed with error: " << ray::Status::IOError(status.message()); return status; } - const auto &spec = *task_spec; - core_worker_.worker_context_.SetCurrentTask(spec); + for (const auto &task : tasks) { + const auto &spec = task.GetTaskSpecification(); + core_worker_.worker_context_.SetCurrentTask(spec); - WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA) - ? WorkerLanguage::JAVA - : WorkerLanguage::PYTHON; - RayFunction func{language, spec.FunctionDescriptor()}; + WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA) + ? WorkerLanguage::JAVA + : WorkerLanguage::PYTHON; + RayFunction func{language, spec.FunctionDescriptor()}; - std::vector> args; - RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); + std::vector> args; + RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); - auto num_returns = spec.NumReturns(); - if (spec.IsActorCreationTask() || spec.IsActorTask()) { - RAY_CHECK(num_returns > 0); - // Decrease to account for the dummy object id. - num_returns--; - } + auto num_returns = spec.NumReturns(); + if (spec.IsActorCreationTask() || spec.IsActorTask()) { + RAY_CHECK(num_returns > 0); + // Decrease to account for the dummy object id. + num_returns--; + } - status = executor(func, args, spec.TaskId(), num_returns); - // TODO: - // 1. Check and handle failure. - // 2. Save or load checkpoint. + status = executor(func, args, spec.TaskId(), num_returns); + // TODO(zhijunfu): + // 1. Check and handle failure. + // 2. Save or load checkpoint. + } } // should never reach here. 
diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index e2fe2148a3ab..f4b44b9e131d 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -4,6 +4,7 @@ #include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/transport/transport.h" namespace ray { @@ -17,8 +18,7 @@ class TaskSpecification; /// execution. class CoreWorkerTaskExecutionInterface { public: - CoreWorkerTaskExecutionInterface(CoreWorker &core_worker) : core_worker_(core_worker) {} - + CoreWorkerTaskExecutionInterface(CoreWorker &core_worker); /// The callback provided app-language workers that executes tasks. /// /// \param ray_function[in] Information about the function to execute. @@ -46,6 +46,9 @@ class CoreWorkerTaskExecutionInterface { /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; + + /// All the task receivers supported. + std::unordered_map> task_receivers; }; } // namespace ray diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 00f15237f1c3..6a91bd6b2101 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -1,9 +1,19 @@ #include "ray/core_worker/task_interface.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/task_interface.h" +#include "ray/core_worker/transport/raylet_transport.h" namespace ray { +CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker) + : core_worker_(core_worker) { + task_submitters_.emplace( + static_cast(TaskTransportType::RAYLET), + std::unique_ptr( + new CoreWorkerRayletTaskSubmitter(core_worker_.raylet_client_))); +} + Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, const std::vector &args, const TaskOptions &task_options, @@ -20,7 +30,7 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction 
&function, } auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); ray::raylet::TaskSpecification spec(context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index, @@ -28,7 +38,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, language, function.function_descriptor); std::vector execution_dependencies; - return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::CreateActor( @@ -50,7 +61,7 @@ Status CoreWorkerTaskInterface::CreateActor( (*actor_handle)->SetActorCursor(return_ids[0]); auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); // Note that the caller is supposed to specify required placement resources // correctly via actor_creation_options.resources. 
@@ -62,7 +73,8 @@ Status CoreWorkerTaskInterface::CreateActor( function.function_descriptor); std::vector execution_dependencies; - return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + return task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -86,7 +98,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, ObjectID::FromBinary(actor_handle.ActorID().Binary()); auto task_arguments = BuildTaskArguments(args); - auto language = ToTaskLanguage(function.language); + auto language = core_worker_.ToTaskLanguage(function.language); std::vector new_actor_handles; ray::raylet::TaskSpecification spec( @@ -103,7 +115,9 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, actor_handle.SetActorCursor(actor_cursor); actor_handle.ClearNewActorHandles(); - auto status = core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + TaskSpec task(std::move(spec), execution_dependencies); + auto status = + task_submitters_[static_cast(TaskTransportType::RAYLET)]->SubmitTask(task); // remove cursor from return ids. 
(*return_ids).pop_back(); @@ -127,18 +141,4 @@ CoreWorkerTaskInterface::BuildTaskArguments(const std::vector &args) { return task_arguments; } -::Language CoreWorkerTaskInterface::ToTaskLanguage(WorkerLanguage language) { - switch (language) { - case ray::WorkerLanguage::JAVA: - return ::Language::JAVA; - break; - case ray::WorkerLanguage::PYTHON: - return ::Language::PYTHON; - break; - default: - RAY_LOG(FATAL) << "invalid language specified: " << static_cast(language); - break; - } -} - } // namespace ray diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 2ec3b1329cbc..e59934f9b51d 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -7,6 +7,7 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/transport/transport.h" #include "ray/raylet/task.h" namespace ray { @@ -65,11 +66,11 @@ class ActorHandle { int IncreaseTaskCounter() { return task_counter_++; } std::list GetNewActorHandle() { - // TODO: implement this. + // TODO(zhijunfu): implement this. return std::list(); } - void ClearNewActorHandles() { /* TODO: implement this. */ + void ClearNewActorHandles() { /* TODO(zhijunfu): implement this. */ } private: @@ -89,7 +90,7 @@ class ActorHandle { /// submission. class CoreWorkerTaskInterface { public: - CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) {} + CoreWorkerTaskInterface(CoreWorker &core_worker); /// Submit a normal task. /// @@ -137,11 +138,8 @@ class CoreWorkerTaskInterface { std::vector> BuildTaskArguments( const std::vector &args); - /// Translate from WorkLanguage to Language type (required by taks spec). - /// - /// \param[in] language Language for a task. - /// \return Translated task language. - ::Language ToTaskLanguage(WorkerLanguage language); + /// All the task submitters supported. 
+ std::unordered_map> task_submitters_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc new file mode 100644 index 000000000000..14906acfe0bf --- /dev/null +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -0,0 +1,32 @@ + +#include "ray/core_worker/transport/raylet_transport.h" + +namespace ray { + +CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client) + : raylet_client_(raylet_client) {} + +Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { + return raylet_client_.SubmitTask(task.GetDependencies(), task.GetTaskSpecification()); +} + +CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayletClient &raylet_client) + : raylet_client_(raylet_client) {} + +Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { + std::unique_ptr task_spec; + auto status = raylet_client_.GetTask(&task_spec); + if (!status.ok()) { + RAY_LOG(ERROR) << "Get task from raylet failed with error: " + << ray::Status::IOError(status.message()); + return status; + } + + std::vector dependencies; + RAY_CHECK((*tasks).empty()); + (*tasks).emplace_back(*task_spec, dependencies); + + return Status::OK(); +} + +} // namespace ray diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h new file mode 100644 index 000000000000..03bf82f29886 --- /dev/null +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -0,0 +1,44 @@ +#ifndef RAY_CORE_WORKER_RAYLET_TRANSPORT_H +#define RAY_CORE_WORKER_RAYLET_TRANSPORT_H + +#include + +#include "ray/core_worker/transport/transport.h" +#include "ray/raylet/raylet_client.h" + +namespace ray { + +/// In raylet task submitter and receiver, a task is submitted to raylet, and possibly +/// gets forwarded to another raylet on which node the task should be executed, and +/// then a worker on that node gets this task and starts executing it. 
+ +class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { + public: + CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client); + + /// Submit a task for execution to raylet. + /// + /// \param[in] task The task spec to submit. + /// \return Status. + virtual Status SubmitTask(const TaskSpec &task) override; + + private: + /// Raylet client. + RayletClient &raylet_client_; +}; + +class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver { + public: + CoreWorkerRayletTaskReceiver(RayletClient &raylet_client); + + // Get tasks for execution from raylet. + virtual Status GetTasks(std::vector *tasks) override; + + private: + /// Raylet client. + RayletClient &raylet_client_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_RAYLET_TRANSPORT_H diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h new file mode 100644 index 000000000000..44be74b989c7 --- /dev/null +++ b/src/ray/core_worker/transport/transport.h @@ -0,0 +1,41 @@ +#ifndef RAY_CORE_WORKER_TRANSPORT_H +#define RAY_CORE_WORKER_TRANSPORT_H + +#include + +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/core_worker/common.h" +#include "ray/raylet/task_spec.h" + +namespace ray { + +/// Interfaces for task submitter and receiver. They are separate classes but should be +/// used in pairs - one type of task submitter should be used together with task +/// with the same type, so these classes are put together in this same file. +/// +/// Task submitter/receiver should inherit from these classes and provide implementions +/// for the methods. The actual task submitter/receiver can submit/get tasks via raylet, +/// or directly to/from another worker. + +/// This class is responsible to submit tasks. +class CoreWorkerTaskSubmitter { + public: + /// Submit a task for execution. + /// + /// \param[in] task The task spec to submit. + /// \return Status. 
+ virtual Status SubmitTask(const TaskSpec &task) = 0; +}; + +/// This class receives tasks for execution. +class CoreWorkerTaskReceiver { + public: + // Get tasks for execution. + virtual Status GetTasks(std::vector *tasks) = 0; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_TRANSPORT_H diff --git a/src/ray/test/run_core_worker_tests.sh b/src/ray/test/run_core_worker_tests.sh index 104b19ff19cb..7668b92ac272 100644 --- a/src/ray/test/run_core_worker_tests.sh +++ b/src/ray/test/run_core_worker_tests.sh @@ -43,6 +43,3 @@ sleep 1s bazel run //:redis-cli -- -p 6379 shutdown bazel run //:redis-cli -- -p 6380 shutdown sleep 1s - -# Include raylet integration test once it's ready. -# ./bazel-bin/object_manager_integration_test $STORE_EXEC