diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index f0ec0b7d61ff..f9fd299c47cb 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -106,6 +106,8 @@ class RayConfig { return object_manager_default_chunk_size_; } + int num_workers_per_process() const { return num_workers_per_process_; } + private: RayConfig() : ray_protocol_version_(0x0000000000000000), @@ -142,7 +144,8 @@ class RayConfig { object_manager_push_timeout_ms_(10000), object_manager_max_sends_(2), object_manager_max_receives_(2), - object_manager_default_chunk_size_(100000000) {} + object_manager_default_chunk_size_(100000000), + num_workers_per_process_(1) {} ~RayConfig() {} @@ -251,6 +254,9 @@ class RayConfig { /// In the object manager, no single thread is permitted to transfer more /// data than what is specified by the chunk size. uint64_t object_manager_default_chunk_size_; + + /// Number of workers per process + int num_workers_per_process_; }; #endif // RAY_CONFIG_H diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index dce0988fc4ef..2a7a837ef0fc 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -35,6 +35,8 @@ int main(int argc, char *argv[]) { RAY_LOG(INFO) << "Starting raylet with static resource configuration: " << node_manager_config.resource_config.ToString(); node_manager_config.num_initial_workers = num_initial_workers; + node_manager_config.num_workers_per_process = + RayConfig::instance().num_workers_per_process(); // Use a default worker that can execute empty tasks with dependencies. std::stringstream worker_command_stream(worker_command); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bace170bdb83..59c28c020861 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -83,7 +83,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, heartbeat_period_ms_(config.heartbeat_period_ms), local_resources_(config.resource_config), local_available_resources_(config.resource_config), - worker_pool_(config.num_initial_workers, + worker_pool_(config.num_initial_workers, config.num_workers_per_process, static_cast(config.resource_config.GetNumCpus()), config.worker_command), local_queues_(SchedulingQueue()), @@ -700,7 +700,7 @@ void NodeManager::AssignTask(Task &task) { if (!spec.IsActorTask()) { // There are no more non-actor workers available to execute this task. // Start a new worker. - worker_pool_.StartWorker(); + worker_pool_.StartWorkerProcess(); } // Queue this task for future assignment. The task will be assigned to a // worker once one becomes available. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 7610ae142ab1..6403227282a8 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -22,6 +22,7 @@ namespace raylet { struct NodeManagerConfig { ResourceSet resource_config; int num_initial_workers; + int num_workers_per_process; std::vector worker_command; uint64_t heartbeat_period_ms; uint64_t max_lineage_size; diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 451125581ef4..24333b4de4f7 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -37,6 +37,7 @@ class TestObjectManagerBase : public ::testing::Test { node_manager_config.resource_config = ray::raylet::ResourceSet(std::move(static_resource_conf)); node_manager_config.num_initial_workers = 0; + node_manager_config.num_workers_per_process = 1; // Use a default worker that can execute empty tasks with dependencies. node_manager_config.worker_command.push_back("python"); node_manager_config.worker_command.push_back( diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 5577139be7a9..0dd8a383d7b5 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -9,55 +9,53 @@ namespace ray { namespace raylet { -/// A constructor that initializes a worker pool with num_workers workers. -WorkerPool::WorkerPool(int num_workers, int num_cpus, - const std::vector &worker_command) - : num_cpus_(num_cpus), worker_command_(worker_command) { +/// A constructor that initializes a worker pool with +/// (num_worker_processes * num_workers_per_process) workers +WorkerPool::WorkerPool(int num_worker_processes, int num_workers_per_process, + int num_cpus, const std::vector &worker_command) + : num_workers_per_process_(num_workers_per_process), + num_cpus_(num_cpus), + worker_command_(worker_command) { + RAY_CHECK(num_workers_per_process > 0) << "num_workers_per_process must be positive."; // Ignore SIGCHLD signals. If we don't do this, then worker processes will // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); - for (int i = 0; i < num_workers; i++) { + for (int i = 0; i < num_worker_processes; i++) { // Force-start num_workers workers. - StartWorker(true); + StartWorkerProcess(true); } } -/// A constructor that initializes an empty worker pool with zero workers. -WorkerPool::WorkerPool(const std::vector &worker_command) - : worker_command_(worker_command) {} - WorkerPool::~WorkerPool() { + std::unordered_set pids_to_kill; // Kill all registered workers. NOTE(swang): This assumes that the registered // workers were started by the pool. for (const auto &worker : registered_workers_) { - RAY_CHECK(worker->Pid() > 0); - kill(worker->Pid(), SIGKILL); - waitpid(worker->Pid(), NULL, 0); + pids_to_kill.insert(worker->Pid()); } // Kill all the workers that have been started but not registered. - for (const auto &pid : started_worker_pids_) { + for (const auto &entry : starting_worker_processes_) { + pids_to_kill.insert(entry.second); + } + for (const auto &pid : pids_to_kill) { RAY_CHECK(pid > 0); kill(pid, SIGKILL); waitpid(pid, NULL, 0); } - - pool_.clear(); - actor_pool_.clear(); - registered_workers_.clear(); - started_worker_pids_.clear(); } uint32_t WorkerPool::Size() const { return static_cast(actor_pool_.size() + pool_.size()); } -void WorkerPool::StartWorker(bool force_start) { +void WorkerPool::StartWorkerProcess(bool force_start) { RAY_CHECK(!worker_command_.empty()) << "No worker command provided"; // The first condition makes sure that we are always starting up to // num_cpus_ number of processes in parallel. - if (NumWorkersStarting() > num_cpus_ && !force_start) { + if (static_cast(starting_worker_processes_.size()) >= num_cpus_ && !force_start) { // Workers have been started, but not registered. Force start disabled -- returning. - RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration"; + RAY_LOG(DEBUG) << starting_worker_processes_.size() + << " worker processes pending registration"; return; } // Either there are no workers pending registration or the worker start is being forced. @@ -67,8 +65,8 @@ void WorkerPool::StartWorker(bool force_start) { // Launch the process to create the worker. pid_t pid = fork(); if (pid != 0) { - RAY_LOG(DEBUG) << "Started worker with pid " << pid; - started_worker_pids_.insert(pid); + RAY_LOG(DEBUG) << "Started worker process with pid " << pid; + starting_worker_processes_.emplace(std::make_pair(pid, num_workers_per_process_)); return; } @@ -93,9 +91,12 @@ void WorkerPool::RegisterWorker(std::shared_ptr worker) { auto pid = worker->Pid(); RAY_LOG(DEBUG) << "Registering worker with pid " << pid; registered_workers_.push_back(std::move(worker)); - auto it = started_worker_pids_.find(pid); - RAY_CHECK(it != started_worker_pids_.end()); - started_worker_pids_.erase(it); + auto it = starting_worker_processes_.find(pid); + RAY_CHECK(it != starting_worker_processes_.end()); + it->second--; + if (it->second == 0) { + starting_worker_processes_.erase(it); + } } std::shared_ptr WorkerPool::GetRegisteredWorker( @@ -155,11 +156,6 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr worker) { return removeWorker(pool_, worker); } -// Protected WorkerPool methods. -void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); } - -int WorkerPool::NumWorkersStarting() const { return started_worker_pids_.size(); } - } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 8e233011204d..34d5b5d25582 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -21,34 +21,30 @@ class Worker; /// is a container for a unit of work. class WorkerPool { public: - /// Create a pool and asynchronously start the specified number of workers. - /// Once each worker process has registered with an external server, the - /// process should create and register a new Worker, then add itself to the - /// pool. + /// Create a pool and asynchronously start the specified number of worker processes. + /// Once each worker process has registered with an external server, + /// the process should create and register the specified number of workers, + /// and add them to the pool. /// - /// \param num_workers The number of workers to start. + /// \param num_worker_processes The number of worker processes to start. + /// \param num_workers_per_process The number of workers per process. /// \param worker_command The command used to start the worker process. - WorkerPool(int num_workers, int num_cpus, + WorkerPool(int num_worker_processes, int num_workers_per_process, int num_cpus, const std::vector &worker_command); - /// Create a pool with zero workers. - /// - /// \param num_workers The number of workers to start. - /// \param worker_command The command used to start the worker process. - WorkerPool(const std::vector &worker_command); - /// Destructor responsible for freeing a set of workers owned by this class. virtual ~WorkerPool(); /// Asynchronously start a new worker process. Once the worker process has /// registered with an external server, the process should create and - /// register a new Worker, then add itself to the pool. Failure to start - /// the worker process is a fatal error. This function will start up to - /// num_cpus many workers in parallel if it is called multiple times. + /// register num_workers_per_process_ workers, then add them to the pool. + /// Failure to start the worker process is a fatal error. + /// This function will start up to num_cpus many workers in parallel + /// if it is called multiple times. /// /// \param force_start Controls whether to force starting a worker regardless of any /// workers that have already been started but not yet registered. - void StartWorker(bool force_start = false); + void StartWorkerProcess(bool force_start = false); /// Register a new worker. The Worker should be added by the caller to the /// pool after it becomes idle (e.g., requests a work assignment). @@ -90,15 +86,11 @@ class WorkerPool { uint32_t Size() const; protected: - /// Add started worker PID to the internal list of started workers (for testing). - /// - /// \param pid A process identifier for the worker being started. - void AddStartedWorker(pid_t pid); - - /// Return a number of workers currently starting but not registered. - /// - /// \return The number of worker PIDs stored for started workers. - int NumWorkersStarting() const; + /// A map from the pids of starting worker processes + /// to the number of their unregistered workers. + std::unordered_map starting_worker_processes_; + /// The number of workers per process. + int num_workers_per_process_; private: /// The number of CPUs this Raylet has available. @@ -113,7 +105,6 @@ class WorkerPool { /// idle and executing. // TODO(swang): Make this a map to make GetRegisteredWorker faster. std::list> registered_workers_; - std::unordered_set started_worker_pids_; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index d2f9f4f10002..5c05913e07a5 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -8,26 +8,30 @@ namespace ray { namespace raylet { +int NUM_WORKERS_PER_PROCESS = 3; + class WorkerPoolMock : public WorkerPool { public: - WorkerPoolMock(const std::vector &worker_command) - : WorkerPool(worker_command) {} + WorkerPoolMock() : WorkerPool(0, NUM_WORKERS_PER_PROCESS, 0, {}) {} - void StartWorker(pid_t pid, bool force_start = false) { - if (NumWorkersStarting() > 0 && !force_start) { + void StartWorkerProcess(pid_t pid, bool force_start = false) { + if (starting_worker_processes_.size() > 0 && !force_start) { // Workers have been started, but not registered. Force start disabled -- returning. - RAY_LOG(DEBUG) << NumWorkersStarting() << " workers pending registration"; + RAY_LOG(DEBUG) << starting_worker_processes_.size() + << " worker processes pending registration"; return; } // Either no workers are pending registration or the worker start is being forced. - RAY_LOG(DEBUG) << "starting worker, worker pool size " << Size(); - AddStartedWorker(pid); + RAY_LOG(DEBUG) << "starting new worker process, worker pool size " << Size(); + starting_worker_processes_.emplace(std::make_pair(pid, num_workers_per_process_)); } + + int NumWorkerProcessesStarting() const { return starting_worker_processes_.size(); } }; class WorkerPoolTest : public ::testing::Test { public: - WorkerPoolTest() : worker_pool_({}), io_service_() {} + WorkerPoolTest() : worker_pool_(), io_service_() {} std::shared_ptr CreateWorker(pid_t pid) { std::function client_handler = @@ -40,7 +44,6 @@ class WorkerPoolTest : public ::testing::Test { boost::asio::local::stream_protocol::socket socket(io_service_); auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket)); - worker_pool_.StartWorker(pid); return std::shared_ptr(new Worker(pid, client)); } @@ -54,15 +57,29 @@ class WorkerPoolTest : public ::testing::Test { }; TEST_F(WorkerPoolTest, HandleWorkerRegistration) { - auto worker = CreateWorker(1234); - // Check that we cannot lookup the worker before it's registered. - ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); - worker_pool_.RegisterWorker(worker); - // Check that we can lookup the worker after it's registered. - ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker); - worker_pool_.DisconnectWorker(worker); - // Check that we cannot lookup the worker after it's disconnected. - ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); + pid_t pid = 1234; + worker_pool_.StartWorkerProcess(pid); + std::vector> workers; + for (int i = 0; i < NUM_WORKERS_PER_PROCESS; i++) { + workers.push_back(CreateWorker(pid)); + } + for (const auto &worker : workers) { + // Check that there's still a starting worker process + // before all workers have been registered + ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 1); + // Check that we cannot lookup the worker before it's registered. + ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); + worker_pool_.RegisterWorker(worker); + // Check that we can lookup the worker after it's registered. + ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker); + } + // Check that there's no starting worker process + ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 0); + for (const auto &worker : workers) { + worker_pool_.DisconnectWorker(worker); + // Check that we cannot lookup the worker after it's disconnected. + ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); + } } TEST_F(WorkerPoolTest, HandleWorkerPushPop) {