diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index 31ecbb707c15..aa66cb6c5c33 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -355,7 +355,8 @@ def _get_gpu_info_string():
             gpu_dirs = os.listdir(proc_gpus_path)
             if len(gpu_dirs) > 0:
                 gpu_info_path = f"{proc_gpus_path}/{gpu_dirs[0]}/information"
-                info_str = open(gpu_info_path).read()
+                with open(gpu_info_path) as f:
+                    info_str = f.read()
                 return info_str
     return None
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index c890cfe354ac..199f45258d69 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -1301,3 +1301,9 @@ def check_version_info(cluster_metadata):
             "    Python: " + version_info[1] + "\n"
         )
         raise RuntimeError(error_message)
+
+
+def maybe_enter_context(stack, *args):
+    for arg in args:
+        if arg is not None:
+            stack.enter_context(arg)
diff --git a/python/ray/node.py b/python/ray/node.py
index dddc6c8f21f0..c999b11c4044 100644
--- a/python/ray/node.py
+++ b/python/ray/node.py
@@ -1,5 +1,6 @@
 import atexit
 import collections
+import contextlib
 import datetime
 import errno
 import json
@@ -26,7 +27,12 @@
 from ray.internal import storage
 from ray._private.gcs_utils import GcsClient
 from ray._private.resource_spec import ResourceSpec
-from ray._private.utils import try_to_create_directory, try_to_symlink, open_log
+from ray._private.utils import (
+    try_to_create_directory,
+    try_to_symlink,
+    open_log,
+    maybe_enter_context,
+)
 import ray._private.usage.usage_lib as ray_usage_lib

 # Logger for this module. It should be configured at the entry point
@@ -832,33 +838,36 @@ def start_or_configure_redis(self):
         """Starts local Redis or configures external Redis."""
         assert self._redis_address is None
         redis_log_files = []
-        if self._ray_params.external_addresses is None:
-            redis_log_files = [self.get_log_file_handles("redis", unique=True)]
-            for i in range(self._ray_params.num_redis_shards):
-                redis_log_files.append(
-                    self.get_log_file_handles(f"redis-shard_{i}", unique=True)
-                )
+        with contextlib.ExitStack() as stack:
+            if self._ray_params.external_addresses is None:
+                redis_log_files = [self.get_log_file_handles("redis", unique=True)]
+                maybe_enter_context(stack, *redis_log_files[-1])
+                for i in range(self._ray_params.num_redis_shards):
+                    redis_log_files.append(
+                        self.get_log_file_handles(f"redis-shard_{i}", unique=True)
+                    )
+                    maybe_enter_context(stack, *redis_log_files[-1])

-        (
-            self._redis_address,
-            redis_shards,
-            process_infos,
-        ) = ray._private.services.start_redis(
-            self._node_ip_address,
-            redis_log_files,
-            self.get_resource_spec(),
-            self.get_session_dir_path(),
-            port=self._ray_params.redis_port,
-            redis_shard_ports=self._ray_params.redis_shard_ports,
-            num_redis_shards=self._ray_params.num_redis_shards,
-            redis_max_clients=self._ray_params.redis_max_clients,
-            password=self._ray_params.redis_password,
-            fate_share=self.kernel_fate_share,
-            external_addresses=self._ray_params.external_addresses,
-            port_denylist=self._ray_params.reserved_ports,
-        )
-        assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
-        self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos
+            (
+                self._redis_address,
+                redis_shards,
+                process_infos,
+            ) = ray._private.services.start_redis(
+                self._node_ip_address,
+                redis_log_files,
+                self.get_resource_spec(),
+                self.get_session_dir_path(),
+                port=self._ray_params.redis_port,
+                redis_shard_ports=self._ray_params.redis_shard_ports,
+                num_redis_shards=self._ray_params.num_redis_shards,
+                redis_max_clients=self._ray_params.redis_max_clients,
+                password=self._ray_params.redis_password,
+                fate_share=self.kernel_fate_share,
+                external_addresses=self._ray_params.external_addresses,
+                port_denylist=self._ray_params.reserved_ports,
+            )
+            assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
+            self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos

     def start_log_monitor(self):
         """Start the log monitor."""
@@ -916,29 +925,31 @@ def start_gcs_server(self):
         assert self._gcs_client is None, "GCS client is already connected."
         # TODO(mwtian): append date time so restarted GCS uses different files.
         stdout_file, stderr_file = self.get_log_file_handles("gcs_server", unique=True)
-        process_info = ray._private.services.start_gcs_server(
-            self.redis_address,
-            self._logs_dir,
-            stdout_file=stdout_file,
-            stderr_file=stderr_file,
-            redis_password=self._ray_params.redis_password,
-            config=self._config,
-            fate_share=self.kernel_fate_share,
-            gcs_server_port=gcs_server_port,
-            metrics_agent_port=self._ray_params.metrics_agent_port,
-            node_ip_address=self._node_ip_address,
-        )
-        assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
-        self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
-            process_info,
-        ]
-        # Connecting via non-localhost address may be blocked by firewall rule,
-        # e.g. https://github.com/ray-project/ray/issues/15780
-        # TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
-        # when possible.
-        self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
-        # Initialize gcs client, which also waits for GCS to start running.
-        self.get_gcs_client()
+        with contextlib.ExitStack() as stack:
+            maybe_enter_context(stack, stdout_file, stderr_file)
+            process_info = ray._private.services.start_gcs_server(
+                self.redis_address,
+                self._logs_dir,
+                stdout_file=stdout_file,
+                stderr_file=stderr_file,
+                redis_password=self._ray_params.redis_password,
+                config=self._config,
+                fate_share=self.kernel_fate_share,
+                gcs_server_port=gcs_server_port,
+                metrics_agent_port=self._ray_params.metrics_agent_port,
+                node_ip_address=self._node_ip_address,
+            )
+            assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
+            self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
+                process_info,
+            ]
+            # Connecting via non-localhost address may be blocked by firewall rule,
+            # e.g. https://github.com/ray-project/ray/issues/15780
+            # TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
+            # when possible.
+            self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
+            # Initialize gcs client, which also waits for GCS to start running.
+            self.get_gcs_client()

     def start_raylet(
         self,
@@ -956,47 +967,49 @@ def start_raylet(
                 valgrind profiler.
         """
         stdout_file, stderr_file = self.get_log_file_handles("raylet", unique=True)
-        process_info = ray._private.services.start_raylet(
-            self.redis_address,
-            self.gcs_address,
-            self._node_ip_address,
-            self._ray_params.node_manager_port,
-            self._raylet_socket_name,
-            self._plasma_store_socket_name,
-            self._ray_params.worker_path,
-            self._ray_params.setup_worker_path,
-            self._ray_params.storage,
-            self._temp_dir,
-            self._session_dir,
-            self._runtime_env_dir,
-            self._logs_dir,
-            self.get_resource_spec(),
-            plasma_directory,
-            object_store_memory,
-            min_worker_port=self._ray_params.min_worker_port,
-            max_worker_port=self._ray_params.max_worker_port,
-            worker_port_list=self._ray_params.worker_port_list,
-            object_manager_port=self._ray_params.object_manager_port,
-            redis_password=self._ray_params.redis_password,
-            metrics_agent_port=self._ray_params.metrics_agent_port,
-            metrics_export_port=self._metrics_export_port,
-            dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,
-            use_valgrind=use_valgrind,
-            use_profiler=use_profiler,
-            stdout_file=stdout_file,
-            stderr_file=stderr_file,
-            config=self._config,
-            huge_pages=self._ray_params.huge_pages,
-            fate_share=self.kernel_fate_share,
-            socket_to_use=None,
-            max_bytes=self.max_bytes,
-            backup_count=self.backup_count,
-            start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job,  # noqa: E501
-            ray_debugger_external=self._ray_params.ray_debugger_external,
-            env_updates=self._ray_params.env_vars,
-        )
-        assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
-        self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
+        with contextlib.ExitStack() as stack:
+            maybe_enter_context(stack, stdout_file, stderr_file)
+            process_info = ray._private.services.start_raylet(
+                self.redis_address,
+                self.gcs_address,
+                self._node_ip_address,
+                self._ray_params.node_manager_port,
+                self._raylet_socket_name,
+                self._plasma_store_socket_name,
+                self._ray_params.worker_path,
+                self._ray_params.setup_worker_path,
+                self._ray_params.storage,
+                self._temp_dir,
+                self._session_dir,
+                self._runtime_env_dir,
+                self._logs_dir,
+                self.get_resource_spec(),
+                plasma_directory,
+                object_store_memory,
+                min_worker_port=self._ray_params.min_worker_port,
+                max_worker_port=self._ray_params.max_worker_port,
+                worker_port_list=self._ray_params.worker_port_list,
+                object_manager_port=self._ray_params.object_manager_port,
+                redis_password=self._ray_params.redis_password,
+                metrics_agent_port=self._ray_params.metrics_agent_port,
+                metrics_export_port=self._metrics_export_port,
+                dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,  # noqa: E501
+                use_valgrind=use_valgrind,
+                use_profiler=use_profiler,
+                stdout_file=stdout_file,
+                stderr_file=stderr_file,
+                config=self._config,
+                huge_pages=self._ray_params.huge_pages,
+                fate_share=self.kernel_fate_share,
+                socket_to_use=None,
+                max_bytes=self.max_bytes,
+                backup_count=self.backup_count,
+                start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job,  # noqa: E501
+                ray_debugger_external=self._ray_params.ray_debugger_external,
+                env_updates=self._ray_params.env_vars,
+            )
+            assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
+            self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]

     def start_worker(self):
         """Start a worker process."""
@@ -1010,21 +1023,23 @@ def start_monitor(self):
         cluster launching commands.
         """
         stdout_file, stderr_file = self.get_log_file_handles("monitor", unique=True)
-        process_info = ray._private.services.start_monitor(
-            self.redis_address,
-            self.gcs_address,
-            self._logs_dir,
-            stdout_file=stdout_file,
-            stderr_file=stderr_file,
-            autoscaling_config=self._ray_params.autoscaling_config,
-            redis_password=self._ray_params.redis_password,
-            fate_share=self.kernel_fate_share,
-            max_bytes=self.max_bytes,
-            backup_count=self.backup_count,
-            monitor_ip=self._node_ip_address,
-        )
-        assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
-        self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
+        with contextlib.ExitStack() as stack:
+            maybe_enter_context(stack, stdout_file, stderr_file)
+            process_info = ray._private.services.start_monitor(
+                self.redis_address,
+                self.gcs_address,
+                self._logs_dir,
+                stdout_file=stdout_file,
+                stderr_file=stderr_file,
+                autoscaling_config=self._ray_params.autoscaling_config,
+                redis_password=self._ray_params.redis_password,
+                fate_share=self.kernel_fate_share,
+                max_bytes=self.max_bytes,
+                backup_count=self.backup_count,
+                monitor_ip=self._node_ip_address,
+            )
+            assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
+            self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]

     def start_ray_client_server(self):
         """Start the ray client server process."""
@@ -1222,10 +1237,14 @@ def _kill_process_impl(
             # If the process did not exit, force kill it.
            if process.poll() is None:
                process.kill()
-                # The reason we usually don't call process.wait() here is that
+                # The reason we usually don't wait indefinitely here is that
                 # there's some chance we'd end up waiting a really long time.
-                if wait:
-                    process.wait()
+                # Waiting another second can prevent spurious ResourceWarnings.
+                timeout_seconds = None if wait else 1
+                try:
+                    process.wait(timeout_seconds)
+                except subprocess.TimeoutExpired:
+                    pass

         del self.all_processes[process_type]