From e49389e92df04fc896c2148491e9d81a89ae77ff Mon Sep 17 00:00:00 2001
From: Robert Nishihara
Date: Tue, 29 Jan 2019 15:02:36 -0800
Subject: [PATCH 01/23] Stream logs to driver by default.

---
 python/ray/experimental/state.py |  28 ----
 python/ray/gcs_utils.py          |   1 +
 python/ray/log_monitor.py        | 264 ++++++++++++++++++++++---------
 python/ray/node.py               |   4 +-
 python/ray/ray_constants.py      |   1 +
 python/ray/services.py           |  59 +------
 python/ray/test/cluster_utils.py |   4 +-
 python/ray/utils.py              |  11 --
 python/ray/worker.py             |  51 ++++--
 test/runtest.py                  | 112 +++++++++----
 10 files changed, 321 insertions(+), 214 deletions(-)

diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py
index 156d0e52d323..699a55030b58 100644
--- a/python/ray/experimental/state.py
+++ b/python/ray/experimental/state.py
@@ -401,34 +401,6 @@ def client_table(self):
 
         return parse_client_table(self.redis_client)
 
-    def log_files(self):
-        """Fetch and return a dictionary of log file names to outputs.
-
-        Returns:
-            IP address to log file name to log file contents mappings.
-        """
-        relevant_files = self.redis_client.keys("LOGFILE*")
-
-        ip_filename_file = {}
-
-        for filename in relevant_files:
-            filename = decode(filename)
-            filename_components = filename.split(":")
-            ip_addr = filename_components[1]
-
-            file = self.redis_client.lrange(filename, 0, -1)
-            file_str = []
-            for x in file:
-                y = decode(x)
-                file_str.append(y)
-
-            if ip_addr not in ip_filename_file:
-                ip_filename_file[ip_addr] = {}
-
-            ip_filename_file[ip_addr][filename] = file_str
-
-        return ip_filename_file
-
     def _profile_table(self, batch_id):
         """Get the profile events for a given batch of profile events.
 
diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py
index c3ba3d06b237..b46f4170a70d 100644
--- a/python/ray/gcs_utils.py
+++ b/python/ray/gcs_utils.py
@@ -27,6 +27,7 @@
 ]
 
 FUNCTION_PREFIX = "RemoteFunction:"
+LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL"
 
 # xray heartbeats
 XRAY_HEARTBEAT_CHANNEL = str(TablePubsub.HEARTBEAT).encode("ascii")
diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py
index ff2910e88920..b8d1633a9bbb 100644
--- a/python/ray/log_monitor.py
+++ b/python/ray/log_monitor.py
@@ -3,10 +3,13 @@
 from __future__ import print_function
 
 import argparse
+import errno
 import logging
 import os
+import traceback
+
+import colorama
 import redis
-import time
 
 import ray.ray_constants as ray_constants
 from ray.services import get_ip_address
@@ -19,91 +22,195 @@
 logger = logging.getLogger(__name__)
 
 
+class LogFileInfo(object):
+    def __init__(self,
+                 filename=None,
+                 size_when_last_opened=None,
+                 file_position=None,
+                 file_handle=None):
+        assert (filename is not None and size_when_last_opened is not None
+                and file_position is not None)
+        self.filename = filename
+        self.size_when_last_opened = size_when_last_opened
+        self.file_position = file_position
+        self.file_handle = file_handle
+        self.worker_pid = None
+
+
 class LogMonitor(object):
     """A monitor process for monitoring Ray log files.
 
+    This class maintains a list of open files and a list of closed log files.
+    We can't simply leave all files open because we'll run out of file
+    descriptors.
+
+    The "run" method of this class will cycle between doing several things:
+    1. First, it will check if any new files have appeared in the log
+       directory. If so, they will be added to the list of closed files.
+    2. Then, if we are unable to open any new files, we will close all of the
+       files.
+    3.
Then, we will open as many closed files as we can that may have new + lines (judged by an increase in file size since the last time the file + was opened). + 4. Then we will loop through the open files and see if there are any new + lines in the file. If so, we will publish them to Redis. + Attributes: - node_ip_address: The IP address of the node that the log monitor - process is running on. This will be used to determine which log - files to track. + host (str): The hostname of this machine. Used to improve the log + messages published to Redis. + logs_dir (str): The directory that the log files are in. redis_client: A client used to communicate with the Redis server. - log_files: A dictionary mapping the name of a log file to a list of - strings representing its contents. - log_file_handles: A dictionary mapping the name of a log file to a file - handle for that file. + log_filenames (set): This is the set of filenames of all files in + open_file_infos and closed_file_infos. + open_file_infos (list[LogFileInfo]): Info for all of the open files. + closed_file_infos (list[LogFileInfo]): Info for all of the closed + files. + can_open_more_files (bool): True if we can still open more files and + false otherwise. """ def __init__(self, + logs_dir, redis_ip_address, redis_port, - node_ip_address, redis_password=None): """Initialize the log monitor object.""" - self.node_ip_address = node_ip_address + self.host = os.uname()[1] + self.logs_dir = logs_dir self.redis_client = redis.StrictRedis( host=redis_ip_address, port=redis_port, password=redis_password) - self.log_files = {} - self.log_file_handles = {} - self.files_to_ignore = set() + self.log_filenames = set() + self.open_file_infos = [] + self.closed_file_infos = [] + self.can_open_more_files = True + + def close_all_files(self): + """Close all open files (so that we can open more).""" + while len(self.open_file_infos) > 0: + file_info = self.open_file_infos.pop(0) + file_info.file_handle.close() + file_info.file_handle = None + self.closed_file_infos.append(file_info) + self.can_open_more_files = True def update_log_filenames(self): - """Get the most up-to-date list of log files to monitor from Redis.""" - num_current_log_files = len(self.log_files) - new_log_filenames = self.redis_client.lrange( - "LOG_FILENAMES:{}".format(self.node_ip_address), - num_current_log_files, -1) - for log_filename in new_log_filenames: - logger.info("Beginning to track file {}".format(log_filename)) - assert log_filename not in self.log_files - self.log_files[log_filename] = [] - - def check_log_files_and_push_updates(self): - """Get any changes to the log files and push updates to Redis.""" - for log_filename in self.log_files: - if log_filename in self.log_file_handles: - # Get any updates to the file. - new_lines = [] - while True: - current_position = ( - self.log_file_handles[log_filename].tell()) - next_line = self.log_file_handles[log_filename].readline() - if next_line != "": - new_lines.append(next_line) - else: - self.log_file_handles[log_filename].seek( - current_position) - break + """Update the list of log files to monitor.""" + try: + log_filenames = os.listdir(self.logs_dir) + except (IOError, OSError) as e: + if e.errno == errno.EMFILE: + self.close_all_files() + log_filenames = os.listdir(self.logs_dir) + else: + raise - # If there are any new lines, cache them and also push them to - # Redis. 
-                if len(new_lines) > 0:
-                    self.log_files[log_filename] += new_lines
-                    redis_key = "LOGFILE:{}:{}".format(
-                        self.node_ip_address, ray.utils.decode(log_filename))
-                    self.redis_client.rpush(redis_key, *new_lines)
+        for log_filename in log_filenames:
+            full_path = os.path.join(self.logs_dir, log_filename)
+            if full_path not in self.log_filenames:
+                self.log_filenames.add(full_path)
+                self.closed_file_infos.append(
+                    LogFileInfo(
+                        filename=full_path,
+                        size_when_last_opened=0,
+                        file_position=0,
+                        file_handle=None))
+                logger.info("Beginning to track file {}".format(log_filename))
 
-            # Pass if we already failed to open the log file.
-            elif log_filename in self.files_to_ignore:
-                pass
+    def open_closed_files(self):
+        """Open some closed files if they may have new lines.
 
-            # Try to open this file for the first time.
-            else:
+        Opening more files may require us to close some of the already open
+        files.
+        """
+        if not self.can_open_more_files:
+            # If we can't open any more files, close all of the files.
+            self.close_all_files()
+
+        files_with_no_updates = []
+        while len(self.closed_file_infos) > 0:
+            file_info = self.closed_file_infos.pop(0)
+            assert file_info.file_handle is None
+            # Get the file size to see if it has gotten bigger since we last
+            # opened it.
+            try:
+                file_size = os.path.getsize(file_info.filename)
+            except (IOError, OSError) as e:
+                # Catch "file not found" errors.
+                if e.errno == errno.ENOENT:
+                    logger.warning("Warning: The file {} was not "
                                    "found.".format(file_info.filename))
+                    self.log_filenames.remove(file_info.filename)
+                    continue
+                raise e
+
+            # If some new lines have been added to this file, try to reopen the
+            # file.
+            if file_size > file_info.size_when_last_opened:
                 try:
-                    self.log_file_handles[log_filename] = open(
-                        log_filename, "r")
-                except IOError as e:
-                    if e.errno == os.errno.EMFILE:
-                        logger.warning(
-                            "Warning: Ignoring {} because there are too "
-                            "many open files.".format(log_filename))
-                    elif e.errno == os.errno.ENOENT:
+                    f = open(file_info.filename, "r")
+                except (IOError, OSError) as e:
+                    if e.errno == errno.EMFILE:
+                        self.can_open_more_files = False
+                        self.closed_file_infos.insert(0, file_info)
+                        break
+                    elif e.errno == errno.ENOENT:
                         logger.warning("Warning: The file {} was not "
-                                       "found.".format(log_filename))
+                                       "found.".format(file_info.filename))
+                        self.log_filenames.remove(file_info.filename)
+                        continue
                     else:
                         raise e
-                    # Don't try to open this file any more.
-                    self.files_to_ignore.add(log_filename)
+                f.seek(file_info.file_position)
+                file_info.size_when_last_opened = file_size
+                file_info.file_handle = f
+                self.open_file_infos.append(file_info)
+            else:
+                files_with_no_updates.append(file_info)
+
+        # Add the files with no changes back to the list of closed files.
+        self.closed_file_infos += files_with_no_updates
+
+    def check_log_files_and_publish_updates(self):
+        """Get any changes to the log files and push updates to Redis."""
+        for file_info in self.open_file_infos:
+            assert not file_info.file_handle.closed
+
+            lines_to_publish = []
+            max_num_lines_to_read = 100
+            for _ in range(max_num_lines_to_read):
+                next_line = file_info.file_handle.readline()
+                if next_line == "":
+                    break
+                if next_line[-1] == "\n":
+                    next_line = next_line[:-1]
+                lines_to_publish.append(next_line)
+
+            # Publish the lines if this is a worker process.
+            filename = file_info.filename.split("/")[-1]
+            is_worker = (filename.startswith("worker")
+                         and (filename.endswith("out")
+                              or filename.endswith("err")))
+            output_type = "stdout" if filename.endswith("out") else "stderr"
+
+            if is_worker and file_info.file_position == 0:
+                if (len(lines_to_publish) > 0 and
+                        lines_to_publish[0].startswith("Ray worker pid: ")):
+                    file_info.worker_pid = int(
+                        lines_to_publish[0].split(" ")[-1])
+                    lines_to_publish = lines_to_publish[1:]
+
+            # Record the current position in the file.
+            file_info.file_position = file_info.file_handle.tell()
+
+            if len(lines_to_publish) > 0 and is_worker:
+                lines_to_publish.insert(
+                    0, "{}{}{} (pid={}, host={})".format(
+                        colorama.Fore.CYAN, "worker ({})".format(output_type),
+                        colorama.Fore.RESET, file_info.worker_pid, self.host))
+
+            if len(lines_to_publish) > 0:
+                self.redis_client.publish(ray.gcs_utils.LOG_FILE_CHANNEL,
+                                          "\n".join(lines_to_publish))
 
     def run(self):
         """Run the log monitor.
@@ -113,8 +220,8 @@ def run(self):
         """
         while True:
             self.update_log_filenames()
-            self.check_log_files_and_push_updates()
-            time.sleep(1)
+            self.open_closed_files()
+            self.check_log_files_and_publish_updates()
 
 
 if __name__ == "__main__":
@@ -127,11 +234,6 @@
         required=True,
         type=str,
         help="The address to use for Redis.")
-    parser.add_argument(
-        "--node-ip-address",
-        required=True,
-        type=str,
-        help="The IP address of the node this process is on.")
     parser.add_argument(
         "--redis-password",
         required=False,
         type=str,
@@ -151,6 +253,12 @@
         type=str,
         default=ray_constants.LOGGER_FORMAT,
         help=ray_constants.LOGGER_FORMAT_HELP)
+    parser.add_argument(
+        "--logs-dir",
+        required=True,
+        type=str,
+        help="Specify the path of the log directory used by Ray "
+        "processes.")
     args = parser.parse_args()
     ray.utils.setup_logger(args.logging_level, args.logging_format)
 
     redis_ip_address = get_ip_address(args.redis_address)
     redis_port = get_port(args.redis_address)
 
     log_monitor = LogMonitor(
+        args.logs_dir,
         redis_ip_address,
         redis_port,
-        args.node_ip_address,
         redis_password=args.redis_password)
-    log_monitor.run()
+
+    try:
+        log_monitor.run()
+    except Exception as e:
+        # Something went wrong, so push an error to all drivers.
+ redis_client = redis.StrictRedis( + host=redis_ip_address, + port=redis_port, + password=args.redis_password) + traceback_str = ray.utils.format_error_message(traceback.format_exc()) + message = ("The log monitor on node {} failed with the following " + "error:\n{}".format(os.uname()[1], traceback_str)) + ray.utils.push_error_to_driver_through_redis( + redis_client, ray_constants.LOG_MONITOR_DIED_ERROR, message) + raise e diff --git a/python/ray/node.py b/python/ray/node.py index fa722368b392..acf217c62604 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -149,7 +149,6 @@ def start_log_monitor(self): stdout_file, stderr_file = new_log_monitor_log_file() process_info = ray.services.start_log_monitor( self.redis_address, - self._node_ip_address, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password) @@ -189,8 +188,7 @@ def start_plasma_store(self): object_store_memory=self._ray_params.object_store_memory, plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, - plasma_store_socket_name=self._plasma_store_socket_name, - redis_password=self._ray_params.redis_password) + plasma_store_socket_name=self._plasma_store_socket_name) assert ( ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes) self.all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] = [ diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 7e9f5dc042f1..1a85ef64e050 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -53,6 +53,7 @@ def env_integer(key, default): INFEASIBLE_TASK_ERROR = "infeasible_task" REMOVED_NODE_ERROR = "node_removed" MONITOR_DIED_ERROR = "monitor_died" +LOG_MONITOR_DIED_ERROR = "log_monitor_died" # Abort autoscaling if more than this number of errors are encountered. This # is a safety feature to prevent e.g. runaway node launches. diff --git a/python/ray/services.py b/python/ray/services.py index ca1cc8c72d7c..d34f87bb92c0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -188,34 +188,6 @@ def get_node_ip_address(address="8.8.8.8:53"): return node_ip_address -def record_log_files_in_redis(redis_address, - node_ip_address, - log_files, - password=None): - """Record in Redis that a new log file has been created. - - This is used so that each log monitor can check Redis and figure out which - log files it is reponsible for monitoring. - - Args: - redis_address: The address of the redis server. - node_ip_address: The IP address of the node that the log file exists - on. - log_files: A list of file handles for the log files. If one of the file - handles is None, we ignore it. - password (str): The password of the redis server. - """ - for log_file in log_files: - if log_file is not None: - redis_ip_address, redis_port = redis_address.split(":") - redis_client = redis.StrictRedis( - host=redis_ip_address, port=redis_port, password=password) - # The name of the key storing the list of log filenames for this IP - # address. - log_file_list_key = "LOG_FILENAMES:{}".format(node_ip_address) - redis_client.rpush(log_file_list_key, log_file.name) - - def create_redis_client(redis_address, password=None): """Create a Redis client. @@ -834,7 +806,6 @@ def _start_redis_instance(executable, def start_log_monitor(redis_address, - node_ip_address, stdout_file=None, stderr_file=None, redis_password=None): @@ -842,8 +813,6 @@ def start_log_monitor(redis_address, Args: redis_address (str): The address of the Redis instance. 
- node_ip_address (str): The IP address of the node that this log monitor - is running on. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -856,8 +825,9 @@ def start_log_monitor(redis_address, log_monitor_filepath = os.path.join( os.path.dirname(os.path.abspath(__file__)), "log_monitor.py") command = [ - sys.executable, "-u", log_monitor_filepath, "--redis-address", - redis_address, "--node-ip-address", node_ip_address + sys.executable, "-u", log_monitor_filepath, + "--redis-address={}".format(redis_address), "--logs-dir={}".format( + get_logs_dir_path()) ] if redis_password: command += ["--redis-password", redis_password] @@ -866,10 +836,6 @@ def start_log_monitor(redis_address, ray_constants.PROCESS_TYPE_LOG_MONITOR, stdout_file=stdout_file, stderr_file=stderr_file) - record_log_files_in_redis( - redis_address, - node_ip_address, [stdout_file, stderr_file], - password=redis_password) return process_info @@ -1127,10 +1093,6 @@ def start_raylet(redis_address, use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ), stdout_file=stdout_file, stderr_file=stderr_file) - record_log_files_in_redis( - redis_address, - node_ip_address, [stdout_file, stderr_file], - password=redis_password) return process_info @@ -1322,8 +1284,7 @@ def start_plasma_store(node_ip_address, object_store_memory=None, plasma_directory=None, huge_pages=False, - plasma_store_socket_name=None, - redis_password=None): + plasma_store_socket_name=None): """This method starts an object store process. Args: @@ -1340,7 +1301,6 @@ def start_plasma_store(node_ip_address, be created. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. - redis_password (str): The password of the redis server. Returns: ProcessInfo for the process that was started. @@ -1368,11 +1328,6 @@ def start_plasma_store(node_ip_address, huge_pages=huge_pages, socket_name=plasma_store_socket_name) - record_log_files_in_redis( - redis_address, - node_ip_address, [stdout_file, stderr_file], - password=redis_password) - return process_info @@ -1414,8 +1369,6 @@ def start_worker(node_ip_address, ray_constants.PROCESS_TYPE_WORKER, stdout_file=stdout_file, stderr_file=stderr_file) - record_log_files_in_redis(redis_address, node_ip_address, - [stdout_file, stderr_file]) return process_info @@ -1456,10 +1409,6 @@ def start_monitor(redis_address, ray_constants.PROCESS_TYPE_MONITOR, stdout_file=stdout_file, stderr_file=stderr_file) - record_log_files_in_redis( - redis_address, - node_ip_address, [stdout_file, stderr_file], - password=redis_password) return process_info diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 503087abb1c4..a92d9b8f4020 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -120,7 +120,7 @@ def remove_node(self, node): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def _wait_for_node(self, node, timeout=30): + def _wait_for_node(self, node, timeout=60): """Wait until this node has appeared in the client table. Args: @@ -148,7 +148,7 @@ def _wait_for_node(self, node, timeout=30): time.sleep(0.1) raise Exception("Timed out while waiting for nodes to join.") - def wait_for_nodes(self, timeout=30): + def wait_for_nodes(self, timeout=60): """Waits for correct number of nodes to be registered. 
This will wait until the number of live nodes in the client table diff --git a/python/ray/utils.py b/python/ray/utils.py index 64de24187327..9ab6bd7f558c 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -282,17 +282,6 @@ def setup_logger(logging_level, logging_format): logger.propagate = False -def try_update_handler(new_stream): - global _default_handler - logger = logging.getLogger("ray") - if _default_handler: - new_handler = logging.StreamHandler(stream=new_stream) - new_handler.setFormatter(_default_handler.formatter) - _default_handler.close() - _default_handler = new_handler - logger.addHandler(_default_handler) - - # This function is copied and modified from # https://github.com/giampaolo/psutil/blob/5bd44f8afcecbfb0db479ce230c790fc2c56569a/psutil/tests/test_linux.py#L132-L138 # noqa: E501 def vmstat(stat): diff --git a/python/ray/worker.py b/python/ray/worker.py index afb3010e2738..79f00ec9039d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -39,7 +39,7 @@ from ray.function_manager import (FunctionActorManager, FunctionDescriptor) import ray.parameter from ray.utils import (check_oversized_pickle, is_cython, random_string, - thread_safe_client, setup_logger, try_update_handler) + thread_safe_client, setup_logger) SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -1219,12 +1219,13 @@ def init(redis_address=None, resources=None, object_store_memory=None, redis_max_memory=None, + log_to_driver=True, node_ip_address=None, object_id_seed=None, num_workers=None, local_mode=False, driver_mode=None, - redirect_worker_output=False, + redirect_worker_output=True, redirect_output=True, ignore_reinit_error=False, num_redis_shards=None, @@ -1281,6 +1282,8 @@ def init(redis_address=None, LRU eviction of entries. This only applies to the sharded redis tables (task, object, and profile tables). By default, this is capped at 10GB but can be set higher. + log_to_driver (bool): If true, then output from all of the worker + processes on all nodes will be directed to the driver. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the @@ -1490,6 +1493,7 @@ def init(redis_address=None, redis_password=redis_password, object_id_seed=object_id_seed, mode=driver_mode, + log_to_driver=log_to_driver, worker=global_worker, driver_id=driver_id) @@ -1564,6 +1568,21 @@ def custom_excepthook(type, value, tb): UNCAUGHT_ERROR_GRACE_PERIOD = 5 +def print_logs(redis_client): + """Prints log messages from workers on all of the nodes. + + Args: + redis_client: A client to the primary Redis shard. + """ + # TODO(rkn): The redis client is thread safe, but how does this pubsub + # client relate to the pubsub client in the error printing thread? They are + # derived from the same redis client. Can they interact? + pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) + pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) + for msg in pubsub_client.listen(): + print(ray.utils.decode(msg["data"]), file=sys.stderr) + + def print_error_messages_raylet(task_error_queue): """Prints message received in the given output queue. @@ -1650,6 +1669,7 @@ def connect(info, redis_password=None, object_id_seed=None, mode=WORKER_MODE, + log_to_driver=False, worker=global_worker, driver_id=None): """Connect this worker to the local scheduler, to Plasma, and to Redis. @@ -1665,6 +1685,8 @@ def connect(info, manner. 
However, the same ID should not be used for different jobs. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and LOCAL_MODE. + log_to_driver (bool): If true, then output from all of the worker + processes on all nodes will be directed to the driver. worker: The ray.Worker instance. driver_id: The ID of driver. If it's None, then we will generate one. """ @@ -1777,13 +1799,17 @@ def connect(info, log_stdout_file, log_stderr_file = ( tempfile_services.new_worker_redirected_log_file( worker.worker_id)) - sys.stdout = log_stdout_file - sys.stderr = log_stderr_file - try_update_handler(sys.stderr) - services.record_log_files_in_redis( - info["redis_address"], - info["node_ip_address"], [log_stdout_file, log_stderr_file], - password=redis_password) + # Redirect stdout/stderr at the file descriptor level. If we simply + # set sys.stdout and sys.stderr, then logging from C++ can fail to + # be redirected. + os.dup2(log_stdout_file.fileno(), sys.stdout.fileno()) + os.dup2(log_stderr_file.fileno(), sys.stderr.fileno()) + # This should always be the first message to appear in the worker's + # stdout and stderr log files. The string "Ray worker pid:" is + # parsed in the log monitor process. + print("Ray worker pid: {}".format(os.getpid())) + print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr) + # Register the worker with Redis. worker_dict = { "node_ip_address": worker.node_ip_address, @@ -1888,6 +1914,13 @@ def connect(info, listener.start() printer.daemon = True printer.start() + if log_to_driver: + log_thread = threading.Thread( + target=print_logs, + name="ray_print_logs", + args=(worker.redis_client, )) + log_thread.daemon = True + log_thread.start() # If we are using the raylet code path and we are not in local mode, start # a background thread to periodically flush profiling data to the GCS. diff --git a/test/runtest.py b/test/runtest.py index 1e35a08e5844..2e83ba423df3 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2297,9 +2297,6 @@ def test_global_state_api(shutdown_only): with pytest.raises(Exception): ray.global_state.function_table() - with pytest.raises(Exception): - ray.global_state.log_files() - ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1}) resources = {"CPU": 5, "GPU": 3, "CustomResource": 1} @@ -2388,45 +2385,90 @@ def wait_for_object_table(): assert object_table[result_id] == object_table_entry -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="New GCS API doesn't have a Python API yet.") -def test_log_file_api(shutdown_only): - """Tests that stderr and stdout are redirected appropriately.""" - ray.init(num_cpus=1, redirect_worker_output=True) +# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we +# should use those, but they seem to conflict with Ray's use of faulthandler. +class CaptureOutputAndError(object): + """Capture stdout and stderr of some span. - message_1 = "unique message" - message_2 = "message unique" + This can be used as follows. - @ray.remote - def f(): - print(message_1, file=sys.stdout) - print(message_2, file=sys.stderr) - # The call to sys.stdout.flush() seems to be necessary when using - # the system Python 2.7 on Ubuntu. + captured = {} + with CaptureOutputAndError(captured): + # Do stuff. + # Access captured["out"] and captured["err"]. 
+ """ + + def __init__(self, captured_output_and_error): + if sys.version_info >= (3, 0): + import io + self.output_buffer = io.StringIO() + self.error_buffer = io.StringIO() + else: + import cStringIO + self.output_buffer = cStringIO.StringIO() + self.error_buffer = cStringIO.StringIO() + self.captured_output_and_error = captured_output_and_error + + def __enter__(self): sys.stdout.flush() sys.stderr.flush() + self.old_stdout = sys.stdout + self.old_stderr = sys.stderr + sys.stdout = self.output_buffer + sys.stderr = self.error_buffer - ray.get(f.remote()) + def __exit__(self, exc_type, exc_value, traceback): + sys.stdout.flush() + sys.stderr.flush() + sys.stdout = self.old_stdout + sys.stderr = self.old_stderr + self.captured_output_and_error["out"] = self.output_buffer.getvalue() + self.captured_output_and_error["err"] = self.error_buffer.getvalue() - # Make sure that the message appears in the log files. - start_time = time.time() - found_message_1 = False - found_message_2 = False - while time.time() - start_time < 10: - log_files = ray.global_state.log_files() - for ip, innerdict in log_files.items(): - for filename, contents in innerdict.items(): - contents_str = "".join(contents) - if message_1 in contents_str: - found_message_1 = True - if message_2 in contents_str: - found_message_2 = True - if found_message_1 and found_message_2: - break - time.sleep(0.1) - assert found_message_1 and found_message_2 +def test_logging_to_driver(shutdown_only): + ray.init(num_cpus=1, log_to_driver=True) + + @ray.remote + def f(): + for i in range(100): + print(i) + print(100 + i, file=sys.stderr) + sys.stdout.flush() + sys.stderr.flush() + + captured = {} + with CaptureOutputAndError(captured): + ray.get(f.remote()) + time.sleep(1) + + output_lines = captured["out"] + assert len(output_lines) == 0 + error_lines = captured["err"] + for i in range(200): + assert str(i) in error_lines + + +def test_not_logging_to_driver(shutdown_only): + ray.init(num_cpus=1, log_to_driver=False) + + @ray.remote + def f(): + for i in range(100): + print(i) + print(100 + i, file=sys.stderr) + sys.stdout.flush() + sys.stderr.flush() + + captured = {} + with CaptureOutputAndError(captured): + ray.get(f.remote()) + time.sleep(1) + + output_lines = captured["out"] + assert len(output_lines) == 0 + error_lines = captured["err"] + assert len(error_lines) == 0 @pytest.mark.skipif( From dfb25c028671f39decfffe98a83fd5c403843118 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 11:28:52 -0800 Subject: [PATCH 02/23] Fix from rebase --- python/ray/services.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/python/ray/services.py b/python/ray/services.py index d34f87bb92c0..928105d3abaf 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -556,12 +556,6 @@ def start_redis(node_ip_address, processes.append(p) redis_address = address(node_ip_address, port) - # Record the log files in Redis. - record_log_files_in_redis( - redis_address, - node_ip_address, [redis_stdout_file, redis_stderr_file], - password=password) - # Register the number of Redis shards in the primary shard, so that clients # know how many redis shards to expect under RedisShards. primary_redis_client = redis.StrictRedis( @@ -622,11 +616,6 @@ def start_redis(node_ip_address, # Store redis shard information in the primary redis shard. 
primary_redis_client.rpush("RedisShards", shard_address) - record_log_files_in_redis( - redis_address, - node_ip_address, [redis_stdout_file, redis_stderr_file], - password=password) - if use_credis: # Configure the chain state. The way it is intended to work is # the following: From 92ad0aae291496f442735be07df23b7b4f511836 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 11:32:45 -0800 Subject: [PATCH 03/23] Redirect raylet output independently of worker output. --- python/ray/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/node.py b/python/ray/node.py index acf217c62604..e3581ec2b0b3 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -210,7 +210,7 @@ def start_raylet(self, use_valgrind=False, use_profiler=False): or get_raylet_socket_name()) self.prepare_socket_file(self._raylet_socket_name) stdout_file, stderr_file = new_raylet_log_file( - redirect_output=self._ray_params.redirect_worker_output) + redirect_output=self._ray_params.redirect_output) process_info = ray.services.start_raylet( self._redis_address, self._node_ip_address, From 08a8c51f922d1e0ca2e5a326f0281ba16b4343e7 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 11:54:49 -0800 Subject: [PATCH 04/23] Fix. --- python/ray/test/cluster_utils.py | 4 ++-- python/ray/worker.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index a92d9b8f4020..503087abb1c4 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -120,7 +120,7 @@ def remove_node(self, node): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def _wait_for_node(self, node, timeout=60): + def _wait_for_node(self, node, timeout=30): """Wait until this node has appeared in the client table. Args: @@ -148,7 +148,7 @@ def _wait_for_node(self, node, timeout=60): time.sleep(0.1) raise Exception("Timed out while waiting for nodes to join.") - def wait_for_nodes(self, timeout=60): + def wait_for_nodes(self, timeout=30): """Waits for correct number of nodes to be registered. This will wait until the number of live nodes in the client table diff --git a/python/ray/worker.py b/python/ray/worker.py index 79f00ec9039d..8a36e7596ad9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1809,6 +1809,8 @@ def connect(info, # parsed in the log monitor process. print("Ray worker pid: {}".format(os.getpid())) print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr) + sys.stdout.flush() + sys.stderr.flush() # Register the worker with Redis. worker_dict = { From be8e306b54fe5adba976b12533ee98978215317a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 12:09:03 -0800 Subject: [PATCH 05/23] Create redis client with services.create_redis_client. --- python/ray/log_monitor.py | 27 ++++++--------------------- python/ray/monitor.py | 26 +++++++++----------------- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index b8d1633a9bbb..4a8a84bfebd0 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -9,11 +9,8 @@ import traceback import colorama -import redis import ray.ray_constants as ray_constants -from ray.services import get_ip_address -from ray.services import get_port import ray.utils # Logger for this module. It should be configured at the entry point @@ -69,16 +66,12 @@ class LogMonitor(object): false otherwise. 
""" - def __init__(self, - logs_dir, - redis_ip_address, - redis_port, - redis_password=None): + def __init__(self, logs_dir, redis_address, redis_password=None): """Initialize the log monitor object.""" self.host = os.uname()[1] self.logs_dir = logs_dir - self.redis_client = redis.StrictRedis( - host=redis_ip_address, port=redis_port, password=redis_password) + self.redis_client = ray.services.create_redis_client( + redis_address, password=redis_password) self.log_filenames = set() self.open_file_infos = [] self.closed_file_infos = [] @@ -262,23 +255,15 @@ def run(self): args = parser.parse_args() ray.utils.setup_logger(args.logging_level, args.logging_format) - redis_ip_address = get_ip_address(args.redis_address) - redis_port = get_port(args.redis_address) - log_monitor = LogMonitor( - args.logs_dir, - redis_ip_address, - redis_port, - redis_password=args.redis_password) + args.logs_dir, args.redis_address, redis_password=args.redis_password) try: log_monitor.run() except Exception as e: # Something went wrong, so push an error to all drivers. - redis_client = redis.StrictRedis( - host=redis_ip_address, - port=redis_port, - password=args.redis_password) + redis_client = ray.services.create_redis_client( + args.redis_address, password=args.redis_password) traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = ("The log monitor on node {} failed with the following " "error:\n{}".format(os.uname()[1], traceback_str)) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 260ef1d87dd7..c0e2ff2ac7f7 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -36,17 +36,15 @@ class Monitor(object): receive notifications about failed components. """ - def __init__(self, - redis_address, - redis_port, - autoscaling_config, - redis_password=None): + def __init__(self, redis_address, autoscaling_config, redis_password=None): # Initialize the Redis clients. self.state = ray.experimental.state.GlobalState() + redis_ip_address = get_ip_address(args.redis_address) + redis_port = get_port(args.redis_address) self.state._initialize_global_state( - redis_address, redis_port, redis_password=redis_password) - self.redis = redis.StrictRedis( - host=redis_address, port=redis_port, db=0, password=redis_password) + redis_ip_address, redis_port, redis_password=redis_password) + self.redis = ray.services.create_redis_client( + redis_address, password=redis_password) # Setup subscriptions to the primary Redis server and the Redis shards. self.primary_subscribe_client = self.redis.pubsub( ignore_subscribe_messages=True) @@ -366,17 +364,13 @@ def run(self): args = parser.parse_args() setup_logger(args.logging_level, args.logging_format) - redis_ip_address = get_ip_address(args.redis_address) - redis_port = get_port(args.redis_address) - if args.autoscaling_config: autoscaling_config = os.path.expanduser(args.autoscaling_config) else: autoscaling_config = None monitor = Monitor( - redis_ip_address, - redis_port, + args.redis_address, autoscaling_config, redis_password=args.redis_password) @@ -384,10 +378,8 @@ def run(self): monitor.run() except Exception as e: # Something went wrong, so push an error to all drivers. 
- redis_client = redis.StrictRedis( - host=redis_ip_address, - port=redis_port, - password=args.redis_password) + redis_client = ray.services.create_redis_client( + args.redis_address, password=args.redis_password) traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = "The monitor failed with the following error:\n{}".format( traceback_str) From 841d0a6e156040e56e051be4fe388e7fe65d0bc6 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 12:12:03 -0800 Subject: [PATCH 06/23] Suppress Redis connection error at exit. --- python/ray/worker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 8a36e7596ad9..1a73368e4f1e 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1579,8 +1579,13 @@ def print_logs(redis_client): # derived from the same redis client. Can they interact? pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) - for msg in pubsub_client.listen(): - print(ray.utils.decode(msg["data"]), file=sys.stderr) + try: + for msg in pubsub_client.listen(): + print(ray.utils.decode(msg["data"]), file=sys.stderr) + except redis.ConnectionError: + # When Redis terminates the listen call will throw a ConnectionError, + # which we catch here. + pass def print_error_messages_raylet(task_error_queue): From 8b99bf0b61eeaa8b639738034a102bf85364e043 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 12:33:56 -0800 Subject: [PATCH 07/23] Remove thread_safe_client from redis. --- python/ray/worker.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 1a73368e4f1e..f3bab9f73ee6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1574,9 +1574,6 @@ def print_logs(redis_client): Args: redis_client: A client to the primary Redis shard. """ - # TODO(rkn): The redis client is thread safe, but how does this pubsub - # client relate to the pubsub client in the error printing thread? They are - # derived from the same redis client. Can they interact? pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) try: @@ -1744,11 +1741,8 @@ def connect(info, # Create a Redis client. redis_ip_address, redis_port = info["redis_address"].split(":") - worker.redis_client = thread_safe_client( - redis.StrictRedis( - host=redis_ip_address, - port=int(redis_port), - password=redis_password)) + worker.redis_client = redis.StrictRedis( + host=redis_ip_address, port=int(redis_port), password=redis_password) # For driver's check that the version information matches the version # information that the Ray cluster was started with. From 4ccc8a2267f6002648475a7d580bed4f5acf267b Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 15:38:00 -0800 Subject: [PATCH 08/23] Shutdown driver threads in ray.shutdown(). --- python/ray/import_thread.py | 33 ++++++--- python/ray/profiling.py | 24 ++++-- python/ray/worker.py | 144 +++++++++++++++++++++++------------- 3 files changed, 135 insertions(+), 66 deletions(-) diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 8aa1149db705..fbbc1a73c797 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -17,29 +17,35 @@ class ImportThread(object): """A thread used to import exports from the driver or other workers. 
- Note: - The driver also has an import thread, which is used only to - import custom class definitions from calls to register_custom_serializer - that happen under the hood on workers. + Note: The driver also has an import thread, which is used only to import + custom class definitions from calls to register_custom_serializer that + happen under the hood on workers. Attributes: worker: the worker object in this process. mode: worker mode redis_client: the redis client used to query exports. + threads_stopped (threading.Event): A threading event used to signal to + the thread that it should exit. """ - def __init__(self, worker, mode): + def __init__(self, worker, mode, threads_stopped): self.worker = worker self.mode = mode self.redis_client = worker.redis_client + self.threads_stopped = threads_stopped def start(self): """Start the import thread.""" - t = threading.Thread(target=self._run, name="ray_import_thread") + self.t = threading.Thread(target=self._run, name="ray_import_thread") # Making the thread a daemon causes it to exit # when the main thread exits. - t.daemon = True - t.start() + self.t.daemon = True + self.t.start() + + def join_import_thread(self): + """Wait for the thread to exit.""" + self.t.join() def _run(self): import_pubsub_client = self.redis_client.pubsub() @@ -56,8 +62,17 @@ def _run(self): for key in export_keys: num_imported += 1 self._process_key(key) + try: - for msg in import_pubsub_client.listen(): + while True: + # Exit if we received a signal that we should stop. + if self.threads_stopped.is_set(): + return + + msg = import_pubsub_client.get_message() + if msg is None: + continue + with self.worker.lock: if msg["type"] == "subscribe": continue diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 6f4818379144..8871b8710c20 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -69,21 +69,28 @@ class Profiler(object): worker: the worker to profile. events: the buffer of events. lock: the lock to protect access of events. + threads_stopped (threading.Event): A threading event used to signal to + the thread that it should exit. """ - def __init__(self, worker): + def __init__(self, worker, threads_stopped): self.worker = worker self.events = [] self.lock = threading.Lock() + self.threads_stopped = threads_stopped def start_flush_thread(self): - t = threading.Thread( + self.t = threading.Thread( target=self._periodically_flush_profile_events, name="ray_push_profiling_information") # Making the thread a daemon causes it to exit when the main thread # exits. - t.daemon = True - t.start() + self.t.daemon = True + self.t.start() + + def join_flush_thread(self): + """Wait for the flush thread to exit.""" + self.t.join() def _periodically_flush_profile_events(self): """Drivers run this as a thread to flush profile data in the @@ -94,7 +101,14 @@ def _periodically_flush_profile_events(self): # if either of those things changes, then we could run into issues. try: while True: - time.sleep(1) + # Sleep for 1 second. This will be interrupted if + # self.threads_stopped is set. + self.threads_stopped.wait(timeout=1) + + # Exit if we received a signal that we should stop. + if self.threads_stopped.is_set(): + return + self.flush_profile_data() except AttributeError: # TODO(suquark): It is a bad idea to ignore "AttributeError". diff --git a/python/ray/worker.py b/python/ray/worker.py index f3bab9f73ee6..3fb35b70c77c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -158,6 +158,9 @@ def __init__(self): # It is a DriverID. 
self.task_driver_id = DriverID.nil() self._task_context = threading.local() + # This event is checked regularly by all of the threads so that they + # know when to exit. + self.threads_stopped = threading.Event() @property def task_context(self): @@ -1532,6 +1535,15 @@ def shutdown(worker=global_worker): if hasattr(worker, "plasma_client"): worker.plasma_client.disconnect() + # Shutdown all of the threads that we've started. + worker.threads_stopped.set() + worker.import_thread.join_import_thread() + worker.profiler.join_flush_thread() + worker.listener_thread.join() + worker.printer_thread.join() + if worker.logger_thread is not None: + worker.logger_thread.join() + # Shut down the Ray processes. global _global_node if _global_node is not None: @@ -1568,35 +1580,52 @@ def custom_excepthook(type, value, tb): UNCAUGHT_ERROR_GRACE_PERIOD = 5 -def print_logs(redis_client): +def print_logs(redis_client, threads_stopped): """Prints log messages from workers on all of the nodes. Args: redis_client: A client to the primary Redis shard. + threads_stopped (threading.Event): A threading event used to signal to + the thread that it should exit. """ pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) - try: - for msg in pubsub_client.listen(): - print(ray.utils.decode(msg["data"]), file=sys.stderr) - except redis.ConnectionError: - # When Redis terminates the listen call will throw a ConnectionError, - # which we catch here. - pass + while True: + # Exit if we received a signal that we should stop. + if threads_stopped.is_set(): + return + + msg = pubsub_client.get_message() + if msg is None: + continue + print(ray.utils.decode(msg["data"]), file=sys.stderr) -def print_error_messages_raylet(task_error_queue): +def print_error_messages_raylet(task_error_queue, threads_stopped): """Prints message received in the given output queue. This checks periodically if any un-raised errors occured in the background. + + Args: + task_error_queue (queue.Queue): A queue used to receive errors from the + thread that listens to Redis. + threads_stopped (threading.Event): A threading event used to signal to + the thread that it should exit. """ while True: - error, t = task_error_queue.get() + # Exit if we received a signal that we should stop. + if threads_stopped.is_set(): + return + + try: + error, t = task_error_queue.get(block=False) + except queue.Empty: + continue # Delay errors a little bit of time to attempt to suppress redundant # messages originating from the worker. while t + UNCAUGHT_ERROR_GRACE_PERIOD > time.time(): - time.sleep(1) + threads_stopped.wait(timeout=1) if t < last_task_error_raise_time + UNCAUGHT_ERROR_GRACE_PERIOD: logger.debug("Suppressing error from worker: {}".format(error)) else: @@ -1604,11 +1633,18 @@ def print_error_messages_raylet(task_error_queue): "Possible unhandled error from worker: {}".format(error)) -def listen_error_messages_raylet(worker, task_error_queue): +def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): """Listen to error messages in the background on the driver. This runs in a separate thread on the driver and pushes (error, time) tuples to the output queue. + + Args: + worker: The worker class that this thread belongs to. + task_error_queue (queue.Queue): A queue used to communicate with the + thread that prints the errors found by this thread. + threads_stopped (threading.Event): A threading event used to signal to + the thread that it should exit. 
""" worker.error_message_pubsub_client = worker.redis_client.pubsub( ignore_subscribe_messages=True) @@ -1629,33 +1665,33 @@ def listen_error_messages_raylet(worker, task_error_queue): for error_message in error_messages: logger.error(error_message) - try: - for msg in worker.error_message_pubsub_client.listen(): - - gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( - msg["data"], 0) - assert gcs_entry.EntriesLength() == 1 - error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData( - gcs_entry.Entries(0), 0) - job_id = error_data.JobId() - if job_id not in [ - worker.task_driver_id.binary(), - DriverID.nil().binary() - ]: - continue - - error_message = ray.utils.decode(error_data.ErrorMessage()) - if (ray.utils.decode( - error_data.Type()) == ray_constants.TASK_PUSH_ERROR): - # Delay it a bit to see if we can suppress it - task_error_queue.put((error_message, time.time())) - else: - logger.error(error_message) + while True: + # Exit if we received a signal that we should stop. + if threads_stopped.is_set(): + return - except redis.ConnectionError: - # When Redis terminates the listen call will throw a ConnectionError, - # which we catch here. - pass + msg = worker.error_message_pubsub_client.get_message() + if msg is None: + continue + gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( + msg["data"], 0) + assert gcs_entry.EntriesLength() == 1 + error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData( + gcs_entry.Entries(0), 0) + job_id = error_data.JobId() + if job_id not in [ + worker.task_driver_id.binary(), + DriverID.nil().binary() + ]: + continue + + error_message = ray.utils.decode(error_data.ErrorMessage()) + if (ray.utils.decode( + error_data.Type()) == ray_constants.TASK_PUSH_ERROR): + # Delay it a bit to see if we can suppress it + task_error_queue.put((error_message, time.time())) + else: + logger.error(error_message) def is_initialized(): @@ -1701,7 +1737,7 @@ def connect(info, if not faulthandler.is_enabled(): faulthandler.enable(all_threads=False) - worker.profiler = profiling.Profiler(worker) + worker.profiler = profiling.Profiler(worker, worker.threads_stopped) # Initialize some fields. if mode is WORKER_MODE: @@ -1893,7 +1929,9 @@ def connect(info, ) # Start the import thread - import_thread.ImportThread(worker, mode).start() + worker.import_thread = import_thread.ImportThread(worker, mode, + worker.threads_stopped) + worker.import_thread.start() # If this is a driver running in SCRIPT_MODE, start a thread to print error # messages asynchronously in the background. Ideally the scheduler would @@ -1903,25 +1941,27 @@ def connect(info, # scheduler for new error messages. 
if mode == SCRIPT_MODE: q = queue.Queue() - listener = threading.Thread( + worker.listener_thread = threading.Thread( target=listen_error_messages_raylet, name="ray_listen_error_messages", - args=(worker, q)) - printer = threading.Thread( + args=(worker, q, worker.threads_stopped)) + worker.printer_thread = threading.Thread( target=print_error_messages_raylet, name="ray_print_error_messages", - args=(q, )) - listener.daemon = True - listener.start() - printer.daemon = True - printer.start() + args=(q, worker.threads_stopped)) + worker.listener_thread.daemon = True + worker.listener_thread.start() + worker.printer_thread.daemon = True + worker.printer_thread.start() if log_to_driver: - log_thread = threading.Thread( + worker.logger_thread = threading.Thread( target=print_logs, name="ray_print_logs", - args=(worker.redis_client, )) - log_thread.daemon = True - log_thread.start() + args=(worker.redis_client, worker.threads_stopped)) + worker.logger_thread.daemon = True + worker.logger_thread.start() + else: + worker.logger_thread = None # If we are using the raylet code path and we are not in local mode, start # a background thread to periodically flush profiling data to the GCS. From 5a1a64dfa219d4f746302f07de2ed7e8c8dcdaa4 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 17:16:12 -0800 Subject: [PATCH 09/23] Add warning for too many log messages. --- python/ray/import_thread.py | 1 + python/ray/worker.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index fbbc1a73c797..ced604fc5c92 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -71,6 +71,7 @@ def _run(self): msg = import_pubsub_client.get_message() if msg is None: + self.threads_stopped.wait(timeout=0.01) continue with self.worker.lock: diff --git a/python/ray/worker.py b/python/ray/worker.py index 3fb35b70c77c..13dc013bae23 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1590,6 +1590,11 @@ def print_logs(redis_client, threads_stopped): """ pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) + # Keep track of the number of consecutive log messages that have been + # received with no break in between. If this number grows continually, then + # the worker is probably not able to process the log messages as rapidly as + # they are coming in. + num_consecutive_messages_received = 0 while True: # Exit if we received a signal that we should stop. if threads_stopped.is_set(): @@ -1597,9 +1602,19 @@ def print_logs(redis_client, threads_stopped): msg = pubsub_client.get_message() if msg is None: + num_consecutive_messages_received = 0 + threads_stopped.wait(timeout=0.01) continue + num_consecutive_messages_received += 1 print(ray.utils.decode(msg["data"]), file=sys.stderr) + if (num_consecutive_messages_received % 100 == 0 + and num_consecutive_messages_received > 0): + logger.warning( + "The driver may not be able to keep up with the stdout/stderr " + "of the workers. To avoid forwarding logs to the driver, use " + "'ray.init(log_to_driver=False)'.") + def print_error_messages_raylet(task_error_queue, threads_stopped): """Prints message received in the given output queue. 
@@ -1621,6 +1636,7 @@ def print_error_messages_raylet(task_error_queue, threads_stopped): try: error, t = task_error_queue.get(block=False) except queue.Empty: + threads_stopped.wait(timeout=0.01) continue # Delay errors a little bit of time to attempt to suppress redundant # messages originating from the worker. @@ -1672,6 +1688,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): msg = worker.error_message_pubsub_client.get_message() if msg is None: + threads_stopped.wait(timeout=0.01) continue gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( msg["data"], 0) From a2e8c875758430bc8a2aa66f48ad13e7dd93b3d8 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 18:14:47 -0800 Subject: [PATCH 10/23] Only stop threads if worker is connected. --- python/ray/worker.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 13dc013bae23..2019a7dd316a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1530,19 +1530,6 @@ def shutdown(worker=global_worker): will need to reload the module. """ disconnect(worker) - if hasattr(worker, "raylet_client"): - del worker.raylet_client - if hasattr(worker, "plasma_client"): - worker.plasma_client.disconnect() - - # Shutdown all of the threads that we've started. - worker.threads_stopped.set() - worker.import_thread.join_import_thread() - worker.profiler.join_flush_thread() - worker.listener_thread.join() - worker.printer_thread.join() - if worker.logger_thread is not None: - worker.logger_thread.join() # Shut down the Ray processes. global _global_node @@ -2019,11 +2006,26 @@ def disconnect(worker=global_worker): # remote functions or actors are defined and then connect is called again, # the remote functions will be exported. This is mostly relevant for the # tests. + if worker.connected: + # Shutdown all of the threads that we've started. + worker.threads_stopped.set() + worker.import_thread.join_import_thread() + worker.profiler.join_flush_thread() + worker.listener_thread.join() + worker.printer_thread.join() + if worker.logger_thread is not None: + worker.logger_thread.join() + worker.connected = False worker.cached_functions_to_run = [] worker.function_actor_manager.reset_cache() worker.serialization_context_map.clear() + if hasattr(worker, "raylet_client"): + del worker.raylet_client + if hasattr(worker, "plasma_client"): + worker.plasma_client.disconnect() + @contextmanager def _changeproctitle(title, next_title): From 8535645efe0f3ad119aa847e91a9b3ba452ee531 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 19:14:32 -0800 Subject: [PATCH 11/23] Only stop threads if they exist. --- python/ray/worker.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 2019a7dd316a..e237cc560b33 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1964,8 +1964,6 @@ def connect(info, args=(worker.redis_client, worker.threads_stopped)) worker.logger_thread.daemon = True worker.logger_thread.start() - else: - worker.logger_thread = None # If we are using the raylet code path and we are not in local mode, start # a background thread to periodically flush profiling data to the GCS. 
@@ -2011,10 +2009,13 @@ def disconnect(worker=global_worker): worker.threads_stopped.set() worker.import_thread.join_import_thread() worker.profiler.join_flush_thread() - worker.listener_thread.join() - worker.printer_thread.join() - if worker.logger_thread is not None: + if hasattr(worker, "listener_thread"): + worker.listener_thread.join() + if hasattr(worker, "printer_thread"): + worker.printer_thread.join() + if hasattr(worker, "logger_thread"): worker.logger_thread.join() + worker.threads_stopped.clear() worker.connected = False worker.cached_functions_to_run = [] From 76ca0e73c78a02fefc508672232e8e4b2fa9dc33 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 19:20:31 -0800 Subject: [PATCH 12/23] Remove unnecessary try/excepts. --- python/ray/import_thread.py | 45 ++++++++++++++++--------------------- python/ray/profiling.py | 28 +++++++++-------------- 2 files changed, 29 insertions(+), 44 deletions(-) diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index ced604fc5c92..32b309f57e67 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -5,8 +5,6 @@ import threading import traceback -import redis - import ray from ray import ray_constants from ray import cloudpickle as pickle @@ -63,31 +61,26 @@ def _run(self): num_imported += 1 self._process_key(key) - try: - while True: - # Exit if we received a signal that we should stop. - if self.threads_stopped.is_set(): - return - - msg = import_pubsub_client.get_message() - if msg is None: - self.threads_stopped.wait(timeout=0.01) - continue + while True: + # Exit if we received a signal that we should stop. + if self.threads_stopped.is_set(): + return - with self.worker.lock: - if msg["type"] == "subscribe": - continue - assert msg["data"] == b"rpush" - num_imports = self.redis_client.llen("Exports") - assert num_imports >= num_imported - for i in range(num_imported, num_imports): - num_imported += 1 - key = self.redis_client.lindex("Exports", i) - self._process_key(key) - except redis.ConnectionError: - # When Redis terminates the listen call will throw a - # ConnectionError, which we catch here. - pass + msg = import_pubsub_client.get_message() + if msg is None: + self.threads_stopped.wait(timeout=0.01) + continue + + with self.worker.lock: + if msg["type"] == "subscribe": + continue + assert msg["data"] == b"rpush" + num_imports = self.redis_client.llen("Exports") + assert num_imports >= num_imported + for i in range(num_imported, num_imports): + num_imported += 1 + key = self.redis_client.lindex("Exports", i) + self._process_key(key) def _process_key(self, key): """Process the given export key from redis.""" diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 8871b8710c20..45ff78aebb64 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -99,24 +99,16 @@ def _periodically_flush_profile_events(self): # the local scheduler client. This should be ok because it doesn't read # from the local scheduler client and we have the GIL here. However, # if either of those things changes, then we could run into issues. - try: - while True: - # Sleep for 1 second. This will be interrupted if - # self.threads_stopped is set. - self.threads_stopped.wait(timeout=1) - - # Exit if we received a signal that we should stop. - if self.threads_stopped.is_set(): - return - - self.flush_profile_data() - except AttributeError: - # TODO(suquark): It is a bad idea to ignore "AttributeError". 
- # It has caused some very unexpected behaviors when implementing - # new features (related to AttributeError). - - # This is to suppress errors that occur at shutdown. - pass + while True: + # Sleep for 1 second. This will be interrupted if + # self.threads_stopped is set. + self.threads_stopped.wait(timeout=1) + + # Exit if we received a signal that we should stop. + if self.threads_stopped.is_set(): + return + + self.flush_profile_data() def flush_profile_data(self): """Push the logged profiling data to the global control store. From f8278d2ccb5a80fcc83121be6499c2e6c13cf490 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 22:43:05 -0800 Subject: [PATCH 13/23] Fix --- python/ray/profiling.py | 10 +--------- python/ray/worker.py | 10 +++++++--- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 45ff78aebb64..c5bd4bdee1e5 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -111,15 +111,7 @@ def _periodically_flush_profile_events(self): self.flush_profile_data() def flush_profile_data(self): - """Push the logged profiling data to the global control store. - - By default, profiling information for a given task won't appear in the - timeline until after the task has completed. For very long-running - tasks, we may want profiling information to appear more quickly. - In such cases, this function can be called. Note that as an - alternative, we could start a thread in the background on workers that - calls this automatically. - """ + """Push the logged profiling data to the global control store.""" with self.lock: events = self.events self.events = [] diff --git a/python/ray/worker.py b/python/ray/worker.py index e237cc560b33..532b71d1836c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -2005,10 +2005,14 @@ def disconnect(worker=global_worker): # the remote functions will be exported. This is mostly relevant for the # tests. if worker.connected: - # Shutdown all of the threads that we've started. + # Shutdown all of the threads that we've started. TODO(rkn): This + # should be handled cleanly in the worker object's destructor and not + # in this disconnect method. worker.threads_stopped.set() - worker.import_thread.join_import_thread() - worker.profiler.join_flush_thread() + if hasattr(worker, "import_thread"): + worker.import_thread.join_import_thread() + if hasattr(worker, "profiler") and hasattr(worker.profiler, "t"): + worker.profiler.join_flush_thread() if hasattr(worker, "listener_thread"): worker.listener_thread.join() if hasattr(worker, "printer_thread"): From 35f5b81e5820c074a39433550bc4961c4dfb232b Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 4 Feb 2019 23:44:53 -0800 Subject: [PATCH 14/23] Only add new logging handler once. 
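
Calling ray.init() more than once in the same process previously
attached a fresh StreamHandler to the "ray" logger on every call to
setup_logger, so each subsequent log record was printed once per
handler. Keep a single module-level handler, attach it the first time
setup_logger runs, and let later calls only update the level and the
format.

A minimal sketch of the idempotent-handler pattern (a standalone
illustration, not the exact code in python/ray/utils.py; the logger
name and the isinstance check are assumptions here):

    import logging

    logger = logging.getLogger("ray")
    _default_handler = None

    def setup_logger(logging_level, logging_format):
        if isinstance(logging_level, str):
            logging_level = logging.getLevelName(logging_level.upper())
        logger.setLevel(logging_level)
        global _default_handler
        if _default_handler is None:
            # Create and attach the stream handler exactly once.
            _default_handler = logging.StreamHandler()
            logger.addHandler(_default_handler)
        # Reconfiguring the existing handler is safe to repeat.
        _default_handler.setFormatter(logging.Formatter(logging_format))
        logger.propagate = False

With this guard, calling setup_logger(logging.INFO, "%(message)s")
twice still leaves exactly one handler on the logger, so each record
is emitted once.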
--- python/ray/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/utils.py b/python/ray/utils.py index 9ab6bd7f558c..a7da864e3c9d 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -276,9 +276,10 @@ def setup_logger(logging_level, logging_format): logging_level = logging.getLevelName(logging_level.upper()) logger.setLevel(logging_level) global _default_handler - _default_handler = logging.StreamHandler() + if _default_handler is None: + _default_handler = logging.StreamHandler() + logger.addHandler(_default_handler) _default_handler.setFormatter(logging.Formatter(logging_format)) - logger.addHandler(_default_handler) logger.propagate = False From dc02bbe300168380a5f01b603fa11b618d5ad165 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 00:55:37 -0800 Subject: [PATCH 15/23] Increase timeout. --- python/ray/test/cluster_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 503087abb1c4..674901804304 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -120,7 +120,7 @@ def remove_node(self, node): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def _wait_for_node(self, node, timeout=30): + def _wait_for_node(self, node, timeout=120): """Wait until this node has appeared in the client table. Args: @@ -148,7 +148,7 @@ def _wait_for_node(self, node, timeout=30): time.sleep(0.1) raise Exception("Timed out while waiting for nodes to join.") - def wait_for_nodes(self, timeout=30): + def wait_for_nodes(self, timeout=120): """Waits for correct number of nodes to be registered. This will wait until the number of live nodes in the client table From d1ed2becaab46baa956cdb8c064fee614f05d4f1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 10:24:09 -0800 Subject: [PATCH 16/23] Fix tempfile test. --- test/tempfile_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/tempfile_test.py b/test/tempfile_test.py index 17d710403dc6..465738c5d499 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -73,8 +73,9 @@ def test_raylet_tempfiles(): "log_monitor.out", "log_monitor.err", "plasma_store.out", "plasma_store.err", "webui.out", "webui.err", "monitor.out", "monitor.err", "raylet_monitor.out", "raylet_monitor.err", - "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err" - } # without raylet logs + "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", + "raylet.out", "raylet.err" + } # with raylet logs socket_files = set(os.listdir(tempfile_services.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown() From 26021ae710e32f223c3d02e95a089f26e7016fcb Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 16:13:44 -0800 Subject: [PATCH 17/23] Fix logging in cluster_utils. 
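
Nodes started through cluster_utils pick up the RayParams defaults,
and with redirect_worker_output=False the workers' stdout and stderr
never reached the log files under the logs directory, so the log
monitor had nothing to tail and nothing was streamed back to the
driver. Flipping the default to True restores the file-based path
that the rest of this series depends on.

A rough sketch of how the flag is meant to behave when a worker
process is launched (start_worker_process and the file names are
hypothetical stand-ins, not the services.py implementation):

    import os
    import subprocess

    def start_worker_process(command, logs_dir,
                             redirect_worker_output=True):
        if redirect_worker_output:
            # With redirection on (now the default), worker output is
            # appended to files that the log monitor can tail and
            # publish to the driver.
            stdout = open(os.path.join(logs_dir, "worker.out"), "a")
            stderr = open(os.path.join(logs_dir, "worker.err"), "a")
        else:
            # Without redirection, output goes to the parent's
            # terminal and never reaches the log monitor.
            stdout = stderr = None
        return subprocess.Popen(command, stdout=stdout, stderr=stderr)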
--- python/ray/parameter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 5b4a0cb488c5..e1eb82216de9 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -93,7 +93,7 @@ def __init__(self, num_workers=None, local_mode=False, driver_mode=None, - redirect_worker_output=False, + redirect_worker_output=True, redirect_output=True, num_redis_shards=None, redis_max_clients=None, From 57518d24a1dfb75dff1e3191aaa6e7270d10cb57 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 16:13:51 -0800 Subject: [PATCH 18/23] Revert "Increase timeout." This reverts commit b3846b89040bcd8e583b2e18cb513cb040e71d95. --- python/ray/test/cluster_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 674901804304..503087abb1c4 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -120,7 +120,7 @@ def remove_node(self, node): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def _wait_for_node(self, node, timeout=120): + def _wait_for_node(self, node, timeout=30): """Wait until this node has appeared in the client table. Args: @@ -148,7 +148,7 @@ def _wait_for_node(self, node, timeout=120): time.sleep(0.1) raise Exception("Timed out while waiting for nodes to join.") - def wait_for_nodes(self, timeout=120): + def wait_for_nodes(self, timeout=30): """Waits for correct number of nodes to be registered. This will wait until the number of live nodes in the client table From dfbbfb6cd9def921725220fe757ed6034ee72b21 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 17:28:40 -0800 Subject: [PATCH 19/23] Retry longer when connecting to plasma store from node manager and object manager. --- src/ray/object_manager/object_store_notification_manager.cc | 2 +- src/ray/raylet/node_manager.cc | 5 +++++ src/ray/raylet/raylet.cc | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index efa0c211d51d..aa19787f3c37 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -20,7 +20,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( num_adds_processed_(0), num_removes_processed_(0), socket_(io_service) { - RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str())); + RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str(), "", 0, 300)); RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); boost::system::error_code ec; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d05f28dd0e80..2712757a24b1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -91,6 +91,11 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); +<<<<<<< HEAD +======= + + RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str(), "", 0, 300)); +>>>>>>> Retry longer when connecting to plasma store from node manager and object manager. 
} ray::Status NodeManager::RegisterGcs() { diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index ea4bd3fec6a2..a600aa166a4d 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -58,7 +58,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ node_manager_config.node_manager_port)), node_manager_socket_(main_service) { RAY_ARROW_CHECK_OK( - store_client_.Connect(node_manager_config.store_socket_name.c_str())); + store_client_.Connect(node_manager_config.store_socket_name.c_str(), "", 0, 300)); // Start listening for clients. DoAccept(); DoAcceptObjectManager(); From 1f31342ba86b54d5ee9c389a4d6abb051321b5f0 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 5 Feb 2019 21:56:05 -0800 Subject: [PATCH 20/23] Close pubsub channels to avoid leaking file descriptors. --- python/ray/experimental/state.py | 4 ++ python/ray/import_thread.py | 52 +++++++------- python/ray/monitor.py | 5 ++ python/ray/worker.py | 118 +++++++++++++++++-------------- 4 files changed, 100 insertions(+), 79 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 699a55030b58..2d1e4bee0ea5 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -841,6 +841,10 @@ def available_resources(self): for resource_id, num_available in available_resources.items(): total_available_resources[resource_id] += num_available + # Close the pubsub clients to avoid leaking file descriptors. + for subscribe_client in subscribe_clients: + subscribe_client.close() + return dict(total_available_resources) def _error_messages(self, job_id): diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 32b309f57e67..ba561919ed4a 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -54,34 +54,38 @@ def _run(self): # Keep track of the number of imports that we've imported. num_imported = 0 - # Get the exports that occurred before the call to subscribe. - with self.worker.lock: - export_keys = self.redis_client.lrange("Exports", 0, -1) - for key in export_keys: - num_imported += 1 - self._process_key(key) - - while True: - # Exit if we received a signal that we should stop. - if self.threads_stopped.is_set(): - return - - msg = import_pubsub_client.get_message() - if msg is None: - self.threads_stopped.wait(timeout=0.01) - continue - + try: + # Get the exports that occurred before the call to subscribe. with self.worker.lock: - if msg["type"] == "subscribe": - continue - assert msg["data"] == b"rpush" - num_imports = self.redis_client.llen("Exports") - assert num_imports >= num_imported - for i in range(num_imported, num_imports): + export_keys = self.redis_client.lrange("Exports", 0, -1) + for key in export_keys: num_imported += 1 - key = self.redis_client.lindex("Exports", i) self._process_key(key) + while True: + # Exit if we received a signal that we should stop. + if self.threads_stopped.is_set(): + return + + msg = import_pubsub_client.get_message() + if msg is None: + self.threads_stopped.wait(timeout=0.01) + continue + + with self.worker.lock: + if msg["type"] == "subscribe": + continue + assert msg["data"] == b"rpush" + num_imports = self.redis_client.llen("Exports") + assert num_imports >= num_imported + for i in range(num_imported, num_imports): + num_imported += 1 + key = self.redis_client.lindex("Exports", i) + self._process_key(key) + finally: + # Close the pubsub client to avoid leaking file descriptors. 
+ import_pubsub_client.close() + def _process_key(self, key): """Process the given export key from redis.""" # Handle the driver case first. diff --git a/python/ray/monitor.py b/python/ray/monitor.py index c0e2ff2ac7f7..da2c52e3d5e4 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -86,6 +86,11 @@ def __init__(self, redis_address, autoscaling_config, redis_password=None): str(e))) self.issue_gcs_flushes = False + def __del__(self): + """Destruct the monitor object.""" + # We close the pubsub client to avoid leaking file descriptors. + self.primary_subscribe_client.close() + def subscribe(self, channel): """Subscribe to the given channel on the primary Redis shard. diff --git a/python/ray/worker.py b/python/ray/worker.py index 532b71d1836c..9589e78d4676 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1577,30 +1577,34 @@ def print_logs(redis_client, threads_stopped): """ pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) - # Keep track of the number of consecutive log messages that have been - # received with no break in between. If this number grows continually, then - # the worker is probably not able to process the log messages as rapidly as - # they are coming in. - num_consecutive_messages_received = 0 - while True: - # Exit if we received a signal that we should stop. - if threads_stopped.is_set(): - return - - msg = pubsub_client.get_message() - if msg is None: - num_consecutive_messages_received = 0 - threads_stopped.wait(timeout=0.01) - continue - num_consecutive_messages_received += 1 - print(ray.utils.decode(msg["data"]), file=sys.stderr) + try: + # Keep track of the number of consecutive log messages that have been + # received with no break in between. If this number grows continually, + # then the worker is probably not able to process the log messages as + # rapidly as they are coming in. + num_consecutive_messages_received = 0 + while True: + # Exit if we received a signal that we should stop. + if threads_stopped.is_set(): + return - if (num_consecutive_messages_received % 100 == 0 - and num_consecutive_messages_received > 0): - logger.warning( - "The driver may not be able to keep up with the stdout/stderr " - "of the workers. To avoid forwarding logs to the driver, use " - "'ray.init(log_to_driver=False)'.") + msg = pubsub_client.get_message() + if msg is None: + num_consecutive_messages_received = 0 + threads_stopped.wait(timeout=0.01) + continue + num_consecutive_messages_received += 1 + print(ray.utils.decode(msg["data"]), file=sys.stderr) + + if (num_consecutive_messages_received % 100 == 0 + and num_consecutive_messages_received > 0): + logger.warning( + "The driver may not be able to keep up with the " + "stdout/stderr of the workers. To avoid forwarding logs " + "to the driver, use 'ray.init(log_to_driver=False)'.") + finally: + # Close the pubsub client to avoid leaking file descriptors. + pubsub_client.close() def print_error_messages_raylet(task_error_queue, threads_stopped): @@ -1662,40 +1666,44 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): worker.error_message_pubsub_client.subscribe(error_pubsub_channel) # worker.error_message_pubsub_client.psubscribe("*") - # Get the exports that occurred before the call to subscribe. 
-    with worker.lock:
-        error_messages = global_state.error_messages(worker.task_driver_id)
-        for error_message in error_messages:
-            logger.error(error_message)
-
-    while True:
-        # Exit if we received a signal that we should stop.
-        if threads_stopped.is_set():
-            return
+    try:
+        # Get the errors that occurred before the call to subscribe.
+        with worker.lock:
+            error_messages = global_state.error_messages(worker.task_driver_id)
+            for error_message in error_messages:
+                logger.error(error_message)
 
-        msg = worker.error_message_pubsub_client.get_message()
-        if msg is None:
-            threads_stopped.wait(timeout=0.01)
-            continue
-        gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
-            msg["data"], 0)
-        assert gcs_entry.EntriesLength() == 1
-        error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData(
-            gcs_entry.Entries(0), 0)
-        job_id = error_data.JobId()
-        if job_id not in [
-                worker.task_driver_id.binary(),
-                DriverID.nil().binary()
-        ]:
-            continue
+        while True:
+            # Exit if we received a signal that we should stop.
+            if threads_stopped.is_set():
+                return
 
-        error_message = ray.utils.decode(error_data.ErrorMessage())
-        if (ray.utils.decode(
-                error_data.Type()) == ray_constants.TASK_PUSH_ERROR):
-            # Delay it a bit to see if we can suppress it
-            task_error_queue.put((error_message, time.time()))
-        else:
-            logger.error(error_message)
+            msg = worker.error_message_pubsub_client.get_message()
+            if msg is None:
+                threads_stopped.wait(timeout=0.01)
+                continue
+            gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
+                msg["data"], 0)
+            assert gcs_entry.EntriesLength() == 1
+            error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData(
+                gcs_entry.Entries(0), 0)
+            job_id = error_data.JobId()
+            if job_id not in [
+                    worker.task_driver_id.binary(),
+                    DriverID.nil().binary()
+            ]:
+                continue
+
+            error_message = ray.utils.decode(error_data.ErrorMessage())
+            if (ray.utils.decode(
+                    error_data.Type()) == ray_constants.TASK_PUSH_ERROR):
+                # Delay it a bit to see if we can suppress it.
+                task_error_queue.put((error_message, time.time()))
+            else:
+                logger.error(error_message)
+    finally:
+        # Close the pubsub client to avoid leaking file descriptors.
+        worker.error_message_pubsub_client.close()
 
 
 def is_initialized():
From 036165fdbbff2da3fde8fbc69c85fc8ab0acc3f0 Mon Sep 17 00:00:00 2001
From: Robert Nishihara
Date: Wed, 6 Feb 2019 14:56:38 -0800
Subject: [PATCH 21/23] Limit log monitor open files to 200.
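
Previously the log monitor opened files until the operating system
refused with errno EMFILE and only then backed off. That condition
trips at the process-wide descriptor limit, which risks breaking the
monitor's own Redis connection and anything else in the process that
needs a descriptor. Enforce a fixed budget of 200 open log files
(LOG_MONITOR_MAX_OPEN_FILES) instead, so the cap is hit proactively
and well below typical ulimit values.

A simplified sketch of the capped loop (it follows the shape of
LogMonitor.open_closed_files but stands alone; the function name and
list arguments are illustrative):

    LOG_MONITOR_MAX_OPEN_FILES = 200

    def open_some_closed_files(open_file_infos, closed_file_infos):
        """Open closed files until the budget is exhausted.

        Returns False if the cap was reached, mirroring the monitor's
        can_open_more_files flag.
        """
        while closed_file_infos:
            if len(open_file_infos) >= LOG_MONITOR_MAX_OPEN_FILES:
                # Stop proactively instead of waiting for open() to
                # fail with EMFILE.
                return False
            file_info = closed_file_infos.pop(0)
            file_info.file_handle = open(file_info.filename, "r")
            # Resume where we left off when the file was last open.
            file_info.file_handle.seek(file_info.file_position)
            open_file_infos.append(file_info)
        return True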
--- python/ray/log_monitor.py | 20 +++++++------------- python/ray/ray_constants.py | 2 ++ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 4a8a84bfebd0..33e0b320f934 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -88,14 +88,7 @@ def close_all_files(self): def update_log_filenames(self): """Update the list of log files to monitor.""" - try: - log_filenames = os.listdir(self.logs_dir) - except (IOError, OSError) as e: - if e.errno == errno.EMFILE: - self.close_all_files() - log_filenames = os.listdir(self.logs_dir) - else: - raise + log_filenames = os.listdir(self.logs_dir) for log_filename in log_filenames: full_path = os.path.join(self.logs_dir, log_filename) @@ -121,6 +114,11 @@ def open_closed_files(self): files_with_no_updates = [] while len(self.closed_file_infos) > 0: + if (len(self.open_file_infos) >= + ray_constants.LOG_MONITOR_MAX_OPEN_FILES): + self.can_open_more_files = False + break + file_info = self.closed_file_infos.pop(0) assert file_info.file_handle is None # Get the file size to see if it has gotten bigger since we last @@ -142,11 +140,7 @@ def open_closed_files(self): try: f = open(file_info.filename, "r") except (IOError, OSError) as e: - if e.errno == errno.EMFILE: - self.can_open_more_files = False - self.closed_file_infos.insert(0, file_info) - break - elif e.errno == errno.ENOENT: + if e.errno == errno.ENOENT: logger.warning("Warning: The file {} was not " "found.".format(file_info.filename)) self.log_filenames.remove(file_info.filename) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 1a85ef64e050..98a43fb0a2d7 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -101,3 +101,5 @@ def env_integer(key, default): PROCESS_TYPE_PLASMA_STORE = "plasma_store" PROCESS_TYPE_REDIS_SERVER = "redis_server" PROCESS_TYPE_WEB_UI = "web_ui" + +LOG_MONITOR_MAX_OPEN_FILES = 200 From 6fa04748461f806bca0492af82e1981a2936adaa Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 7 Feb 2019 13:02:10 -0800 Subject: [PATCH 22/23] Increase plasma connect retries. --- python/ray/worker.py | 2 +- src/ray/object_manager/object_buffer_pool.cc | 2 +- src/ray/raylet/node_manager.cc | 5 ----- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 9589e78d4676..b5a88c84a007 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1874,7 +1874,7 @@ def connect(info, # Create an object store client. 
worker.plasma_client = thread_safe_client( - plasma.connect(info["store_socket_name"])) + plasma.connect(info["store_socket_name"], None, 0, 300)) raylet_socket = info["raylet_socket_name"] diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 11ae442393f1..fe0471797c0d 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -9,7 +9,7 @@ ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, uint64_t chunk_size) : default_chunk_size_(chunk_size) { store_socket_name_ = store_socket_name; - RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str())); + RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300)); } ObjectBufferPool::~ObjectBufferPool() { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2712757a24b1..d05f28dd0e80 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -91,11 +91,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); -<<<<<<< HEAD -======= - - RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str(), "", 0, 300)); ->>>>>>> Retry longer when connecting to plasma store from node manager and object manager. } ray::Status NodeManager::RegisterGcs() { From f5a348b9f740db7f70adeef68651eaab94a82737 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 7 Feb 2019 13:46:55 -0800 Subject: [PATCH 23/23] Add comment. --- python/ray/worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/worker.py b/python/ray/worker.py index b5a88c84a007..fb3ad9025c38 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1789,6 +1789,9 @@ def connect(info, # Create a Redis client. redis_ip_address, redis_port = info["redis_address"].split(":") + # The Redis client can safely be shared between threads. However, that is + # not true of Redis pubsub clients. See the documentation at + # https://github.com/andymccurdy/redis-py#thread-safety. worker.redis_client = redis.StrictRedis( host=redis_ip_address, port=int(redis_port), password=redis_password)
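
The comment added here records the constraint behind the pubsub
changes in this series: one StrictRedis client is shared by every
thread in the worker, while each background thread owns a private
pubsub object and closes it on exit. A minimal sketch of that pattern
(the channel name and the listen function are illustrative, not code
from this series):

    import threading

    import redis

    # A single StrictRedis client may be shared across threads.
    redis_client = redis.StrictRedis(host="127.0.0.1", port=6379)

    def listen(channel, threads_stopped):
        # Pubsub clients are not thread safe and hold a dedicated
        # connection, so each thread creates its own and closes it to
        # avoid leaking a file descriptor.
        pubsub_client = redis_client.pubsub(
            ignore_subscribe_messages=True)
        pubsub_client.subscribe(channel)
        try:
            while not threads_stopped.is_set():
                msg = pubsub_client.get_message()
                if msg is None:
                    threads_stopped.wait(timeout=0.01)
                    continue
                print(msg["data"])
        finally:
            pubsub_client.close()

    threads_stopped = threading.Event()
    listener = threading.Thread(
        target=listen, args=("example_channel", threads_stopped))
    listener.daemon = True
    listener.start()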