Skip to content

Commit

Permalink
Fix ResourceWarnings
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-conder-sm committed Apr 18, 2022
1 parent e402fc0 commit 7c9de54
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 110 deletions.
3 changes: 2 additions & 1 deletion python/ray/_private/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ def _get_gpu_info_string():
gpu_dirs = os.listdir(proc_gpus_path)
if len(gpu_dirs) > 0:
gpu_info_path = f"{proc_gpus_path}/{gpu_dirs[0]}/information"
info_str = open(gpu_info_path).read()
with open(gpu_info_path) as f:
info_str = f.read()
return info_str
return None

Expand Down
6 changes: 6 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,3 +1301,9 @@ def check_version_info(cluster_metadata):
" Python: " + version_info[1] + "\n"
)
raise RuntimeError(error_message)


def maybe_enter_context(stack, *args):
for arg in args:
if arg is not None:
stack.enter_context(arg)
237 changes: 128 additions & 109 deletions python/ray/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import atexit
import collections
import contextlib
import datetime
import errno
import json
Expand All @@ -26,7 +27,12 @@
from ray.internal import storage
from ray._private.gcs_utils import GcsClient
from ray._private.resource_spec import ResourceSpec
from ray._private.utils import try_to_create_directory, try_to_symlink, open_log
from ray._private.utils import (
try_to_create_directory,
try_to_symlink,
open_log,
maybe_enter_context,
)
import ray._private.usage.usage_lib as ray_usage_lib

# Logger for this module. It should be configured at the entry point
Expand Down Expand Up @@ -832,33 +838,36 @@ def start_or_configure_redis(self):
"""Starts local Redis or configures external Redis."""
assert self._redis_address is None
redis_log_files = []
if self._ray_params.external_addresses is None:
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
for i in range(self._ray_params.num_redis_shards):
redis_log_files.append(
self.get_log_file_handles(f"redis-shard_{i}", unique=True)
)
with contextlib.ExitStack() as stack:
if self._ray_params.external_addresses is None:
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
maybe_enter_context(stack, *redis_log_files[-1])
for i in range(self._ray_params.num_redis_shards):
redis_log_files.append(
self.get_log_file_handles(f"redis-shard_{i}", unique=True)
)
maybe_enter_context(stack, *redis_log_files[-1])

(
self._redis_address,
redis_shards,
process_infos,
) = ray._private.services.start_redis(
self._node_ip_address,
redis_log_files,
self.get_resource_spec(),
self.get_session_dir_path(),
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
external_addresses=self._ray_params.external_addresses,
port_denylist=self._ray_params.reserved_ports,
)
assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos
(
self._redis_address,
redis_shards,
process_infos,
) = ray._private.services.start_redis(
self._node_ip_address,
redis_log_files,
self.get_resource_spec(),
self.get_session_dir_path(),
port=self._ray_params.redis_port,
redis_shard_ports=self._ray_params.redis_shard_ports,
num_redis_shards=self._ray_params.num_redis_shards,
redis_max_clients=self._ray_params.redis_max_clients,
password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
external_addresses=self._ray_params.external_addresses,
port_denylist=self._ray_params.reserved_ports,
)
assert ray_constants.PROCESS_TYPE_REDIS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_REDIS_SERVER] = process_infos

def start_log_monitor(self):
"""Start the log monitor."""
Expand Down Expand Up @@ -916,29 +925,31 @@ def start_gcs_server(self):
assert self._gcs_client is None, "GCS client is already connected."
# TODO(mwtian): append date time so restarted GCS uses different files.
stdout_file, stderr_file = self.get_log_file_handles("gcs_server", unique=True)
process_info = ray._private.services.start_gcs_server(
self.redis_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port,
node_ip_address=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
process_info,
]
# Connecting via non-localhost address may be blocked by firewall rule,
# e.g. https://github.com/ray-project/ray/issues/15780
# TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
# when possible.
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
# Initialize gcs client, which also waits for GCS to start running.
self.get_gcs_client()
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_gcs_server(
self.redis_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port,
node_ip_address=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
process_info,
]
# Connecting via non-localhost address may be blocked by firewall rule,
# e.g. https://github.com/ray-project/ray/issues/15780
# TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
# when possible.
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
# Initialize gcs client, which also waits for GCS to start running.
self.get_gcs_client()

def start_raylet(
self,
Expand All @@ -956,47 +967,49 @@ def start_raylet(
valgrind profiler.
"""
stdout_file, stderr_file = self.get_log_file_handles("raylet", unique=True)
process_info = ray._private.services.start_raylet(
self.redis_address,
self.gcs_address,
self._node_ip_address,
self._ray_params.node_manager_port,
self._raylet_socket_name,
self._plasma_store_socket_name,
self._ray_params.worker_path,
self._ray_params.setup_worker_path,
self._ray_params.storage,
self._temp_dir,
self._session_dir,
self._runtime_env_dir,
self._logs_dir,
self.get_resource_spec(),
plasma_directory,
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
worker_port_list=self._ray_params.worker_port_list,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port,
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=None,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job, # noqa: E501
ray_debugger_external=self._ray_params.ray_debugger_external,
env_updates=self._ray_params.env_vars,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_raylet(
self.redis_address,
self.gcs_address,
self._node_ip_address,
self._ray_params.node_manager_port,
self._raylet_socket_name,
self._plasma_store_socket_name,
self._ray_params.worker_path,
self._ray_params.setup_worker_path,
self._ray_params.storage,
self._temp_dir,
self._session_dir,
self._runtime_env_dir,
self._logs_dir,
self.get_resource_spec(),
plasma_directory,
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
worker_port_list=self._ray_params.worker_port_list,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
dashboard_agent_listen_port=self._ray_params.dashboard_agent_listen_port, # noqa: E501
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
stderr_file=stderr_file,
config=self._config,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=None,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
start_initial_python_workers_for_first_job=self._ray_params.start_initial_python_workers_for_first_job, # noqa: E501
ray_debugger_external=self._ray_params.ray_debugger_external,
env_updates=self._ray_params.env_vars,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]

def start_worker(self):
"""Start a worker process."""
Expand All @@ -1010,21 +1023,23 @@ def start_monitor(self):
cluster launching commands.
"""
stdout_file, stderr_file = self.get_log_file_handles("monitor", unique=True)
process_info = ray._private.services.start_monitor(
self.redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
autoscaling_config=self._ray_params.autoscaling_config,
redis_password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
monitor_ip=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
with contextlib.ExitStack() as stack:
maybe_enter_context(stack, stdout_file, stderr_file)
process_info = ray._private.services.start_monitor(
self.redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
autoscaling_config=self._ray_params.autoscaling_config,
redis_password=self._ray_params.redis_password,
fate_share=self.kernel_fate_share,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
monitor_ip=self._node_ip_address,
)
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]

def start_ray_client_server(self):
"""Start the ray client server process."""
Expand Down Expand Up @@ -1222,10 +1237,14 @@ def _kill_process_impl(
# If the process did not exit, force kill it.
if process.poll() is None:
process.kill()
# The reason we usually don't call process.wait() here is that
# The reason we usually don't wait indefinitely here is that
# there's some chance we'd end up waiting a really long time.
if wait:
process.wait()
# Waiting another second can prevent spurious ResourceWarnings.
timeout_seconds = None if wait else 1
try:
process.wait(timeout_seconds)
except subprocess.TimeoutExpired:
pass

del self.all_processes[process_type]

Expand Down

0 comments on commit 7c9de54

Please sign in to comment.