Podman test #1

Open · wants to merge 1 commit into master
5 changes: 3 additions & 2 deletions python/ray/_private/services.py
@@ -1441,8 +1441,9 @@ def start_raylet(redis_address,

# Create the command that the Raylet will use to start workers.
start_worker_command = [
- sys.executable,
- worker_path,
+ "python",
+ "-m",
+ "ray.workers.default_worker",
f"--node-ip-address={node_ip_address}",
"--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
f"--object-store-name={plasma_store_name}",
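
This swaps the host interpreter (`sys.executable`) plus an absolute `worker_path` for a module invocation, presumably so that the `python` found inside the container image resolves `ray.workers.default_worker` regardless of host filesystem layout. Below is a minimal sketch of the command list the new code builds; the IP address and socket name are placeholder values, not taken from this PR:

```python
# Sketch of the worker command that start_raylet() now assembles.
# Placeholder values; the real ones come from the raylet's configuration.
node_ip_address = "127.0.0.1"
plasma_store_name = "/tmp/ray/session_latest/sockets/plasma_store"

start_worker_command = [
    "python",
    "-m",
    "ray.workers.default_worker",
    f"--node-ip-address={node_ip_address}",
    "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
    f"--object-store-name={plasma_store_name}",
]
print(" ".join(start_worker_command))
```

The `RAY_NODE_MANAGER_PORT_PLACEHOLDER` token is substituted by the raylet when it actually launches the worker.
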
3 changes: 2 additions & 1 deletion src/ray/raylet/node_manager.cc
@@ -184,7 +184,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
config.num_workers_soft_limit,
config.num_initial_python_workers_for_first_job,
config.maximum_startup_concurrency, config.min_worker_port,
- config.max_worker_port, config.worker_ports, gcs_client_,
+ config.max_worker_port, config.worker_ports, config.temp_dir,
+ gcs_client_,
config.worker_commands,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
128 changes: 123 additions & 5 deletions src/ray/raylet/worker_pool.cc
@@ -16,6 +16,7 @@

#include <algorithm>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <fstream>

#include "ray/common/constants.h"
#include "ray/common/network_util.h"
@@ -59,6 +60,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, const NodeID node_id
int num_initial_python_workers_for_first_job,
int maximum_startup_concurrency, int min_worker_port,
int max_worker_port, const std::vector<int> &worker_ports,
const std::string temp_dir,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
@@ -68,6 +70,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, const NodeID node_id
node_address_(node_address),
num_workers_soft_limit_(num_workers_soft_limit),
maximum_startup_concurrency_(maximum_startup_concurrency),
temp_dir_(temp_dir),
gcs_client_(std::move(gcs_client)),
starting_worker_timeout_callback_(starting_worker_timeout_callback),
first_job_registered_python_worker_count_(0),
@@ -80,7 +83,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, const NodeID node_id
#ifndef _WIN32
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
// become zombies instead of dying gracefully.
- signal(SIGCHLD, SIG_IGN);
+ // signal(SIGCHLD, SIG_IGN);
#endif
for (const auto &entry : worker_commands) {
// Initialize the pool state for this language.
@@ -142,7 +145,8 @@ void WorkerPool::SetNodeManagerPort(int node_manager_port) {
Process WorkerPool::StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
std::vector<std::string> dynamic_options,
- std::unordered_map<std::string, std::string> override_environment_variables) {
+ std::unordered_map<std::string, std::string> override_environment_variables,
+ const ResourceSet &worker_resource) {
rpc::JobConfig *job_config = nullptr;
if (!IsIOWorkerType(worker_type)) {
RAY_CHECK(!job_id.IsNil());
@@ -281,7 +285,7 @@ Process WorkerPool::StartWorkerProcess(
}
// Start a process and measure the startup time.
auto start = std::chrono::high_resolution_clock::now();
- Process proc = StartProcess(worker_command_args, env);
+ Process proc = StartContainerProcess(worker_command_args, env, worker_resource);
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
stats::ProcessStartupTimeMs.Record(duration.count());
@@ -327,6 +331,118 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
});
}

Process WorkerPool::StartContainerProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env,
const ResourceSet &worker_resource) {
// Launch the worker process inside a podman container.
// Extra arguments are collected as std::strings so that the backing memory
// outlives the const char * pointers pushed into argv below.
std::vector<std::string> extra_args;
std::error_code ec;
std::vector<const char *> argv;
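// Base `podman run` invocation: run detached as the "admin" user, share the
// host network and PID namespaces, and bind-mount the worker log directory.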
argv.push_back("sudo");
argv.push_back("podman");
argv.push_back("run");
argv.push_back("--log-level=debug");
argv.push_back("-u");
argv.push_back("admin");
argv.push_back("-d");
argv.push_back("-v");
argv.push_back("/home/admin/logs:/home/admin/logs");
argv.push_back("--cgroup-manager=cgroupfs");
argv.push_back("--security-opt=seccomp=unconfined");
argv.push_back("--network=host");
argv.push_back("--pid=host");
std::stringstream cid_file_path;
cid_file_path << "/tmp/ray/container/";
auto worker_id = WorkerID::FromRandom();
cid_file_path << worker_id;
cid_file_path << ".txt";
std::string cid_file = cid_file_path.str();
std::string cid_arg = "--cidfile=" + cid_file;
argv.push_back(cid_arg.c_str());

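// Translate the worker's CPU and memory demand into podman resource limits.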
if (!worker_resource.IsEmpty()) {
const FractionalResourceQuantity cpu_quantity =
worker_resource.GetResource(kCPU_ResourceLabel);
if (cpu_quantity.ToDouble() > 0) {
my_new_vec.push_back("--cpus=" + std::to_string(cpu_quantity.ToDouble()));
}
const FractionalResourceQuantity memory_quantity =
worker_resource.GetResource(kMemory_ResourceLabel);
if (memory_quantity.ToDouble() > 0) {
my_new_vec.push_back("--memory=" + std::to_string(memory_quantity.ToDouble()) +
"b");
}
}

// Forward the worker's environment variables into the container.
for (const auto &item : env) {
extra_args.push_back("--env");
extra_args.push_back(item.first + "=" + item.second);
}

for (const auto &s : extra_args) {
argv.push_back(s.c_str());
}

argv.push_back("-v");
std::string temp_dir = temp_dir_ + ":" + temp_dir_;
argv.push_back(temp_dir.c_str());
argv.push_back("--entrypoint");
argv.push_back(worker_command_args[0].c_str());
//TODO image name
argv.push_back("ray");
for (std::vector<std::string>::size_type i = 1; i < worker_command_args.size(); i++) {
argv.push_back(worker_command_args[i].c_str());
}
argv.push_back(NULL);
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting worker process with command:";
for (const auto &arg : argv) {
if (arg != nullptr) {
stream << " " << arg;
}
}
RAY_LOG(DEBUG) << stream.str();
}
// Block until the container has either started or failed to start.
Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
if (!child.IsValid() || ec) {
// errorcode 24: Too many files. This is caused by ulimit.
if (ec.value() == 24) {
RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting "
<< "`ulimit -n <num_files>` then restart Ray.";
} else {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
<< ec.message();
}
}
RAY_LOG(DEBUG) << "Started container worker " << child.GetId();
int exitCode;
if (waitpid(child.GetId(), &exitCode, 0) == -1) {
std::error_code error = std::error_code(errno, std::system_category());
RAY_LOG(FATAL) << "Failed to wait for process " << child.GetId() << " with error " << error
<< ": " << error.message();
}
if (exitCode != 0) {
RAY_LOG(FATAL) << "Failed to start container, `podman run` exited with status " << exitCode;
}
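// Resolve the worker's host PID: read the container ID from the cidfile, then
// read podman's pidfile for that container. Because the container shares the
// host PID namespace, this PID can be tracked like any other child process.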
std::ifstream cidFile(cid_file, std::ios_base::in);
if (cidFile.good()) {
std::string line;
std::getline(cidFile, line);
std::string pidfile_path =
"/var/run/containers/storage/overlay-containers/" + line + "/userdata/pidfile";
std::ifstream pidfile(pidfile_path, std::ios_base::in);
RAY_CHECK(pidfile.good());
pid_t pid = -1;
pidfile >> pid;
RAY_CHECK(pid != -1);
return Process::FromPid(pid);
}
return child;
}

Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env) {
if (RAY_LOG_ENABLED(DEBUG)) {
@@ -807,7 +923,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
}
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId(), dynamic_options,
- task_spec.OverrideEnvironmentVariables());
+ task_spec.OverrideEnvironmentVariables(),
+ task_spec.GetRequiredResources());
if (proc.IsValid()) {
state.dedicated_workers_to_tasks[proc] = task_spec.TaskId();
state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc;
@@ -839,8 +956,9 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
if (worker == nullptr) {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
RAY_LOG(DEBUG) << "add task, resource: " << task_spec.GetRequiredResources().ToString();
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
- task_spec.JobId());
+ task_spec.JobId(), {}, {}, task_spec.GetRequiredResources());
}
}

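
Putting the `worker_pool.cc` changes together: `StartContainerProcess` wraps the usual worker command in `sudo podman run`, sharing the host network and PID namespaces, waits for the detached `podman run` invocation to exit, and then maps the container ID written to the cidfile onto the worker's host PID via podman's pidfile. The sketch below mirrors that flow in Python for illustration only; it is not part of the PR, and the helper name `start_container_worker`, the image name `ray`, and the paths are assumptions carried over from the C++ above:

```python
# Illustrative sketch of the container-start flow in StartContainerProcess
# (assumptions: podman is installed, a "ray" image exists, and the paths used
# by the C++ code are valid on this host).
import os
import subprocess
import uuid


def start_container_worker(worker_cmd, env, temp_dir="/tmp/ray", image="ray"):
    os.makedirs("/tmp/ray/container", exist_ok=True)
    cid_file = f"/tmp/ray/container/{uuid.uuid4().hex}.txt"

    argv = [
        "sudo", "podman", "run", "-d",
        "--network=host", "--pid=host",
        f"--cidfile={cid_file}",
        "-v", f"{temp_dir}:{temp_dir}",
        "--entrypoint", worker_cmd[0],
    ]
    # Forward environment variables, then append the image and remaining args.
    for key, value in env.items():
        argv += ["--env", f"{key}={value}"]
    argv += [image, *worker_cmd[1:]]

    # `podman run -d` returns once the container has been created, so a
    # non-zero exit status means the worker failed to start.
    subprocess.run(argv, check=True)

    # Map the container back to a host PID: the cidfile gives the container ID,
    # and podman keeps the container's PID in a per-container pidfile.
    with open(cid_file) as f:
        container_id = f.read().strip()
    pidfile = ("/var/run/containers/storage/overlay-containers/"
               f"{container_id}/userdata/pidfile")
    with open(pidfile) as f:
        return int(f.read().strip())
```

Because the container shares the host PID namespace (`--pid=host`), the PID read from the pidfile is a real host PID, which is what lets the worker pool monitor and kill the containerized worker like any other child process.
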
9 changes: 8 additions & 1 deletion src/ray/raylet/worker_pool.h
@@ -121,6 +121,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
int num_initial_python_workers_for_first_job,
int maximum_startup_concurrency, int min_worker_port, int max_worker_port,
const std::vector<int> &worker_ports,
const std::string temp_dir,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
@@ -342,7 +343,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
Process StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
std::vector<std::string> dynamic_options = {},
- std::unordered_map<std::string, std::string> override_environment_variables = {});
+ std::unordered_map<std::string, std::string> override_environment_variables = {},
+ const ResourceSet &worker_resource = ResourceSet());

/// The implementation of how to start a new worker process with command arguments.
/// The lifetime of the process is tied to that of the returned object,
@@ -355,6 +357,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
virtual Process StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env);

virtual Process StartContainerProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env,
const ResourceSet &worker_resource = ResourceSet());

/// Push a warning message to the user if the worker pool is getting too big.
virtual void WarnAboutSize();

@@ -496,6 +502,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unique_ptr<std::queue<int>> free_ports_;
/// The port Raylet uses for listening to incoming connections.
int node_manager_port_ = 0;
const std::string temp_dir_ = "/tmp/ray";
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The callback that will be triggered once it times out to start a worker.