Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class RayConfig {
return object_manager_default_chunk_size_;
}

int num_workers_per_process() const { return num_workers_per_process_; }

private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
Expand Down Expand Up @@ -142,7 +144,8 @@ class RayConfig {
object_manager_push_timeout_ms_(10000),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_default_chunk_size_(100000000) {}
object_manager_default_chunk_size_(100000000),
num_workers_per_process_(1) {}

~RayConfig() {}

Expand Down Expand Up @@ -251,6 +254,9 @@ class RayConfig {
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size.
uint64_t object_manager_default_chunk_size_;

/// Number of workers per process
int num_workers_per_process_;
};

#endif // RAY_CONFIG_H
2 changes: 2 additions & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ int main(int argc, char *argv[]) {
RAY_LOG(INFO) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_per_process =
RayConfig::instance().num_workers_per_process();
// Use a default worker that can execute empty tasks with dependencies.

std::stringstream worker_command_stream(worker_command);
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
heartbeat_period_ms_(config.heartbeat_period_ms),
local_resources_(config.resource_config),
local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers,
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
static_cast<int>(config.resource_config.GetNumCpus()),
config.worker_command),
local_queues_(SchedulingQueue()),
Expand Down Expand Up @@ -700,7 +700,7 @@ void NodeManager::AssignTask(Task &task) {
if (!spec.IsActorTask()) {
// There are no more non-actor workers available to execute this task.
// Start a new worker.
worker_pool_.StartWorker();
worker_pool_.StartWorkerProcess();
}
// Queue this task for future assignment. The task will be assigned to a
// worker once one becomes available.
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace raylet {
struct NodeManagerConfig {
ResourceSet resource_config;
int num_initial_workers;
int num_workers_per_process;
std::vector<std::string> worker_command;
uint64_t heartbeat_period_ms;
uint64_t max_lineage_size;
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/object_manager_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TestObjectManagerBase : public ::testing::Test {
node_manager_config.resource_config =
ray::raylet::ResourceSet(std::move(static_resource_conf));
node_manager_config.num_initial_workers = 0;
node_manager_config.num_workers_per_process = 1;
// Use a default worker that can execute empty tasks with dependencies.
node_manager_config.worker_command.push_back("python");
node_manager_config.worker_command.push_back(
Expand Down
60 changes: 28 additions & 32 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,53 @@ namespace ray {

namespace raylet {

/// A constructor that initializes a worker pool with num_workers workers.
WorkerPool::WorkerPool(int num_workers, int num_cpus,
const std::vector<std::string> &worker_command)
: num_cpus_(num_cpus), worker_command_(worker_command) {
/// A constructor that initializes a worker pool with
/// (num_worker_processes * num_workers_per_process) workers
WorkerPool::WorkerPool(int num_worker_processes, int num_workers_per_process,
int num_cpus, const std::vector<std::string> &worker_command)
: num_workers_per_process_(num_workers_per_process),
num_cpus_(num_cpus),
worker_command_(worker_command) {
RAY_CHECK(num_workers_per_process > 0) << "num_workers_per_process must be positive.";
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
// become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < num_workers; i++) {
for (int i = 0; i < num_worker_processes; i++) {
// Force-start num_workers workers.
StartWorker(true);
StartWorkerProcess(true);
}
}

/// A constructor that initializes an empty worker pool with zero workers.
WorkerPool::WorkerPool(const std::vector<std::string> &worker_command)
: worker_command_(worker_command) {}

WorkerPool::~WorkerPool() {
std::unordered_set<pid_t> pids_to_kill;
// Kill all registered workers. NOTE(swang): This assumes that the registered
// workers were started by the pool.
for (const auto &worker : registered_workers_) {
RAY_CHECK(worker->Pid() > 0);
kill(worker->Pid(), SIGKILL);
waitpid(worker->Pid(), NULL, 0);
pids_to_kill.insert(worker->Pid());
}
// Kill all the workers that have been started but not registered.
for (const auto &pid : started_worker_pids_) {
for (const auto &entry : starting_worker_processes_) {
pids_to_kill.insert(entry.second);
}
for (const auto &pid : pids_to_kill) {
RAY_CHECK(pid > 0);
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
}

pool_.clear();
actor_pool_.clear();
registered_workers_.clear();
started_worker_pids_.clear();
}

uint32_t WorkerPool::Size() const {
return static_cast<uint32_t>(actor_pool_.size() + pool_.size());
}

void WorkerPool::StartWorker(bool force_start) {
void WorkerPool::StartWorkerProcess(bool force_start) {
RAY_CHECK(!worker_command_.empty()) << "No worker command provided";
// The first condition makes sure that we are always starting up to
// num_cpus_ number of processes in parallel.
if (NumWorkersStarting() > num_cpus_ && !force_start) {
if (static_cast<int>(starting_worker_processes_.size()) >= num_cpus_ && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration";
RAY_LOG(DEBUG) << starting_worker_processes_.size()
<< " worker processes pending registration";
return;
}
// Either there are no workers pending registration or the worker start is being forced.
Expand All @@ -67,8 +65,8 @@ void WorkerPool::StartWorker(bool force_start) {
// Launch the process to create the worker.
pid_t pid = fork();
if (pid != 0) {
RAY_LOG(DEBUG) << "Started worker with pid " << pid;
started_worker_pids_.insert(pid);
RAY_LOG(DEBUG) << "Started worker process with pid " << pid;
starting_worker_processes_.emplace(std::make_pair(pid, num_workers_per_process_));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddStartingWorkerProcess(pid)

return;
}

Expand All @@ -93,9 +91,12 @@ void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
auto pid = worker->Pid();
RAY_LOG(DEBUG) << "Registering worker with pid " << pid;
registered_workers_.push_back(std::move(worker));
auto it = started_worker_pids_.find(pid);
RAY_CHECK(it != started_worker_pids_.end());
started_worker_pids_.erase(it);
auto it = starting_worker_processes_.find(pid);
RAY_CHECK(it != starting_worker_processes_.end());
it->second--;
if (it->second == 0) {
starting_worker_processes_.erase(it);
}
}

std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Expand Down Expand Up @@ -155,11 +156,6 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr<Worker> worker) {
return removeWorker(pool_, worker);
}

// Protected WorkerPool methods.
void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); }

int WorkerPool::NumWorkersStarting() const { return started_worker_pids_.size(); }

} // namespace raylet

} // namespace ray
43 changes: 17 additions & 26 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,30 @@ class Worker;
/// is a container for a unit of work.
class WorkerPool {
public:
/// Create a pool and asynchronously start the specified number of workers.
/// Once each worker process has registered with an external server, the
/// process should create and register a new Worker, then add itself to the
/// pool.
/// Create a pool and asynchronously start the specified number of worker processes.
/// Once each worker process has registered with an external server,
/// the process should create and register the specified number of workers,
/// and add them to the pool.
///
/// \param num_workers The number of workers to start.
/// \param num_worker_processes The number of worker processes to start.
/// \param num_workers_per_process The number of workers per process.
/// \param worker_command The command used to start the worker process.
WorkerPool(int num_workers, int num_cpus,
WorkerPool(int num_worker_processes, int num_workers_per_process, int num_cpus,
const std::vector<std::string> &worker_command);

/// Create a pool with zero workers.
///
/// \param num_workers The number of workers to start.
/// \param worker_command The command used to start the worker process.
WorkerPool(const std::vector<std::string> &worker_command);

/// Destructor responsible for freeing a set of workers owned by this class.
virtual ~WorkerPool();

/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
/// register a new Worker, then add itself to the pool. Failure to start
/// the worker process is a fatal error. This function will start up to
/// num_cpus many workers in parallel if it is called multiple times.
/// register num_workers_per_process_ workers, then add them to the pool.
/// Failure to start the worker process is a fatal error.
/// This function will start up to num_cpus many workers in parallel
/// if it is called multiple times.
///
/// \param force_start Controls whether to force starting a worker regardless of any
/// workers that have already been started but not yet registered.
void StartWorker(bool force_start = false);
void StartWorkerProcess(bool force_start = false);

/// Register a new worker. The Worker should be added by the caller to the
/// pool after it becomes idle (e.g., requests a work assignment).
Expand Down Expand Up @@ -90,15 +86,11 @@ class WorkerPool {
uint32_t Size() const;

protected:
/// Add started worker PID to the internal list of started workers (for testing).
///
/// \param pid A process identifier for the worker being started.
void AddStartedWorker(pid_t pid);

/// Return a number of workers currently starting but not registered.
///
/// \return The number of worker PIDs stored for started workers.
int NumWorkersStarting() const;
/// A map from the pids of starting worker processes
/// to the number of their unregistered workers.
std::unordered_map<pid_t, int> starting_worker_processes_;
/// The number of workers per process.
int num_workers_per_process_;

private:
/// The number of CPUs this Raylet has available.
Expand All @@ -113,7 +105,6 @@ class WorkerPool {
/// idle and executing.
// TODO(swang): Make this a map to make GetRegisteredWorker faster.
std::list<std::shared_ptr<Worker>> registered_workers_;
std::unordered_set<pid_t> started_worker_pids_;
};

} // namespace raylet
Expand Down
53 changes: 35 additions & 18 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,30 @@ namespace ray {

namespace raylet {

int NUM_WORKERS_PER_PROCESS = 3;

class WorkerPoolMock : public WorkerPool {
public:
WorkerPoolMock(const std::vector<std::string> &worker_command)
: WorkerPool(worker_command) {}
WorkerPoolMock() : WorkerPool(0, NUM_WORKERS_PER_PROCESS, 0, {}) {}

void StartWorker(pid_t pid, bool force_start = false) {
if (NumWorkersStarting() > 0 && !force_start) {
void StartWorkerProcess(pid_t pid, bool force_start = false) {
if (starting_worker_processes_.size() > 0 && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << NumWorkersStarting() << " workers pending registration";
RAY_LOG(DEBUG) << starting_worker_processes_.size()
<< " worker processes pending registration";
return;
}
// Either no workers are pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "starting worker, worker pool size " << Size();
AddStartedWorker(pid);
RAY_LOG(DEBUG) << "starting new worker process, worker pool size " << Size();
starting_worker_processes_.emplace(std::make_pair(pid, num_workers_per_process_));
}

int NumWorkerProcessesStarting() const { return starting_worker_processes_.size(); }
};

class WorkerPoolTest : public ::testing::Test {
public:
WorkerPoolTest() : worker_pool_({}), io_service_() {}
WorkerPoolTest() : worker_pool_(), io_service_() {}

std::shared_ptr<Worker> CreateWorker(pid_t pid) {
std::function<void(LocalClientConnection &)> client_handler =
Expand All @@ -40,7 +44,6 @@ class WorkerPoolTest : public ::testing::Test {
boost::asio::local::stream_protocol::socket socket(io_service_);
auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket));
worker_pool_.StartWorker(pid);
return std::shared_ptr<Worker>(new Worker(pid, client));
}

Expand All @@ -54,15 +57,29 @@ class WorkerPoolTest : public ::testing::Test {
};

TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
auto worker = CreateWorker(1234);
// Check that we cannot lookup the worker before it's registered.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr);
worker_pool_.RegisterWorker(worker);
// Check that we can lookup the worker after it's registered.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker);
worker_pool_.DisconnectWorker(worker);
// Check that we cannot lookup the worker after it's disconnected.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr);
pid_t pid = 1234;
worker_pool_.StartWorkerProcess(pid);
std::vector<std::shared_ptr<Worker>> workers;
for (int i = 0; i < NUM_WORKERS_PER_PROCESS; i++) {
workers.push_back(CreateWorker(pid));
}
for (const auto &worker : workers) {
// Check that there's still a starting worker process
// before all workers have been registered
ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 1);
// Check that we cannot lookup the worker before it's registered.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr);
worker_pool_.RegisterWorker(worker);
// Check that we can lookup the worker after it's registered.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker);
}
// Check that there's no starting worker process
ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 0);
for (const auto &worker : workers) {
worker_pool_.DisconnectWorker(worker);
// Check that we cannot lookup the worker after it's disconnected.
ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr);
}
}

TEST_F(WorkerPoolTest, HandleWorkerPushPop) {
Expand Down