diff --git a/BUILD.bazel b/BUILD.bazel index 36f02e292fa1..27ab40ef74d1 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -114,6 +114,7 @@ cc_library( ], exclude = [ "src/ray/core_worker/*_test.cc", + "src/ray/core_worker/mock_worker.cc", ], ), hdrs = glob([ @@ -127,7 +128,15 @@ cc_library( ], ) -# This test is run by src/ray/test/run_core_worker_tests.sh +cc_binary( + name = "mock_worker", + srcs = ["src/ray/core_worker/mock_worker.cc"], + copts = COPTS, + deps = [ + ":core_worker_lib", + ], +) + cc_binary( name = "core_worker_test", srcs = ["src/ray/core_worker/core_worker_test.cc"], diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 8317bf181207..b11fabfe46f8 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -12,12 +12,12 @@ namespace ray { enum class WorkerType { WORKER, DRIVER }; /// Language of Ray tasks and workers. -enum class Language { PYTHON, JAVA }; +enum class WorkerLanguage { PYTHON, JAVA }; /// Information about a remote function. struct RayFunction { /// Language of the remote function. - const Language language; + const WorkerLanguage language; /// Function descriptor of the remote function. const std::vector function_descriptor; }; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index fedcfc6625d9..660330e5cee3 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -1,5 +1,5 @@ -#include "context.h" +#include "ray/core_worker/context.h" namespace ray { @@ -23,7 +23,6 @@ struct WorkerThreadContext { void SetCurrentTask(const raylet::TaskSpecification &spec) { SetCurrentTask(spec.TaskId()); } - private: /// The task ID for current task. TaskID current_task_id; diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 6e0cf3f9f2cf..932d02891b6a 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -1,7 +1,7 @@ #ifndef RAY_CORE_WORKER_CONTEXT_H #define RAY_CORE_WORKER_CONTEXT_H -#include "common.h" +#include "ray/core_worker/common.h" #include "ray/raylet/task_spec.h" namespace ray { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 82f2d885ec58..033409196d9b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,9 +1,10 @@ -#include "core_worker.h" -#include "context.h" +#include "ray/core_worker/core_worker.h" +#include "ray/core_worker/context.h" namespace ray { -CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language language, +CoreWorker::CoreWorker(const enum WorkerType worker_type, + const enum WorkerLanguage language, const std::string &store_socket, const std::string &raylet_socket, DriverID driver_id) : worker_type_(worker_type), @@ -11,20 +12,28 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language la worker_context_(worker_type, driver_id), store_socket_(store_socket), raylet_socket_(raylet_socket), + is_initialized_(false), task_interface_(*this), object_interface_(*this), - task_execution_interface_(*this) {} + task_execution_interface_(*this) { + switch (language_) { + case ray::WorkerLanguage::JAVA: + task_language_ = ::Language::JAVA; + break; + case ray::WorkerLanguage::PYTHON: + task_language_ = ::Language::PYTHON; + break; + default: + RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast(language_); + break; + } +} Status CoreWorker::Connect() { // connect to plasma. RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_)); // connect to raylet. - ::Language lang = ::Language::PYTHON; - if (language_ == ray::Language::JAVA) { - lang = ::Language::JAVA; - } - // TODO: currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this needs to be changed // so that the worker (java/python .etc) can retrieve and handle the error @@ -32,7 +41,8 @@ Status CoreWorker::Connect() { raylet_client_ = std::unique_ptr( new RayletClient(raylet_socket_, worker_context_.GetWorkerID(), (worker_type_ == ray::WorkerType::WORKER), - worker_context_.GetCurrentDriverID(), lang)); + worker_context_.GetCurrentDriverID(), task_language_)); + is_initialized_ = true; return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 951b55451f09..c038b76ce53f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1,13 +1,13 @@ #ifndef RAY_CORE_WORKER_CORE_WORKER_H #define RAY_CORE_WORKER_CORE_WORKER_H -#include "common.h" -#include "context.h" -#include "object_interface.h" #include "ray/common/buffer.h" +#include "ray/core_worker/common.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/object_interface.h" +#include "ray/core_worker/task_execution.h" +#include "ray/core_worker/task_interface.h" #include "ray/raylet/raylet_client.h" -#include "task_execution.h" -#include "task_interface.h" namespace ray { @@ -20,7 +20,7 @@ class CoreWorker { /// /// \param[in] worker_type Type of this worker. /// \param[in] langauge Language of this worker. - CoreWorker(const WorkerType worker_type, const Language language, + CoreWorker(const WorkerType worker_type, const WorkerLanguage language, const std::string &store_socket, const std::string &raylet_socket, DriverID driver_id = DriverID::Nil()); @@ -31,7 +31,7 @@ class CoreWorker { enum WorkerType WorkerType() const { return worker_type_; } /// Language of this worker. - enum Language Language() const { return language_; } + enum WorkerLanguage Language() const { return language_; } /// Return the `CoreWorkerTaskInterface` that contains the methods related to task /// submisson. @@ -50,7 +50,10 @@ class CoreWorker { const enum WorkerType worker_type_; /// Language of this worker. - const enum Language language_; + const enum WorkerLanguage language_; + + /// Language of this worker as specified in flatbuf (used by task spec). + ::Language task_language_; /// Worker context per thread. WorkerContext worker_context_; @@ -64,9 +67,15 @@ class CoreWorker { /// Plasma store client. plasma::PlasmaClient store_client_; + /// Mutex to protect store_client_. + std::mutex store_client_mutex_; + /// Raylet client. std::unique_ptr raylet_client_; + /// Whether this worker has been initialized. + bool is_initialized_; + /// The `CoreWorkerTaskInterface` instance. CoreWorkerTaskInterface task_interface_; diff --git a/src/ray/core_worker/core_worker_test.cc b/src/ray/core_worker/core_worker_test.cc index e440aae24d67..fedfb9c2356b 100644 --- a/src/ray/core_worker/core_worker_test.cc +++ b/src/ray/core_worker/core_worker_test.cc @@ -2,9 +2,9 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "context.h" -#include "core_worker.h" #include "ray/common/buffer.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" #include "ray/raylet/raylet_client.h" #include @@ -18,6 +18,7 @@ namespace ray { std::string store_executable; std::string raylet_executable; +std::string mock_worker_executable; ray::ObjectID RandomObjectID() { return ObjectID::FromRandom(); } @@ -32,6 +33,9 @@ static void flushall_redis(void) { class CoreWorkerTest : public ::testing::Test { public: CoreWorkerTest(int num_nodes) { + // flush redis first. + flushall_redis(); + RAY_CHECK(num_nodes >= 0); if (num_nodes > 0) { raylet_socket_names_.resize(num_nodes); @@ -43,10 +47,12 @@ class CoreWorkerTest : public ::testing::Test { store_socket = StartStore(); } - // start raylet on each node + // start raylet on each node. Assign each node with different resources so that + // a task can be scheduled to the desired node. for (int i = 0; i < num_nodes; i++) { - raylet_socket_names_[i] = StartRaylet(raylet_store_socket_names_[i], "127.0.0.1", - "127.0.0.1", "\"CPU,4.0\""); + raylet_socket_names_[i] = + StartRaylet(raylet_store_socket_names_[i], "127.0.0.1", "127.0.0.1", + "\"CPU,4.0,resource" + std::to_string(i) + ",10\""); } } @@ -66,7 +72,7 @@ class CoreWorkerTest : public ::testing::Test { std::string plasma_command = store_executable + " -m 10000000 -s " + store_socket_name + " 1> /dev/null 2> /dev/null & echo $! > " + store_pid; - RAY_LOG(INFO) << plasma_command; + RAY_LOG(DEBUG) << plasma_command; RAY_CHECK(system(plasma_command.c_str()) == 0); usleep(200 * 1000); return store_socket_name; @@ -75,7 +81,7 @@ class CoreWorkerTest : public ::testing::Test { void StopStore(std::string store_socket_name) { std::string store_pid = store_socket_name + ".pid"; std::string kill_9 = "kill -9 `cat " + store_pid + "`"; - RAY_LOG(INFO) << kill_9; + RAY_LOG(DEBUG) << kill_9; ASSERT_TRUE(system(kill_9.c_str()) == 0); ASSERT_TRUE(system(("rm -rf " + store_socket_name).c_str()) == 0); ASSERT_TRUE(system(("rm -rf " + store_socket_name + ".pid").c_str()) == 0); @@ -91,13 +97,14 @@ class CoreWorkerTest : public ::testing::Test { .append(" --node_ip_address=" + node_ip_address) .append(" --redis_address=" + redis_address) .append(" --redis_port=6379") - .append(" --num_initial_workers=0") + .append(" --num_initial_workers=1") .append(" --maximum_startup_concurrency=10") .append(" --static_resource_list=" + resource) - .append(" --python_worker_command=NoneCmd") + .append(" --python_worker_command=\"" + mock_worker_executable + " " + + store_socket_name + " " + raylet_socket_name + "\"") .append(" & echo $! > " + raylet_socket_name + ".pid"); - RAY_LOG(INFO) << "Ray Start command: " << ray_start_cmd; + RAY_LOG(DEBUG) << "Ray Start command: " << ray_start_cmd; RAY_CHECK(system(ray_start_cmd.c_str()) == 0); usleep(200 * 1000); return raylet_socket_name; @@ -106,16 +113,134 @@ class CoreWorkerTest : public ::testing::Test { void StopRaylet(std::string raylet_socket_name) { std::string raylet_pid = raylet_socket_name + ".pid"; std::string kill_9 = "kill -9 `cat " + raylet_pid + "`"; - RAY_LOG(INFO) << kill_9; + RAY_LOG(DEBUG) << kill_9; ASSERT_TRUE(system(kill_9.c_str()) == 0); ASSERT_TRUE(system(("rm -rf " + raylet_socket_name).c_str()) == 0); ASSERT_TRUE(system(("rm -rf " + raylet_socket_name + ".pid").c_str()) == 0); } - void SetUp() { flushall_redis(); } + void SetUp() {} void TearDown() {} + void TestNormalTask(const std::unordered_map &resources) { + CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON, + raylet_store_socket_names_[0], raylet_socket_names_[0], + DriverID::FromRandom()); + + RAY_CHECK_OK(driver.Connect()); + + // Test pass by value. + { + uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; + + auto buffer1 = std::make_shared(array1, sizeof(array1)); + + RayFunction func{ray::WorkerLanguage::PYTHON, {}}; + std::vector args; + args.emplace_back(TaskArg::PassByValue(buffer1)); + + TaskOptions options; + + std::vector return_ids; + RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids)); + + ASSERT_EQ(return_ids.size(), 1); + + std::vector> results; + RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0]->Size(), buffer1->Size()); + ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); + } + + // Test pass by reference. + { + uint8_t array1[] = {10, 11, 12, 13, 14, 15}; + auto buffer1 = std::make_shared(array1, sizeof(array1)); + + ObjectID object_id; + RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id)); + + std::vector args; + args.emplace_back(TaskArg::PassByReference(object_id)); + + RayFunction func{ray::WorkerLanguage::PYTHON, {}}; + TaskOptions options; + + std::vector return_ids; + RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids)); + + ASSERT_EQ(return_ids.size(), 1); + + std::vector> results; + RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0]->Size(), buffer1->Size()); + ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); + } + } + + void TestActorTask(const std::unordered_map &resources) { + CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON, + raylet_store_socket_names_[0], raylet_socket_names_[0], + DriverID::FromRandom()); + RAY_CHECK_OK(driver.Connect()); + + std::unique_ptr actor_handle; + + // Test creating actor. + { + uint8_t array[] = {1, 2, 3}; + auto buffer = std::make_shared(array, sizeof(array)); + + RayFunction func{ray::WorkerLanguage::PYTHON, {}}; + std::vector args; + args.emplace_back(TaskArg::PassByValue(buffer)); + + ActorCreationOptions actor_options{0, resources}; + + // Create an actor. + RAY_CHECK_OK(driver.Tasks().CreateActor(func, args, actor_options, &actor_handle)); + } + + // Test submitting a task for that actor. + { + uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; + uint8_t array2[] = {10, 11, 12, 13, 14, 15}; + + auto buffer1 = std::make_shared(array1, sizeof(array1)); + auto buffer2 = std::make_shared(array2, sizeof(array2)); + + ObjectID object_id; + RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id)); + + // Create arguments with PassByRef and PassByValue. + std::vector args; + args.emplace_back(TaskArg::PassByReference(object_id)); + args.emplace_back(TaskArg::PassByValue(buffer2)); + + TaskOptions options{1, resources}; + std::vector return_ids; + RayFunction func{ray::WorkerLanguage::PYTHON, {}}; + RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, + &return_ids)); + RAY_CHECK(return_ids.size() == 1); + + std::vector> results; + RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0]->Size(), buffer1->Size() + buffer2->Size()); + ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); + ASSERT_EQ( + memcmp(results[0]->Data() + buffer1->Size(), buffer2->Data(), buffer2->Size()), + 0); + } + } + protected: std::vector raylet_socket_names_; std::vector raylet_store_socket_names_; @@ -131,6 +256,11 @@ class SingleNodeTest : public CoreWorkerTest { SingleNodeTest() : CoreWorkerTest(1) {} }; +class TwoNodeTest : public CoreWorkerTest { + public: + TwoNodeTest() : CoreWorkerTest(2) {} +}; + TEST_F(ZeroNodeTest, TestTaskArg) { // Test by-reference argument. ObjectID id = ObjectID::FromRandom(); @@ -148,10 +278,10 @@ TEST_F(ZeroNodeTest, TestTaskArg) { } TEST_F(ZeroNodeTest, TestAttributeGetters) { - CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, "", "", + CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", "", DriverID::FromRandom()); ASSERT_EQ(core_worker.WorkerType(), WorkerType::DRIVER); - ASSERT_EQ(core_worker.Language(), Language::PYTHON); + ASSERT_EQ(core_worker.Language(), WorkerLanguage::PYTHON); } TEST_F(ZeroNodeTest, TestWorkerContext) { @@ -180,7 +310,7 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { } TEST_F(SingleNodeTest, TestObjectInterface) { - CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, + CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], DriverID::FromRandom()); RAY_CHECK_OK(core_worker.Connect()); @@ -193,16 +323,16 @@ TEST_F(SingleNodeTest, TestObjectInterface) { buffers.emplace_back(array2, sizeof(array2)); std::vector ids(buffers.size()); - for (int i = 0; i < ids.size(); i++) { - core_worker.Objects().Put(buffers[i], &ids[i]); + for (size_t i = 0; i < ids.size(); i++) { + RAY_CHECK_OK(core_worker.Objects().Put(buffers[i], &ids[i])); } // Test Get(). std::vector> results; - core_worker.Objects().Get(ids, 0, &results); + RAY_CHECK_OK(core_worker.Objects().Get(ids, -1, &results)); ASSERT_EQ(results.size(), 2); - for (int i = 0; i < ids.size(); i++) { + for (size_t i = 0; i < ids.size(); i++) { ASSERT_EQ(results[i]->Size(), buffers[i].Size()); ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0); } @@ -213,34 +343,126 @@ TEST_F(SingleNodeTest, TestObjectInterface) { all_ids.push_back(non_existent_id); std::vector wait_results; - core_worker.Objects().Wait(all_ids, 2, -1, &wait_results); + RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 2, -1, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); - core_worker.Objects().Wait(all_ids, 3, 100, &wait_results); + RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 3, 100, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); // Test Delete(). // clear the reference held by PlasmaBuffer. results.clear(); - core_worker.Objects().Delete(ids, true, false); + RAY_CHECK_OK(core_worker.Objects().Delete(ids, true, false)); // Note that Delete() calls RayletClient::FreeObjects and would not // wait for objects being deleted, so wait a while for plasma store // to process the command. usleep(200 * 1000); - core_worker.Objects().Get(ids, 0, &results); + RAY_CHECK_OK(core_worker.Objects().Get(ids, 0, &results)); + ASSERT_EQ(results.size(), 2); + ASSERT_TRUE(!results[0]); + ASSERT_TRUE(!results[1]); +} + +TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { + CoreWorker worker1(WorkerType::DRIVER, WorkerLanguage::PYTHON, + raylet_store_socket_names_[0], raylet_socket_names_[0], + DriverID::FromRandom()); + RAY_CHECK_OK(worker1.Connect()); + + CoreWorker worker2(WorkerType::DRIVER, WorkerLanguage::PYTHON, + raylet_store_socket_names_[1], raylet_socket_names_[1], + DriverID::FromRandom()); + RAY_CHECK_OK(worker2.Connect()); + + uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; + uint8_t array2[] = {10, 11, 12, 13, 14, 15}; + + std::vector buffers; + buffers.emplace_back(array1, sizeof(array1)); + buffers.emplace_back(array2, sizeof(array2)); + + std::vector ids(buffers.size()); + for (size_t i = 0; i < ids.size(); i++) { + RAY_CHECK_OK(worker1.Objects().Put(buffers[i], &ids[i])); + } + + // Test Get() from remote node. + std::vector> results; + RAY_CHECK_OK(worker2.Objects().Get(ids, -1, &results)); + + ASSERT_EQ(results.size(), 2); + for (size_t i = 0; i < ids.size(); i++) { + ASSERT_EQ(results[i]->Size(), buffers[i].Size()); + ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0); + } + + // Test Wait() from remote node. + ObjectID non_existent_id = ObjectID::FromRandom(); + std::vector all_ids(ids); + all_ids.push_back(non_existent_id); + + std::vector wait_results; + RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 2, -1, &wait_results)); + ASSERT_EQ(wait_results.size(), 3); + ASSERT_EQ(wait_results, std::vector({true, true, false})); + + RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 3, 100, &wait_results)); + ASSERT_EQ(wait_results.size(), 3); + ASSERT_EQ(wait_results, std::vector({true, true, false})); + + // Test Delete() from all machines. + // clear the reference held by PlasmaBuffer. + results.clear(); + RAY_CHECK_OK(worker2.Objects().Delete(ids, false, false)); + + // Note that Delete() calls RayletClient::FreeObjects and would not + // wait for objects being deleted, so wait a while for plasma store + // to process the command. + usleep(1000 * 1000); + // Verify objects are deleted from both machines. + RAY_CHECK_OK(worker2.Objects().Get(ids, 0, &results)); + ASSERT_EQ(results.size(), 2); + ASSERT_TRUE(!results[0]); + ASSERT_TRUE(!results[1]); + + RAY_CHECK_OK(worker1.Objects().Get(ids, 0, &results)); ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]); } +TEST_F(SingleNodeTest, TestNormalTaskLocal) { + std::unordered_map resources; + TestNormalTask(resources); +} + +TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) { + std::unordered_map resources; + resources.emplace("resource1", 1); + TestNormalTask(resources); +} + +TEST_F(SingleNodeTest, TestActorTaskLocal) { + std::unordered_map resources; + TestActorTask(resources); +} + +TEST_F(TwoNodeTest, TestActorTaskCrossNodes) { + std::unordered_map resources; + resources.emplace("resource1", 1); + TestActorTask(resources); +} + } // namespace ray int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); + RAY_CHECK(argc == 4); ray::store_executable = std::string(argv[1]); ray::raylet_executable = std::string(argv[2]); + ray::mock_worker_executable = std::string(argv[3]); return RUN_ALL_TESTS(); } diff --git a/src/ray/core_worker/mock_worker.cc b/src/ray/core_worker/mock_worker.cc new file mode 100644 index 000000000000..205fcfce961d --- /dev/null +++ b/src/ray/core_worker/mock_worker.cc @@ -0,0 +1,66 @@ +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" +#include "ray/core_worker/task_execution.h" + +namespace ray { + +/// A mock C++ worker used by core_worker_test.cc to verify the task submission/execution +/// interfaces in both single node and cross-nodes scenarios. As the raylet client can +/// only +/// be called by a real worker process, core_worker_test.cc has to use this program binary +/// to start the actual worker process, in the test, the task submission interfaces are +/// called +/// in core_worker_test, and task execution interfaces are called in this file, see that +/// test +/// for more details on how this class is used. +class MockWorker { + public: + MockWorker(const std::string &store_socket, const std::string &raylet_socket) + : worker_(WorkerType::WORKER, WorkerLanguage::PYTHON, store_socket, raylet_socket, + DriverID::FromRandom()) { + RAY_CHECK_OK(worker_.Connect()); + } + + void Run() { + auto executor_func = [this](const RayFunction &ray_function, + const std::vector> &args, + const TaskID &task_id, int num_returns) { + + // Note that this doesn't include dummy object id. + RAY_CHECK(num_returns >= 0); + + // Merge all the content from input args. + std::vector buffer; + for (const auto &arg : args) { + buffer.insert(buffer.end(), arg->Data(), arg->Data() + arg->Size()); + } + + LocalMemoryBuffer memory_buffer(buffer.data(), buffer.size()); + + // Write the merged content to each of return ids. + for (int i = 0; i < num_returns; i++) { + ObjectID id = ObjectID::ForTaskReturn(task_id, i + 1); + RAY_CHECK_OK(worker_.Objects().Put(memory_buffer, id)); + } + return Status::OK(); + }; + + // Start executing tasks. + worker_.Execution().Run(executor_func); + } + + private: + CoreWorker worker_; +}; + +} // namespace ray + +int main(int argc, char **argv) { + RAY_CHECK(argc == 3); + auto store_socket = std::string(argv[1]); + auto raylet_socket = std::string(argv[2]); + + ray::MockWorker worker(store_socket, raylet_socket); + worker.Run(); + return 0; +} diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 0b94c9d4a747..5ab5d33330d7 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -1,7 +1,7 @@ -#include "object_interface.h" -#include "context.h" -#include "core_worker.h" +#include "ray/core_worker/object_interface.h" #include "ray/common/ray_config.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" namespace ray { @@ -12,14 +12,25 @@ Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(), core_worker_.worker_context_.GetNextPutIndex()); *object_id = put_id; + return Put(buffer, put_id); +} - auto plasma_id = put_id.ToPlasmaId(); +Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) { + auto plasma_id = object_id.ToPlasmaId(); std::shared_ptr data; - RAY_ARROW_RETURN_NOT_OK( - core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + { + std::unique_lock guard(core_worker_.store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK( + core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + } + memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id)); - RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id)); + + { + std::unique_lock guard(core_worker_.store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id)); + } return Status::OK(); } @@ -31,7 +42,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, bool was_blocked = false; std::unordered_map unready; - for (int i = 0; i < ids.size(); i++) { + for (size_t i = 0; i < ids.size(); i++) { unready.insert({ids[i], i}); } @@ -73,10 +84,13 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, } std::vector object_buffers; - auto status = - core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); + { + std::unique_lock guard(core_worker_.store_client_mutex_); + auto status = + core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers); + } - for (int i = 0; i < object_buffers.size(); i++) { + for (size_t i = 0; i < object_buffers.size(); i++) { if (object_buffers[i].data != nullptr) { const auto &object_id = unready_ids[i]; (*results)[unready[object_id]] = @@ -112,7 +126,7 @@ Status CoreWorkerObjectInterface::Wait(const std::vector &object_ids, // TODO: change RayletClient::Wait() to return a bit set, so that we don't need // to do this translation. (*results).resize(object_ids.size()); - for (int i = 0; i < object_ids.size(); i++) { + for (size_t i = 0; i < object_ids.size(); i++) { (*results)[i] = ready_ids.count(object_ids[i]) > 0; } diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 8a9e20c48c6e..431b3f825ac9 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -1,11 +1,11 @@ #ifndef RAY_CORE_WORKER_OBJECT_INTERFACE_H #define RAY_CORE_WORKER_OBJECT_INTERFACE_H -#include "common.h" #include "plasma/client.h" #include "ray/common/buffer.h" #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/core_worker/common.h" namespace ray { @@ -23,6 +23,13 @@ class CoreWorkerObjectInterface { /// \return Status. Status Put(const Buffer &buffer, ObjectID *object_id); + /// Put an object with specified ID into object store. + /// + /// \param[in] buffer Data buffer of the object. + /// \param[in] object_id Object ID specified by user. + /// \return Status. + Status Put(const Buffer &buffer, const ObjectID &object_id); + /// Get a list of objects from the object store. /// /// \param[in] ids IDs of the objects to get. diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index aea48b4de34a..fc22fce96c97 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -1,7 +1,80 @@ -#include "task_execution.h" +#include "ray/core_worker/task_execution.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" namespace ray { -void CoreWorkerTaskExecutionInterface::Start(const TaskExecutor &executor) {} +Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { + RAY_CHECK(core_worker_.is_initialized_); + + while (true) { + std::unique_ptr task_spec; + auto status = core_worker_.raylet_client_->GetTask(&task_spec); + if (!status.ok()) { + RAY_LOG(ERROR) << "Get task failed with error: " + << ray::Status::IOError(status.message()); + return status; + } + + const auto &spec = *task_spec; + core_worker_.worker_context_.SetCurrentTask(spec); + + WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA) + ? WorkerLanguage::JAVA + : WorkerLanguage::PYTHON; + RayFunction func{language, spec.FunctionDescriptor()}; + + std::vector> args; + RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); + + auto num_returns = spec.NumReturns(); + if (spec.IsActorCreationTask() || spec.IsActorTask()) { + RAY_CHECK(num_returns > 0); + // Decrease to account for the dummy object id. + num_returns--; + } + + status = executor(func, args, spec.TaskId(), num_returns); + // TODO: + // 1. Check and handle failure. + // 2. Save or load checkpoint. + } + + // should never reach here. + return Status::OK(); +} + +Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( + const raylet::TaskSpecification &spec, std::vector> *args) { + auto num_args = spec.NumArgs(); + (*args).resize(num_args); + + std::vector object_ids_to_fetch; + std::vector indices; + + for (int i = 0; i < spec.NumArgs(); ++i) { + int count = spec.ArgIdCount(i); + if (count > 0) { + // pass by reference. + RAY_CHECK(count == 1); + object_ids_to_fetch.push_back(spec.ArgId(i, 0)); + indices.push_back(i); + } else { + // pass by value. + (*args)[i] = std::make_shared( + const_cast(spec.ArgVal(i)), spec.ArgValLength(i)); + } + } + + std::vector> results; + auto status = core_worker_.object_interface_.Get(object_ids_to_fetch, -1, &results); + if (status.ok()) { + for (size_t i = 0; i < results.size(); i++) { + (*args)[indices[i]] = results[i]; + } + } + + return status; +} } // namespace ray diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index c4de937ee439..e2fe2148a3ab 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -1,14 +1,18 @@ #ifndef RAY_CORE_WORKER_TASK_EXECUTION_H #define RAY_CORE_WORKER_TASK_EXECUTION_H -#include "common.h" #include "ray/common/buffer.h" #include "ray/common/status.h" +#include "ray/core_worker/common.h" namespace ray { class CoreWorker; +namespace raylet { +class TaskSpecification; +} + /// The interface that contains all `CoreWorker` methods that are related to task /// execution. class CoreWorkerTaskExecutionInterface { @@ -20,13 +24,26 @@ class CoreWorkerTaskExecutionInterface { /// \param ray_function[in] Information about the function to execute. /// \param args[in] Arguments of the task. /// \return Status. - using TaskExecutor = std::function &args)>; + using TaskExecutor = std::function> &args, + const TaskID &task_id, int num_returns)>; /// Start receving and executes tasks in a infinite loop. - void Start(const TaskExecutor &executor); + /// \return Status. + Status Run(const TaskExecutor &executor); private: + /// Build arguments for task executor. This would loop through all the arguments + /// in task spec, and for each of them that's passed by reference (ObjectID), + /// fetch its content from store and; for arguments that are passed by value, + /// just copy their content. + /// + /// \param spec[in] Task specification. + /// \param args[out] The arguments for passing to task executor. + /// + Status BuildArgsForExecutor(const raylet::TaskSpecification &spec, + std::vector> *args); + /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; }; diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index ab8b8950c298..c19b1e23a7f9 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -1,4 +1,7 @@ -#include "task_interface.h" +#include "ray/raylet/task.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" +#include "ray/core_worker/task_interface.h" namespace ray { @@ -6,13 +9,61 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, const std::vector &args, const TaskOptions &task_options, std::vector *return_ids) { - return Status::OK(); + auto &context = core_worker_.worker_context_; + auto next_task_index = context.GetNextTaskIndex(); + const auto task_id = GenerateTaskId(context.GetCurrentDriverID(), + context.GetCurrentTaskID(), next_task_index); + + auto num_returns = task_options.num_returns; + (*return_ids).resize(num_returns); + for (int i = 0; i < num_returns; i++) { + (*return_ids)[i] = ObjectID::ForTaskReturn(task_id, i + 1); + } + + auto task_arguments = BuildTaskArguments(args); + auto language = ToTaskLanguage(function.language); + + ray::raylet::TaskSpecification spec(context.GetCurrentDriverID(), + context.GetCurrentTaskID(), next_task_index, + task_arguments, num_returns, task_options.resources, + language, function.function_descriptor); + + std::vector execution_dependencies; + return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); } Status CoreWorkerTaskInterface::CreateActor( const RayFunction &function, const std::vector &args, - const ActorCreationOptions &actor_creation_options, ActorHandle *actor_handle) { - return Status::OK(); + const ActorCreationOptions &actor_creation_options, + std::unique_ptr *actor_handle) { + auto &context = core_worker_.worker_context_; + auto next_task_index = context.GetNextTaskIndex(); + const auto task_id = GenerateTaskId(context.GetCurrentDriverID(), + context.GetCurrentTaskID(), next_task_index); + + std::vector return_ids; + return_ids.push_back(ObjectID::ForTaskReturn(task_id, 1)); + ActorID actor_creation_id = ActorID::FromBinary(return_ids[0].Binary()); + + *actor_handle = std::unique_ptr( + new ActorHandle(actor_creation_id, ActorHandleID::Nil())); + (*actor_handle)->IncreaseTaskCounter(); + (*actor_handle)->SetActorCursor(return_ids[0]); + + auto task_arguments = BuildTaskArguments(args); + auto language = ToTaskLanguage(function.language); + + // Note that the caller is supposed to specify required placement resources + // correctly via actor_creation_options.resources. + ray::raylet::TaskSpecification spec( + context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index, + actor_creation_id, ObjectID::Nil(), actor_creation_options.max_reconstructions, + ActorID::Nil(), ActorHandleID::Nil(), 0, {}, task_arguments, 1, + actor_creation_options.resources, actor_creation_options.resources, language, + function.function_descriptor); + + std::vector execution_dependencies; + return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); } Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, @@ -20,7 +71,75 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, const std::vector &args, const TaskOptions &task_options, std::vector *return_ids) { - return Status::OK(); + auto &context = core_worker_.worker_context_; + auto next_task_index = context.GetNextTaskIndex(); + const auto task_id = GenerateTaskId(context.GetCurrentDriverID(), + context.GetCurrentTaskID(), next_task_index); + + // add one for actor cursor object id. + auto num_returns = task_options.num_returns + 1; + (*return_ids).resize(num_returns); + for (int i = 0; i < num_returns; i++) { + (*return_ids)[i] = ObjectID::ForTaskReturn(task_id, i + 1); + } + + auto actor_creation_dummy_object_id = + ObjectID::FromBinary(actor_handle.ActorID().Binary()); + + auto task_arguments = BuildTaskArguments(args); + auto language = ToTaskLanguage(function.language); + + std::vector new_actor_handles; + ray::raylet::TaskSpecification spec( + context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index, + ActorID::Nil(), actor_creation_dummy_object_id, 0, actor_handle.ActorID(), + actor_handle.ActorHandleID(), actor_handle.IncreaseTaskCounter(), new_actor_handles, + task_arguments, num_returns, task_options.resources, task_options.resources, + language, function.function_descriptor); + + std::vector execution_dependencies; + execution_dependencies.push_back(actor_handle.ActorCursor()); + + auto actor_cursor = (*return_ids).back(); + actor_handle.SetActorCursor(actor_cursor); + actor_handle.ClearNewActorHandles(); + + auto status = core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec); + + // remove cursor from return ids. + (*return_ids).pop_back(); + return status; +} + +std::vector> +CoreWorkerTaskInterface::BuildTaskArguments(const std::vector &args) { + std::vector> task_arguments; + for (const auto &arg : args) { + if (arg.IsPassedByReference()) { + std::vector references{arg.GetReference()}; + task_arguments.push_back( + std::make_shared(references)); + } else { + auto data = arg.GetValue(); + task_arguments.push_back( + std::make_shared(data->Data(), data->Size())); + } + } + return task_arguments; +} + +::Language CoreWorkerTaskInterface::ToTaskLanguage(WorkerLanguage language) { + switch (language) { + case ray::WorkerLanguage::JAVA: + return ::Language::JAVA; + break; + case ray::WorkerLanguage::PYTHON: + return ::Language::PYTHON; + break; + default: + RAY_LOG(FATAL) << "invalid language specified: " << static_cast(language); + break; + } } } // namespace ray diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index e23f049d341d..06bd5409a8dd 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -1,10 +1,12 @@ #ifndef RAY_CORE_WORKER_TASK_INTERFACE_H #define RAY_CORE_WORKER_TASK_INTERFACE_H -#include "common.h" +#include + #include "ray/common/buffer.h" #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/core_worker/common.h" namespace ray { @@ -12,6 +14,10 @@ class CoreWorker; /// Options of a non-actor-creation task. struct TaskOptions { + TaskOptions() {} + TaskOptions(int num_returns, const std::unordered_map &resources) + : num_returns(num_returns), resources(resources) {} + /// Number of returns of this task. const int num_returns = 1; /// Resources required by this task. @@ -20,6 +26,11 @@ struct TaskOptions { /// Options of an actor creation task. struct ActorCreationOptions { + ActorCreationOptions() {} + ActorCreationOptions(uint64_t max_reconstructions, + const std::unordered_map &resources) + : max_reconstructions(max_reconstructions), resources(resources) {} + /// Maximum number of times that the actor should be reconstructed when it dies /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. const uint64_t max_reconstructions = 0; @@ -31,19 +42,46 @@ struct ActorCreationOptions { class ActorHandle { public: ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id) - : actor_id_(actor_id), actor_handle_id_(actor_handle_id) {} + : actor_id_(actor_id), + actor_handle_id_(actor_handle_id), + actor_cursor_(ObjectID::FromBinary(actor_id.Binary())), + task_counter_(0) {} /// ID of the actor. - const class ActorID &ActorID() const { return actor_id_; } + const ray::ActorID &ActorID() const { return actor_id_; }; /// ID of this actor handle. - const class ActorHandleID &ActorHandleID() const { return actor_handle_id_; } + const ray::ActorHandleID &ActorHandleID() const { return actor_handle_id_; }; + + private: + /// Cursor of this actor. + const ObjectID &ActorCursor() const { return actor_cursor_; }; + + /// Set actor cursor. + void SetActorCursor(const ObjectID &actor_cursor) { actor_cursor_ = actor_cursor; }; + + /// Increase task counter. + int IncreaseTaskCounter() { return task_counter_++; } + + std::list GetNewActorHandle() { + // TODO: implement this. + return std::list(); + } + + void ClearNewActorHandles() { /* TODO: implement this. */ + } private: /// ID of the actor. - const class ActorID actor_id_; + const ray::ActorID actor_id_; /// ID of this actor handle. - const class ActorHandleID actor_handle_id_; + const ray::ActorHandleID actor_handle_id_; + /// ID of this actor cursor. + ObjectID actor_cursor_; + /// Counter for tasks from this handle. + int task_counter_; + + friend class CoreWorkerTaskInterface; }; /// The interface that contains all `CoreWorker` methods that are related to task @@ -71,7 +109,7 @@ class CoreWorkerTaskInterface { /// \return Status. Status CreateActor(const RayFunction &function, const std::vector &args, const ActorCreationOptions &actor_creation_options, - ActorHandle *actor_handle); + std::unique_ptr *actor_handle); /// Submit an actor task. /// @@ -89,6 +127,20 @@ class CoreWorkerTaskInterface { private: /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_; + + private: + /// Build the arguments for a task spec. + /// + /// \param[in] args Arguments of a task. + /// \return Arguments as required by task spec. + std::vector> BuildTaskArguments( + const std::vector &args); + + /// Translate from WorkLanguage to Language type (required by taks spec). + /// + /// \param[in] language Language for a task. + /// \return Translated task language. + ::Language ToTaskLanguage(WorkerLanguage language); }; } // namespace ray diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b710b0873b0c..9ba0267e2a03 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2143,7 +2143,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Notify the task dependency manager that this object is local. const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id); RAY_LOG(DEBUG) << "Object local " << object_id << ", " - << " on " << gcs_client_->client_table().GetLocalClientId() + << " on " << gcs_client_->client_table().GetLocalClientId() << ", " << ready_task_ids.size() << " tasks ready"; // Transition the tasks whose dependencies are now fulfilled to the ready state. if (ready_task_ids.size() > 0) { diff --git a/src/ray/test/run_core_worker_tests.sh b/src/ray/test/run_core_worker_tests.sh index 5f1dd2eda69f..104b19ff19cb 100644 --- a/src/ray/test/run_core_worker_tests.sh +++ b/src/ray/test/run_core_worker_tests.sh @@ -6,7 +6,7 @@ set -e set -x -bazel build "//:core_worker_test" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server" +bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server" # Get the directory in which this script is executing. SCRIPT_DIR="`dirname \"$0\"`" @@ -26,6 +26,7 @@ REDIS_MODULE="./bazel-bin/libray_redis_module.so" LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}" STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server" RAYLET_EXEC="./bazel-bin/raylet" +MOCK_WORKER_EXEC="./bazel-bin/mock_worker" # Allow cleanup commands to fail. bazel run //:redis-cli -- -p 6379 shutdown || true @@ -37,7 +38,7 @@ sleep 2s bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 & sleep 2s # Run tests. -./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC +./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $MOCK_WORKER_EXEC sleep 1s bazel run //:redis-cli -- -p 6379 shutdown bazel run //:redis-cli -- -p 6380 shutdown