52 changes: 34 additions & 18 deletions python/ray/node.py
@@ -276,13 +276,14 @@ def merge_resources(env_dict, params_dict):
key, params_dict[key], env_dict[key]))
return num_cpus, num_gpus, memory, object_store_memory, result

env_resources = {}
env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
if env_string:
env_resources = json.loads(env_string)
logger.info(f"Autosaler overriding resources: {env_resources}.")

if not self._resource_spec:
env_resources = {}
env_string = os.getenv(
ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
if env_string:
env_resources = json.loads(env_string)
logger.info(
f"Autosaler overriding resources: {env_resources}.")
num_cpus, num_gpus, memory, object_store_memory, resources = \
merge_resources(env_resources, self._ray_params.resources)
self._resource_spec = ResourceSpec(
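Note on this hunk: the env-var override is a small but load-bearing mechanism, so here is a minimal runnable sketch of the parse-then-merge behavior. The variable name `RAY_OVERRIDE_RESOURCES` and the dict-merge semantics are assumptions inferred from the surrounding code, not verified against `ray_constants` or `merge_resources`.

```python
import json
import os

# Assumed value of ray_constants.RESOURCES_ENVIRONMENT_VARIABLE.
RESOURCES_ENV_VAR = "RAY_OVERRIDE_RESOURCES"

def load_env_resources():
    """Parse autoscaler resource overrides from the environment.

    Returns {} when the variable is unset, mirroring the
    `env_resources = {}` default in the hunk above.
    """
    env_string = os.getenv(RESOURCES_ENV_VAR)
    return json.loads(env_string) if env_string else {}

# Hypothetical merge: env-provided values win over local params.
os.environ[RESOURCES_ENV_VAR] = '{"CPU": 8}'
params_resources = {"CPU": 4, "custom": 2}
merged = {**params_resources, **load_env_resources()}
print(merged)  # {'CPU': 8, 'custom': 2}
```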
@@ -649,16 +650,17 @@ def start_dashboard(self, require_dashboard):
redis_client = self.create_redis_client()
redis_client.hmset("webui", {"url": self._webui_url})

def start_plasma_store(self):
def start_plasma_store(self, plasma_directory, object_store_memory):
"""Start the plasma store."""
stdout_file, stderr_file = self.get_log_file_handles(
"plasma_store", unique=True)
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
plasma_directory,
object_store_memory,
self._plasma_store_socket_name,
stdout_file=stdout_file,
stderr_file=stderr_file,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
keep_idle=bool(self._config.get("plasma_store_as_thread")),
fate_share=self.kernel_fate_share)
@@ -688,7 +690,11 @@ def start_gcs_server(self):
process_info,
]

def start_raylet(self, use_valgrind=False, use_profiler=False):
def start_raylet(self,
plasma_directory,
object_store_memory,
use_valgrind=False,
use_profiler=False):
"""Start the raylet.

Args:
@@ -709,12 +715,14 @@ def start_raylet(self, use_valgrind=False, use_profiler=False):
self._temp_dir,
self._session_dir,
self.get_resource_spec(),
self._ray_params.min_worker_port,
self._ray_params.max_worker_port,
self._ray_params.object_manager_port,
self._ray_params.redis_password,
self._ray_params.metrics_agent_port,
self._metrics_export_port,
plasma_directory,
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
@@ -723,7 +731,6 @@
include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=self.socket,
@@ -809,8 +816,17 @@ def start_ray_processes(self):
logger.debug(f"Process STDOUT and STDERR is being "
f"redirected to {self._logs_dir}.")

self.start_plasma_store()
self.start_raylet()
# Make sure we don't call `determine_plasma_store_config` multiple
# times to avoid printing multiple warnings.
resource_spec = self.get_resource_spec()
plasma_directory, object_store_memory = \
ray.services.determine_plasma_store_config(
resource_spec.object_store_memory,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages
)
self.start_plasma_store(plasma_directory, object_store_memory)
self.start_raylet(plasma_directory, object_store_memory)
if "RAY_USE_NEW_DASHBOARD" not in os.environ:
self.start_reporter()

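The comment above records the design choice: `determine_plasma_store_config` logs on some paths, so the PR computes the config once in `start_ray_processes` and threads the result through both `start_plasma_store` and `start_raylet`. A hypothetical alternative, not what this PR does, would be to memoize the helper so repeated calls are silent and cheap; a sketch under that assumption:

```python
import functools

@functools.lru_cache(maxsize=None)
def determine_config(object_store_memory, plasma_directory, huge_pages):
    # Stand-in for determine_plasma_store_config: with the cache, the
    # message below fires at most once per distinct argument tuple.
    print("object_store_memory is not verified when "
          "plasma_directory is set.")
    return plasma_directory or "/dev/shm", object_store_memory

first = determine_config(10**9, "/tmp", False)   # logs once
second = determine_config(10**9, "/tmp", False)  # cache hit, silent
assert first == second
```

Explicit threading, as the PR does it, keeps the data flow visible at the call sites, which is arguably the better trade for startup code.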
14 changes: 6 additions & 8 deletions python/ray/services.py
@@ -1262,6 +1262,8 @@ def start_raylet(redis_address,
temp_dir,
session_dir,
resource_spec,
plasma_directory,
object_store_memory,
min_worker_port=None,
max_worker_port=None,
object_manager_port=None,
@@ -1276,7 +1278,6 @@
include_java=False,
java_worker_options=None,
load_code_from_local=False,
plasma_directory=None,
huge_pages=False,
fate_share=None,
socket_to_use=None,
@@ -1455,8 +1456,6 @@ def start_raylet(redis_address,
subprocess.list2cmdline(agent_command)))
if config.get("plasma_store_as_thread"):
# Command-line options for the plasma store.
plasma_directory, object_store_memory = determine_plasma_store_config(
resource_spec.object_store_memory, plasma_directory, huge_pages)
command += [
f"--object_store_memory={object_store_memory}",
f"--plasma_directory={plasma_directory}",
@@ -1655,8 +1654,8 @@ def determine_plasma_store_config(object_store_memory,
"than the total available memory.")
else:
plasma_directory = os.path.abspath(plasma_directory)
logger.warning("WARNING: object_store_memory is not verified when "
"plasma_directory is set.")
logger.info("object_store_memory is not verified when "
"plasma_directory is set.")

if not os.path.isdir(plasma_directory):
raise ValueError(f"The file {plasma_directory} does not "
@@ -1682,10 +1681,11 @@


def start_plasma_store(resource_spec,
plasma_directory,
object_store_memory,
plasma_store_socket_name,
stdout_file=None,
stderr_file=None,
plasma_directory=None,
keep_idle=False,
huge_pages=False,
fate_share=None,
@@ -1714,8 +1714,6 @@ def start_plasma_store(resource_spec,
raise ValueError("Cannot use valgrind and profiler at the same time.")

assert resource_spec.resolved()
plasma_directory, object_store_memory = determine_plasma_store_config(
resource_spec.object_store_memory, plasma_directory, huge_pages)

command = [
PLASMA_STORE_EXECUTABLE,
8 changes: 4 additions & 4 deletions src/ray/core_worker/task_manager.cc
@@ -301,8 +301,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
if (num_retries_left != 0) {
auto retries_str =
num_retries_left == -1 ? "infinite" : std::to_string(num_retries_left);
RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId()
<< ", attempting to resubmit.";
RAY_LOG(INFO) << retries_str << " retries left for task " << spec.TaskId()
<< ", attempting to resubmit.";
retry_task_callback_(spec, /*delay=*/true);
will_retry = true;
} else {
@@ -315,8 +315,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
(current_time_ms() - last_log_time_ms_) >
kTaskFailureLoggingFrequencyMillis)) {
if (num_failure_logs_++ == kTaskFailureThrottlingThreshold) {
RAY_LOG(ERROR) << "Too many failure logs, throttling to once every "
<< kTaskFailureLoggingFrequencyMillis << " millis.";
RAY_LOG(WARNING) << "Too many failure logs, throttling to once every "
<< kTaskFailureLoggingFrequencyMillis << " millis.";
}
last_log_time_ms_ = current_time_ms();
if (status != nullptr) {
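Beyond the severity change, the surrounding code is a compact example of log throttling: log every failure until a threshold, then at most once per interval. A simplified Python rendering of the same pattern, with names borrowed from the C++ constants above (the numeric values are illustrative, not the real ones):

```python
import time

THROTTLE_THRESHOLD = 100     # cf. kTaskFailureThrottlingThreshold
LOG_FREQUENCY_MS = 10_000    # cf. kTaskFailureLoggingFrequencyMillis

num_failure_logs = 0
last_log_time_ms = 0.0

def maybe_log_failure(message):
    """Log freely below the threshold, then once per interval."""
    global num_failure_logs, last_log_time_ms
    now_ms = time.monotonic() * 1000
    if (num_failure_logs < THROTTLE_THRESHOLD
            or now_ms - last_log_time_ms > LOG_FREQUENCY_MS):
        if num_failure_logs == THROTTLE_THRESHOLD:
            print("Too many failure logs, throttling ...")
        num_failure_logs += 1
        last_log_time_ms = now_ms
        print(message)
```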
4 changes: 2 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -2183,11 +2183,11 @@ void NodeManager::MarkObjectsAsFailed(
// If we failed to save the error code, log a warning and push an error message
// to the driver.
std::ostringstream stream;
stream << "An plasma error (" << status.ToString() << ") occurred while saving"
stream << "A plasma error (" << status.ToString() << ") occurred while saving"
<< " error code to object " << object_id << ". Anyone who's getting this"
<< " object may hang forever.";
std::string error_message = stream.str();
RAY_LOG(WARNING) << error_message;
RAY_LOG(ERROR) << error_message;
auto error_data_ptr =
gcs::CreateErrorTableData("task", error_message, current_time_ms(), job_id);
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
12 changes: 9 additions & 3 deletions src/ray/raylet/worker_pool.cc
@@ -360,9 +360,15 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
argv.push_back(NULL);
Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
<< ec.message();
// Error code 24 (EMFILE): too many open files, typically caused by
// the file-descriptor ulimit.
if (ec.value() == 24) {
RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting "
<< "`ulimit -n <num_files>` then restart Ray.";
} else {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
<< ec.message();
}
}
return child;
}
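The new branch keys off error code 24, which is `EMFILE` (too many open files) on Linux and macOS, and turns an opaque spawn failure into an actionable hint. For reference, the same limit can be inspected and raised from Python; the target value 4096 below is illustrative:

```python
import errno
import resource

assert errno.EMFILE == 24  # the code the raylet branch matches on

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limit: soft={soft}, hard={hard}")

# A process may raise its soft limit up to the hard limit without
# privileges; this is the programmatic analogue of `ulimit -n`.
resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))
```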