Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ResourceWarnings #22419

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/_private/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ def _get_gpu_info_string():
gpu_dirs = os.listdir(proc_gpus_path)
if len(gpu_dirs) > 0:
gpu_info_path = f"{proc_gpus_path}/{gpu_dirs[0]}/information"
info_str = open(gpu_info_path).read()
with open(gpu_info_path) as f:
info_str = f.read()
return info_str
return None

Expand Down
6 changes: 6 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,3 +1301,9 @@ def check_version_info(cluster_metadata):
" Python: " + version_info[1] + "\n"
)
raise RuntimeError(error_message)


def maybe_enter_context(stack, *args):
    """Enter every non-``None`` argument on ``stack``.

    Args:
        stack: Object exposing ``enter_context`` (for example a
            ``contextlib.ExitStack``).
        *args: Candidate context managers. Entries that are ``None`` are
            skipped; everything else is handed to ``stack.enter_context``
            in the order given.
    """
    # Filter on identity with None (not truthiness) so falsy-but-valid
    # objects are still entered.
    managers = (candidate for candidate in args if candidate is not None)
    for manager in managers:
        stack.enter_context(manager)
237 changes: 128 additions & 109 deletions python/ray/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import atexit
import collections
import contextlib
import datetime
import errno
import json
Expand All @@ -26,7 +27,12 @@
from ray.internal import storage
from ray._private.gcs_utils import GcsClient
from ray._private.resource_spec import ResourceSpec
from ray._private.utils import try_to_create_directory, try_to_symlink, open_log
from ray._private.utils import (
try_to_create_directory,
try_to_symlink,
open_log,
maybe_enter_context,
)
import ray._private.usage.usage_lib as ray_usage_lib

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -832,33 +838,36 @@ def start_or_configure_redis(self):
"""Starts local Redis or configures external Redis."""
assert self._redis_address is None
redis_log_files = []
if self._ray_params.external_addresses is None:
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
for i in range(self._ray_params.num_redis_shards):
redis_log_files.append(
self.get_log_file_handles(f"redis-shard_{i}", unique=True)
)
with contextlib.ExitStack() as stack:
if self._ray_params.external_addresses is None:
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
maybe_enter_context(stack, *redis_log_files[-1])
for i in range(self._ray_params.num_redis_shards):
redis_log_files.append(
self.get_log_file_handles(f"redis-shard_{i}", unique=True)
)
maybe_enter_context(stack, *redis_log_files[-1])

(
self._redis_address,
redis_shards,
process_infos,
) = ray._private.services.start_redis(
self._node_ip_address,
redis_log_files,
self.get_resource_spec(),
self.get_session_dir_path(),
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
external_addresses=self._ray_params.external_addresses,
port_denylist=self._ray_params.reserved_ports,
)
assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos
(
self._redis_address,
redis_shards,
process_infos,
) = ray._private.services.start_redis(
self._node_ip_address,
redis_log_files,
self.get_resource_spec(),
self.get_session_dir_path(),
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
external_addresses=self._ray_params.external_addresses,
port_denylist=self._ray_params.reserved_ports,
)
assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos

def start_log_monitor(self):
"""Start the log monitor."""
Expand Down Expand Up @@ -916,29 +925,31 @@ def start_gcs_server(self):
assert self._gcs_client is None, "GCS client is already connected."
# TODO(mwtian): append date time so restarted GCS uses different files.
stdout_file, stderr_file = self.get_log_file_handles("gcs_server", unique=True)
process_info = ray._private.services.start_gcs_server(
self.redis_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port,
node_ip_address=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
process_info,
]
# Connecting via non-localhost address may be blocked by firewall rule,
# e.g. https://github.com/ray-project/ray/issues/15780
# TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
# when possible.
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
# Initialize gcs client, which also waits for GCS to start running.
self.get_gcs_client()
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_gcs_server(
self.redis_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port,
node_ip_address=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
process_info,
]
# Connecting via non-localhost address may be blocked by firewall rule,
# e.g. https://github.com/ray-project/ray/issues/15780
# TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
# when possible.
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
# Initialize gcs client, which also waits for GCS to start running.
self.get_gcs_client()

def start_raylet(
self,
Expand All @@ -956,47 +967,49 @@ def start_raylet(
valgrind profiler.
"""
stdout_file, stderr_file = self.get_log_file_handles("raylet", unique=True)
process_info = ray._private.services.start_raylet(
self.redis_address,
self.gcs_address,
self._node_ip_address,
self._ray_params.node_manager_port,
self._raylet_socket_name,
self._plasma_store_socket_name,
self._ray_params.worker_path,
self._ray_params.setup_worker_path,
self._ray_params.storage,
self._temp_dir,
self._session_dir,
self._runtime_env_dir,
self._logs_dir,
self.get_resource_spec(),
plasma_directory,
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
worker_port_list=self._ray_params.worker_port_list,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=None,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job, # noqa: E501
ray_debugger_external=self._ray_params.ray_debugger_external,
env_updates=self._ray_params.env_vars,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_raylet(
self.redis_address,
self.gcs_address,
self._node_ip_address,
self._ray_params.node_manager_port,
self._raylet_socket_name,
self._plasma_store_socket_name,
self._ray_params.worker_path,
self._ray_params.setup_worker_path,
self._ray_params.storage,
self._temp_dir,
self._session_dir,
self._runtime_env_dir,
self._logs_dir,
self.get_resource_spec(),
plasma_directory,
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
worker_port_list=self._ray_params.worker_port_list,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port, # noqa: E501
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=None,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job, # noqa: E501
ray_debugger_external=self._ray_params.ray_debugger_external,
env_updates=self._ray_params.env_vars,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]

def start_worker(self):
"""Start a worker process."""
Expand All @@ -1010,21 +1023,23 @@ def start_monitor(self):
cluster launching commands.
"""
stdout_file, stderr_file = self.get_log_file_handles("monitor", unique=True)
process_info = ray._private.services.start_monitor(
self.redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
autoscaling_config=self._ray_params.autoscaling_config,
redis_password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
monitor_ip=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_monitor(
self.redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
autoscaling_config=self._ray_params.autoscaling_config,
redis_password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
monitor_ip=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]

def start_ray_client_server(self):
"""Start the ray client server process."""
Expand Down Expand Up @@ -1222,10 +1237,14 @@ def _kill_process_impl(
# If the process did not exit, force kill it.
if process.poll() is None:
process.kill()
# The reason we usually don't call process.wait() here is that
# The reason we usually don't wait indefinitely here is that
# there's some chance we'd end up waiting a really long time.
if wait:
process.wait()
# Waiting another second can prevent spurious ResourceWarnings.
timeout_seconds = None if wait else 1
try:
process.wait(timeout_seconds)
except subprocess.TimeoutExpired:
pass

del self.all_processes[process_type]

Expand Down