diff --git a/.gitignore b/.gitignore index f395084687c3..eccca741b118 100644 --- a/.gitignore +++ b/.gitignore @@ -150,5 +150,8 @@ build *.iml java/**/target -java/run -java/test/lib +java/**/bin +java/**/lib +java/**/.settings +java/**/.classpath +java/**/.project diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 22d7877ba7d6..915c036dea74 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -9,11 +9,8 @@ #include "common.h" #include "common_extension.h" +#include "common_task.h" #include "common_protocol.h" -#include "ray/raylet/task.h" -#include "ray/raylet/task_spec.h" -#include "ray/raylet/task_execution_spec.h" -#include "task.h" #include @@ -51,8 +48,6 @@ void init_pickle_module(void) { RAY_CHECK(pickle_protocol != NULL); } -TaskBuilder *g_task_builder = NULL; - /* Define the PyObjectID class. */ int PyStringToUniqueID(PyObject *object, ObjectID *object_id) { @@ -76,10 +71,6 @@ int PyObjectToUniqueID(PyObject *object, ObjectID *objectid) { } } -bool use_raylet(PyTask *task) { - return task->spec == nullptr; -} - static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) { const char *data; int size; @@ -121,8 +112,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { } PyTask *result = PyObject_New(PyTask, &PyTaskType); result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); - result->size = size; - result->spec = TaskSpec_copy((TaskSpec *) data, size); + result->taskInterface = new NonRayletTask(data, size); /* The created task does not include any execution dependencies. */ result->execution_dependencies = new std::vector(); /* TODO(pcm): Use flatbuffers validation here. 
*/ @@ -145,15 +135,7 @@ PyObject *PyTask_to_string(PyObject *self, PyObject *args) { return NULL; } PyTask *task = (PyTask *) arg; - if (!use_raylet(task)) { - return PyBytes_FromStringAndSize((char *) task->spec, task->size); - } else { - flatbuffers::FlatBufferBuilder fbb; - auto task_spec_string = task->task_spec->ToFlatbuffer(fbb); - fbb.Finish(task_spec_string); - return PyBytes_FromStringAndSize((char *) fbb.GetBufferPointer(), - fbb.GetSize()); - } + return task->taskInterface->to_py_string(); } static PyObject *PyObjectID_id(PyObject *self) { @@ -387,78 +369,29 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { required_resources["CPU"] = 1.0; } - Py_ssize_t num_args = PyList_Size(arguments); + std::function callMethodObjArgs = + std::bind(PyObject_CallMethodObjArgs, pickle_module, pickle_dumps, + std::placeholders::_1, pickle_protocol, NULL); - bool use_raylet = false; - if (use_raylet_object != nullptr && PyObject_IsTrue(use_raylet_object) == 1) { - use_raylet = true; - } - self->spec = nullptr; - self->task_spec = nullptr; + std::function isPyObjectIDType = + std::bind(PyObject_IsInstance, std::placeholders::_1, + reinterpret_cast(&PyObjectIDType)); // Create the task spec. - if (!use_raylet) { + if (use_raylet_object == nullptr || PyObject_IsTrue(use_raylet_object) != 1) { // The non-raylet code path. - - // Construct the task specification. - TaskSpec_start_construct( - g_task_builder, driver_id, parent_task_id, parent_counter, - actor_creation_id, actor_creation_dummy_object_id, actor_id, - actor_handle_id, actor_counter, is_actor_checkpoint_method, function_id, - num_returns); - // Add the task arguments. 
- for (Py_ssize_t i = 0; i < num_args; ++i) { - PyObject *arg = PyList_GetItem(arguments, i); - if (PyObject_IsInstance(arg, - reinterpret_cast(&PyObjectIDType))) { - TaskSpec_args_add_ref(g_task_builder, - &(reinterpret_cast(arg))->object_id, - 1); - } else { - PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, - arg, pickle_protocol, NULL); - TaskSpec_args_add_val( - g_task_builder, reinterpret_cast(PyBytes_AsString(data)), - PyBytes_Size(data)); - Py_DECREF(data); - } - } - // Set the resource requirements for the task. - for (auto const &resource_pair : required_resources) { - TaskSpec_set_required_resource(g_task_builder, resource_pair.first, - resource_pair.second); - } - - // Compute the task ID and the return object IDs. - self->spec = TaskSpec_finish_construct(g_task_builder, &self->size); - + self->taskInterface = new NonRayletTask( + driver_id, parent_task_id, parent_counter, actor_creation_id, + actor_creation_dummy_object_id, actor_id, actor_handle_id, + actor_counter, is_actor_checkpoint_method, function_id, num_returns, + arguments, required_resources, callMethodObjArgs, isPyObjectIDType); } else { // The raylet code path. - - // Parse the arguments from the list. 
- std::vector> args; - for (Py_ssize_t i = 0; i < num_args; ++i) { - PyObject *arg = PyList_GetItem(arguments, i); - if (PyObject_IsInstance(arg, - reinterpret_cast(&PyObjectIDType))) { - std::vector references = { - reinterpret_cast(arg)->object_id}; - args.push_back( - std::make_shared(references)); - } else { - PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, - arg, pickle_protocol, NULL); - args.push_back(std::make_shared( - reinterpret_cast(PyBytes_AsString(data)), - PyBytes_Size(data))); - Py_DECREF(data); - } - } - - self->task_spec = new ray::raylet::TaskSpecification( + self->taskInterface = new RayletTask( driver_id, parent_task_id, parent_counter, actor_creation_id, actor_creation_dummy_object_id, actor_id, actor_handle_id, - actor_counter, function_id, args, num_returns, required_resources); + actor_counter, is_actor_checkpoint_method, function_id, num_returns, + arguments, required_resources, callMethodObjArgs, isPyObjectIDType); } /* Set the task's execution dependencies. 
*/ @@ -481,129 +414,58 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { } static void PyTask_dealloc(PyTask *self) { - if (!use_raylet(self)) { - TaskSpec_free(self->spec); - } else { - delete self->task_spec; - } + delete self->taskInterface; delete self->execution_dependencies; Py_TYPE(self)->tp_free(reinterpret_cast(self)); } static PyObject *PyTask_function_id(PyTask *self) { - FunctionID function_id; - if (!use_raylet(self)) { - function_id = TaskSpec_function(self->spec); - } else { - function_id = self->task_spec->FunctionId(); - } - return PyObjectID_make(function_id); + return PyObjectID_make(self->taskInterface->function_id()); } static PyObject *PyTask_actor_id(PyTask *self) { - ActorID actor_id; - if (!use_raylet(self)) { - actor_id = TaskSpec_actor_id(self->spec); - } else { - actor_id = self->task_spec->ActorId(); - } - return PyObjectID_make(actor_id); + return PyObjectID_make(self->taskInterface->actor_id()); } static PyObject *PyTask_actor_counter(PyTask *self) { - int64_t actor_counter; - if (!use_raylet(self)) { - actor_counter = TaskSpec_actor_counter(self->spec); - } else { - actor_counter = self->task_spec->ActorCounter(); - } - return PyLong_FromLongLong(actor_counter); + return PyLong_FromLongLong(self->taskInterface->actor_counter()); } static PyObject *PyTask_driver_id(PyTask *self) { - UniqueID driver_id; - if (!use_raylet(self)) { - driver_id = TaskSpec_driver_id(self->spec); - } else { - driver_id = self->task_spec->DriverId(); - } - return PyObjectID_make(driver_id); + return PyObjectID_make(self->taskInterface->driver_id()); } static PyObject *PyTask_task_id(PyTask *self) { - TaskID task_id; - if (!use_raylet(self)) { - task_id = TaskSpec_task_id(self->spec); - } else { - task_id = self->task_spec->TaskId(); - } - return PyObjectID_make(task_id); + return PyObjectID_make(self->taskInterface->task_id()); } static PyObject *PyTask_parent_task_id(PyTask *self) { - TaskID task_id; - if (!use_raylet(self)) { - 
task_id = TaskSpec_parent_task_id(self->spec); - } else { - task_id = self->task_spec->ParentTaskId(); - } - return PyObjectID_make(task_id); + return PyObjectID_make(self->taskInterface->parent_task_id()); } static PyObject *PyTask_parent_counter(PyTask *self) { - int64_t parent_counter; - if (!use_raylet(self)) { - parent_counter = TaskSpec_parent_counter(self->spec); - } else { - parent_counter = self->task_spec->ParentCounter(); - } - return PyLong_FromLongLong(parent_counter); + return PyLong_FromLongLong(self->taskInterface->parent_counter()); } static PyObject *PyTask_arguments(PyTask *self) { - TaskSpec *task = self->spec; - ray::raylet::TaskSpecification *task_spec = self->task_spec; - - int64_t num_args; - if (!use_raylet(self)) { - num_args = TaskSpec_num_args(task); - } else { - num_args = self->task_spec->NumArgs(); - } + int64_t num_args = self->taskInterface->num_args(); PyObject *arg_list = PyList_New((Py_ssize_t) num_args); for (int i = 0; i < num_args; ++i) { - int count; - if (!use_raylet(self)) { - count = TaskSpec_arg_id_count(task, i); - } else { - count = task_spec->ArgIdCount(i); - } + int count = self->taskInterface->arg_id_count(i); if (count > 0) { assert(count == 1); - ObjectID object_id; - if (!use_raylet(self)) { - object_id = TaskSpec_arg_id(task, i, 0); - } else { - object_id = task_spec->ArgId(i, 0); - } + ObjectID object_id = self->taskInterface->arg_id(i, 0); PyList_SetItem(arg_list, i, PyObjectID_make(object_id)); } else { RAY_CHECK(pickle_module != NULL); RAY_CHECK(pickle_loads != NULL); - const uint8_t *arg_val; - int64_t arg_length; - if (!use_raylet(self)) { - arg_val = TaskSpec_arg_val(task, i); - arg_length = TaskSpec_arg_length(task, i); - } else { - arg_val = task_spec->ArgVal(i); - arg_length = task_spec->ArgValLength(i); - } + const uint8_t *arg_val = self->taskInterface->arg_val(i); + int64_t arg_length = self->taskInterface->arg_length(i); PyObject *str = PyBytes_FromStringAndSize(reinterpret_cast(arg_val), @@ -618,41 
+480,19 @@ static PyObject *PyTask_arguments(PyTask *self) { } static PyObject *PyTask_actor_creation_id(PyTask *self) { - ActorID actor_creation_id; - if (!use_raylet(self)) { - actor_creation_id = TaskSpec_actor_creation_id(self->spec); - } else { - actor_creation_id = self->task_spec->ActorCreationId(); - } - return PyObjectID_make(actor_creation_id); + return PyObjectID_make(self->taskInterface->actor_creation_id()); } static PyObject *PyTask_actor_creation_dummy_object_id(PyTask *self) { - ObjectID actor_creation_dummy_object_id; - if (!use_raylet(self)) { - if (TaskSpec_is_actor_task(self->spec)) { - actor_creation_dummy_object_id = - TaskSpec_actor_creation_dummy_object_id(self->spec); - } else { - actor_creation_dummy_object_id = ObjectID::nil(); - } - } else { - actor_creation_dummy_object_id = - self->task_spec->ActorCreationDummyObjectId(); - } + ObjectID actor_creation_dummy_object_id = + self->taskInterface->actor_creation_dummy_object_id(); return PyObjectID_make(actor_creation_dummy_object_id); } static PyObject *PyTask_required_resources(PyTask *self) { PyObject *required_resources = PyDict_New(); - std::unordered_map resource_map; - if (!use_raylet(self)) { - resource_map = TaskSpec_get_required_resources(self->spec); - } else { - resource_map = self->task_spec->GetRequiredResources().GetResourceMap(); - } - + auto resource_map = self->taskInterface->get_required_resources(); for (auto const &resource_pair : resource_map) { std::string resource_name = resource_pair.first; #if PY_MAJOR_VERSION >= 3 @@ -671,24 +511,11 @@ static PyObject *PyTask_required_resources(PyTask *self) { } static PyObject *PyTask_returns(PyTask *self) { - TaskSpec *task = self->spec; - ray::raylet::TaskSpecification *task_spec = self->task_spec; - - int64_t num_returns; - if (!use_raylet(self)) { - num_returns = TaskSpec_num_returns(task); - } else { - num_returns = task_spec->NumReturns(); - } + int64_t num_returns = self->taskInterface->num_returns(); PyObject 
*return_id_list = PyList_New((Py_ssize_t) num_returns); for (int i = 0; i < num_returns; ++i) { - ObjectID object_id; - if (!use_raylet(self)) { - object_id = TaskSpec_return(task, i); - } else { - object_id = task_spec->ReturnId(i); - } + ObjectID object_id = self->taskInterface->return_id(i); PyList_SetItem(return_id_list, i, PyObjectID_make(object_id)); } return return_id_list; @@ -704,17 +531,9 @@ static PyObject *PyTask_execution_dependencies_string(PyTask *self) { } static PyObject *PyTask_to_serialized_flatbuf(PyTask *self) { - RAY_CHECK(use_raylet(self)); - - const std::vector execution_dependencies( - *self->execution_dependencies); - auto const execution_spec = ray::raylet::TaskExecutionSpecification( - std::move(execution_dependencies)); - auto const task = ray::raylet::Task(execution_spec, *self->task_spec); - flatbuffers::FlatBufferBuilder fbb; - auto task_flatbuffer = task.ToFlatbuffer(fbb); - fbb.Finish(task_flatbuffer); + self->taskInterface->to_serialized_flatbuf(fbb, + *self->execution_dependencies); return PyBytes_FromStringAndSize( reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); @@ -803,8 +622,7 @@ PyTypeObject PyTaskType = { PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) { PyTask *result = PyObject_New(PyTask, &PyTaskType); result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); - result->spec = task_spec; - result->size = task_size; + result->taskInterface = new NonRayletTask(task_spec, task_size); /* The created task does not include any execution dependencies. 
*/ result->execution_dependencies = new std::vector(); return (PyObject *) result; diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index b24e45a1f88d..41e9ca66a20d 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -1,9 +1,8 @@ #ifndef COMMON_EXTENSION_H #define COMMON_EXTENSION_H -#include - #include +#include #include "marshal.h" #include "structmember.h" @@ -15,29 +14,9 @@ class TaskBuilder; extern PyObject *CommonError; -// clang-format off -typedef struct { - PyObject_HEAD - ray::ObjectID object_id; -} PyObjectID; - -typedef struct { - PyObject_HEAD - int64_t size; - // The task spec to use in the non-raylet case. - TaskSpec *spec; - // The task spec to use in the raylet case. - ray::raylet::TaskSpecification *task_spec; - std::vector *execution_dependencies; -} PyTask; -// clang-format on - extern PyTypeObject PyObjectIDType; extern PyTypeObject PyTaskType; - -bool use_raylet(PyTask *task); - /* Python module for pickling. 
*/ extern PyObject *pickle_module; extern PyObject *pickle_dumps; @@ -47,8 +26,6 @@ int init_numpy_module(void); void init_pickle_module(void); -extern TaskBuilder *g_task_builder; - int PyStringToUniqueID(PyObject *object, ray::ObjectID *object_id); int PyObjectToUniqueID(PyObject *object, ray::ObjectID *object_id); diff --git a/src/common/lib/python/common_task.cc b/src/common/lib/python/common_task.cc new file mode 100644 index 000000000000..094a4a6fe09d --- /dev/null +++ b/src/common/lib/python/common_task.cc @@ -0,0 +1,307 @@ +#include "common_task.h" +#include "common_protocol.h" +#include "format/local_scheduler_generated.h" + +#include "ray/raylet/task.h" +#include "ray/raylet/task_spec.h" +#include "ray/raylet/task_execution_spec.h" +#include "task.h" + +TaskBuilder *g_task_builder = NULL; + +NonRayletTask::NonRayletTask( + UniqueID &driver_id, + TaskID &parent_task_id, + int parent_counter, + ActorID &actor_creation_id, + ObjectID &actor_creation_dummy_object_id, + UniqueID &actor_id, + UniqueID &actor_handle_id, + int actor_counter, + bool is_actor_checkpoint_method, + FunctionID &function_id, + int num_returns, + PyObject *arguments, + std::unordered_map &required_resources, + std::function callMethodObjArgs, + std::function isPyObjectIDType) { + // Construct the task specification. + TaskSpec_start_construct( + g_task_builder, driver_id, parent_task_id, parent_counter, + actor_creation_id, actor_creation_dummy_object_id, actor_id, + actor_handle_id, actor_counter, is_actor_checkpoint_method, function_id, + num_returns); + + Py_ssize_t num_args = PyList_Size(arguments); + + // Add the task arguments. 
+ for (Py_ssize_t i = 0; i < num_args; ++i) { + PyObject *arg = PyList_GetItem(arguments, i); + if (isPyObjectIDType(arg)) { + TaskSpec_args_add_ref( + g_task_builder, &(reinterpret_cast(arg))->object_id, 1); + } else { + PyObject *data = callMethodObjArgs(arg); + TaskSpec_args_add_val(g_task_builder, + reinterpret_cast(PyBytes_AsString(data)), + PyBytes_Size(data)); + Py_DECREF(data); + } + } + // Set the resource requirements for the task. + for (auto const &resource_pair : required_resources) { + TaskSpec_set_required_resource(g_task_builder, resource_pair.first, + resource_pair.second); + } + + // Compute the task ID and the return object IDs. + spec = TaskSpec_finish_construct(g_task_builder, &size); +} + +NonRayletTask::NonRayletTask(const char *data, int data_size) + : size(data_size) { + spec = TaskSpec_copy((TaskSpec *) data, size); +} + +NonRayletTask::NonRayletTask(TaskSpec *task_spec, int64_t task_size) + : size(task_size), spec(task_spec) {} + +NonRayletTask::~NonRayletTask() { + TaskSpec_free(spec); +} + +PyObject *NonRayletTask::to_py_string() { + return PyBytes_FromStringAndSize((char *) spec, size); +} + +FunctionID NonRayletTask::function_id() { + return TaskSpec_function(spec); +} + +ActorID NonRayletTask::actor_id() { + return TaskSpec_actor_id(spec); +} + +int64_t NonRayletTask::actor_counter() { + return TaskSpec_actor_counter(spec); +} + +UniqueID NonRayletTask::driver_id() { + return TaskSpec_driver_id(spec); +} + +TaskID NonRayletTask::task_id() { + return TaskSpec_task_id(spec); +} + +TaskID NonRayletTask::parent_task_id() { + return TaskSpec_parent_task_id(spec); +} + +int64_t NonRayletTask::parent_counter() { + return TaskSpec_parent_counter(spec); +} + +int64_t NonRayletTask::num_args() { + return TaskSpec_num_args(spec); +} + +int NonRayletTask::arg_id_count(int index) { + return TaskSpec_arg_id_count(spec, index); +} + +ObjectID NonRayletTask::arg_id(int64_t arg_index, int64_t id_index) { + return TaskSpec_arg_id(spec, arg_index, 
id_index); +} + +const uint8_t *NonRayletTask::arg_val(int64_t arg_index) { + return TaskSpec_arg_val(spec, arg_index); +} + +int64_t NonRayletTask::arg_length(int64_t arg_index) { + return TaskSpec_arg_length(spec, arg_index); +} + +ActorID NonRayletTask::actor_creation_id() { + return TaskSpec_actor_creation_id(spec); +} + +ObjectID NonRayletTask::actor_creation_dummy_object_id() { + if (TaskSpec_is_actor_task(spec)) { + return TaskSpec_actor_creation_dummy_object_id(spec); + } else { + return ObjectID::nil(); + } +} + +std::unordered_map +NonRayletTask::get_required_resources() { + return TaskSpec_get_required_resources(spec); +} + +int64_t NonRayletTask::num_returns() { + return TaskSpec_num_returns(spec); +} + +ObjectID NonRayletTask::return_id(int64_t return_index) { + return TaskSpec_return(spec, return_index); +} + +void NonRayletTask::to_serialized_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, + std::vector execution_dependencies) { + throw std::runtime_error("Method not implemented"); +} + +void NonRayletTask::to_submit_message( + flatbuffers::FlatBufferBuilder &fbb, + const std::vector &dependencies) { + TaskExecutionSpec execution_spec(dependencies, spec, size); + auto execution_dependencies = + to_flatbuf(fbb, execution_spec.ExecutionDependencies()); + auto task_spec = + fbb.CreateString(reinterpret_cast(execution_spec.Spec()), + execution_spec.SpecSize()); + auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest( + fbb, execution_dependencies, task_spec); + fbb.Finish(message); +} + +RayletTask::RayletTask( + UniqueID &driver_id, + TaskID &parent_task_id, + int parent_counter, + ActorID &actor_creation_id, + ObjectID &actor_creation_dummy_object_id, + UniqueID &actor_id, + UniqueID &actor_handle_id, + int actor_counter, + bool is_actor_checkpoint_method, + FunctionID &function_id, + int num_returns, + PyObject *arguments, + std::unordered_map &required_resources, + std::function callMethodObjArgs, + std::function isPyObjectIDType) { 
+ Py_ssize_t num_args = PyList_Size(arguments); + + // Parse the arguments from the list. + std::vector> args; + for (Py_ssize_t i = 0; i < num_args; ++i) { + PyObject *arg = PyList_GetItem(arguments, i); + if (isPyObjectIDType(arg)) { + std::vector references = { + reinterpret_cast(arg)->object_id}; + args.push_back( + std::make_shared(references)); + } else { + PyObject *data = callMethodObjArgs(arg); + args.push_back(std::make_shared( + reinterpret_cast(PyBytes_AsString(data)), + PyBytes_Size(data))); + Py_DECREF(data); + } + } + + task_spec.reset(new ray::raylet::TaskSpecification( + driver_id, parent_task_id, parent_counter, actor_creation_id, + actor_creation_dummy_object_id, actor_id, actor_handle_id, actor_counter, + function_id, args, num_returns, required_resources)); +} + +PyObject *RayletTask::to_py_string() { + flatbuffers::FlatBufferBuilder fbb; + auto task_spec_string = task_spec->ToFlatbuffer(fbb); + fbb.Finish(task_spec_string); + return PyBytes_FromStringAndSize((char *) fbb.GetBufferPointer(), + fbb.GetSize()); +} + +FunctionID RayletTask::function_id() { + return task_spec->FunctionId(); +} + +ActorID RayletTask::actor_id() { + return task_spec->ActorId(); +} + +int64_t RayletTask::actor_counter() { + return task_spec->ActorCounter(); +} + +UniqueID RayletTask::driver_id() { + return task_spec->DriverId(); +} + +TaskID RayletTask::task_id() { + return task_spec->TaskId(); +} + +TaskID RayletTask::parent_task_id() { + return task_spec->ParentTaskId(); +} + +int64_t RayletTask::parent_counter() { + return task_spec->ParentCounter(); +} + +int64_t RayletTask::num_args() { + return task_spec->NumArgs(); +} + +int RayletTask::arg_id_count(int index) { + return task_spec->ArgIdCount(index); +} + +ObjectID RayletTask::arg_id(int64_t arg_index, int64_t id_index) { + return task_spec->ArgId(arg_index, id_index); +} + +const uint8_t *RayletTask::arg_val(int64_t arg_index) { + return task_spec->ArgVal(arg_index); +} + +int64_t 
RayletTask::arg_length(int64_t arg_index) { + return task_spec->ArgValLength(arg_index); +} + +ActorID RayletTask::actor_creation_id() { + return task_spec->ActorCreationId(); +} + +ObjectID RayletTask::actor_creation_dummy_object_id() { + return task_spec->ActorCreationDummyObjectId(); +} + +std::unordered_map RayletTask::get_required_resources() { + return task_spec->GetRequiredResources().GetResourceMap(); +} + +int64_t RayletTask::num_returns() { + return task_spec->NumReturns(); +} + +ObjectID RayletTask::return_id(int64_t return_index) { + return task_spec->ReturnId(return_index); +} + +void RayletTask::to_serialized_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, + std::vector execution_dependencies) { + ray::raylet::TaskExecutionSpecification execution_spec( + std::move(execution_dependencies)); + + ray::raylet::Task task(execution_spec, *task_spec); + + auto task_flatbuffer = task.ToFlatbuffer(fbb); + fbb.Finish(task_flatbuffer); +} + +void RayletTask::to_submit_message( + flatbuffers::FlatBufferBuilder &fbb, + const std::vector &execution_dependencies) { + auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); + auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest( + fbb, execution_dependencies_message, task_spec->ToFlatbuffer(fbb)); + fbb.Finish(message); +} \ No newline at end of file diff --git a/src/common/lib/python/common_task.h b/src/common/lib/python/common_task.h new file mode 100644 index 000000000000..a6806638ce6f --- /dev/null +++ b/src/common/lib/python/common_task.h @@ -0,0 +1,153 @@ +#ifndef COMMON_TASK_H +#define COMMON_TASK_H + +#include + +#include + +#include "common.h" +#include "task.h" +#include "ray/raylet/task_spec.h" +#include + +typedef char TaskSpec; +class TaskBuilder; + +extern TaskBuilder *g_task_builder; + +class TaskInterface { + public: + virtual ~TaskInterface() {} + virtual PyObject *to_py_string() = 0; + virtual FunctionID function_id() = 0; + virtual ActorID actor_id() = 0; + 
virtual int64_t actor_counter() = 0; + virtual UniqueID driver_id() = 0; + virtual TaskID task_id() = 0; + virtual TaskID parent_task_id() = 0; + virtual int64_t parent_counter() = 0; + virtual int64_t num_args() = 0; + virtual int arg_id_count(int index) = 0; + virtual ObjectID arg_id(int64_t arg_index, int64_t id_index) = 0; + virtual const uint8_t *arg_val(int64_t arg_index) = 0; + virtual int64_t arg_length(int64_t arg_index) = 0; + virtual ActorID actor_creation_id() = 0; + virtual ObjectID actor_creation_dummy_object_id() = 0; + virtual std::unordered_map get_required_resources() = 0; + virtual int64_t num_returns() = 0; + virtual ObjectID return_id(int64_t return_index) = 0; + virtual void to_serialized_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, + std::vector execution_dependencies) = 0; + virtual void to_submit_message( + flatbuffers::FlatBufferBuilder &fbb, + const std::vector &execution_dependencies) = 0; +}; + +class NonRayletTask : public TaskInterface { + public: + NonRayletTask(UniqueID &driver_id, + TaskID &parent_task_id, + int parent_counter, + ActorID &actor_creation_id, + ObjectID &actor_creation_dummy_object_id, + UniqueID &actor_id, + UniqueID &actor_handle_id, + int actor_counter, + bool is_actor_checkpoint_method, + FunctionID &function_id, + int num_returns, + PyObject *arguments, + std::unordered_map &required_resources, + std::function callMethodObjArgs, + std::function isPyObjectIDType); + NonRayletTask(const char *data, int data_size); + NonRayletTask(TaskSpec *task_spec, int64_t task_size); + ~NonRayletTask(); + PyObject *to_py_string(); + FunctionID function_id(); + ActorID actor_id(); + int64_t actor_counter(); + UniqueID driver_id(); + TaskID task_id(); + TaskID parent_task_id(); + int64_t parent_counter(); + int64_t num_args(); + int arg_id_count(int index); + ObjectID arg_id(int64_t arg_index, int64_t id_index); + const uint8_t *arg_val(int64_t arg_index); + int64_t arg_length(int64_t arg_index); + ActorID actor_creation_id(); 
+ ObjectID actor_creation_dummy_object_id(); + std::unordered_map get_required_resources(); + int64_t num_returns(); + ObjectID return_id(int64_t return_index); + void to_serialized_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + std::vector execution_dependencies); + void to_submit_message(flatbuffers::FlatBufferBuilder &fbb, + const std::vector &execution_dependencies); + + private: + int64_t size; + // The task spec to use in the non-raylet case. + TaskSpec *spec; +}; + +class RayletTask : public TaskInterface { + public: + RayletTask(UniqueID &driver_id, + TaskID &parent_task_id, + int parent_counter, + ActorID &actor_creation_id, + ObjectID &actor_creation_dummy_object_id, + UniqueID &actor_id, + UniqueID &actor_handle_id, + int actor_counter, + bool is_actor_checkpoint_method, + FunctionID &function_id, + int num_returns, + PyObject *arguments, + std::unordered_map &required_resources, + std::function callMethodObjArgs, + std::function isPyObjectIDType); + PyObject *to_py_string(); + FunctionID function_id(); + ActorID actor_id(); + int64_t actor_counter(); + UniqueID driver_id(); + TaskID task_id(); + TaskID parent_task_id(); + int64_t parent_counter(); + int64_t num_args(); + int arg_id_count(int index); + ObjectID arg_id(int64_t arg_index, int64_t id_index); + const uint8_t *arg_val(int64_t arg_index); + int64_t arg_length(int64_t arg_index); + ActorID actor_creation_id(); + ObjectID actor_creation_dummy_object_id(); + std::unordered_map get_required_resources(); + int64_t num_returns(); + ObjectID return_id(int64_t return_index); + void to_serialized_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + std::vector execution_dependencies); + void to_submit_message(flatbuffers::FlatBufferBuilder &fbb, + const std::vector &execution_dependencies); + + private: + // The task spec to use in the raylet case. 
+ std::unique_ptr task_spec; +}; + +// clang-format off +typedef struct { + PyObject_HEAD + ray::ObjectID object_id; +} PyObjectID; + +typedef struct { + PyObject_HEAD + TaskInterface* taskInterface; + std::vector *execution_dependencies; +} PyTask; +// clang-format on +#endif /* COMMON_TASK_H */ diff --git a/src/local_scheduler/lib/python/local_scheduler_extension.cc b/src/local_scheduler/lib/python/local_scheduler_extension.cc index ce68c6bfe250..aab6bf412a18 100644 --- a/src/local_scheduler/lib/python/local_scheduler_extension.cc +++ b/src/local_scheduler/lib/python/local_scheduler_extension.cc @@ -1,5 +1,6 @@ #include +#include "common_task.h" #include "common_extension.h" #include "config_extension.h" #include "local_scheduler_client.h" @@ -56,14 +57,10 @@ static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { ->local_scheduler_connection; PyTask *task = reinterpret_cast(py_task); - if (!use_raylet(task)) { - TaskExecutionSpec execution_spec = TaskExecutionSpec( - *task->execution_dependencies, task->spec, task->size); - local_scheduler_submit(connection, execution_spec); - } else { - local_scheduler_submit_raylet(connection, *task->execution_dependencies, - *task->task_spec); - } + flatbuffers::FlatBufferBuilder fbb; + task->taskInterface->to_submit_message(fbb, *task->execution_dependencies); + + local_scheduler_submit(connection, fbb); Py_RETURN_NONE; } diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 68642d813ad1..f0b54a887f39 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -81,15 +81,8 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, fbb.GetSize(), fbb.GetBufferPointer()); } -void local_scheduler_submit_raylet( - LocalSchedulerConnection *conn, - const std::vector &execution_dependencies, - ray::raylet::TaskSpecification task_spec) { - flatbuffers::FlatBufferBuilder fbb; - auto 
execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); - auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest( - fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb)); - fbb.Finish(message); +void local_scheduler_submit(LocalSchedulerConnection *conn, + flatbuffers::FlatBufferBuilder &fbb) { write_message(conn->conn, static_cast(MessageType::SubmitTask), fbb.GetSize(), fbb.GetBufferPointer()); } diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 1a8cbd240f29..033537f200c8 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -60,13 +60,10 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, /// Submit a task using the raylet code path. /// /// \param The connection information. -/// \param The execution dependencies. -/// \param The task specification. +/// \param The flatbuffer builder whose finished buffer will be submitted. /// \return Void. -void local_scheduler_submit_raylet( - LocalSchedulerConnection *conn, - const std::vector &execution_dependencies, - ray::raylet::TaskSpecification task_spec); +void local_scheduler_submit(LocalSchedulerConnection *conn, + flatbuffers::FlatBufferBuilder &fbb); /** * Notify the local scheduler that this client is disconnecting gracefully. This