Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e49389e
Stream logs to driver by default.
robertnishihara Jan 29, 2019
dfb25c0
Fix from rebase
robertnishihara Feb 4, 2019
92ad0aa
Redirect raylet output independently of worker output.
robertnishihara Feb 4, 2019
08a8c51
Fix.
robertnishihara Feb 4, 2019
be8e306
Create redis client with services.create_redis_client.
robertnishihara Feb 4, 2019
841d0a6
Suppress Redis connection error at exit.
robertnishihara Feb 4, 2019
8b99bf0
Remove thread_safe_client from redis.
robertnishihara Feb 4, 2019
4ccc8a2
Shutdown driver threads in ray.shutdown().
robertnishihara Feb 4, 2019
5a1a64d
Add warning for too many log messages.
robertnishihara Feb 5, 2019
a2e8c87
Only stop threads if worker is connected.
robertnishihara Feb 5, 2019
8535645
Only stop threads if they exist.
robertnishihara Feb 5, 2019
76ca0e7
Remove unnecessary try/excepts.
robertnishihara Feb 5, 2019
f8278d2
Fix
robertnishihara Feb 5, 2019
35f5b81
Only add new logging handler once.
robertnishihara Feb 5, 2019
dc02bbe
Increase timeout.
robertnishihara Feb 5, 2019
d1ed2be
Fix tempfile test.
robertnishihara Feb 5, 2019
26021ae
Fix logging in cluster_utils.
robertnishihara Feb 6, 2019
57518d2
Revert "Increase timeout."
robertnishihara Feb 6, 2019
dfbbfb6
Retry longer when connecting to plasma store from node manager and ob…
robertnishihara Feb 6, 2019
1f31342
Close pubsub channels to avoid leaking file descriptors.
robertnishihara Feb 6, 2019
036165f
Limit log monitor open files to 200.
robertnishihara Feb 6, 2019
6fa0474
Increase plasma connect retries.
robertnishihara Feb 7, 2019
f5a348b
Add comment.
robertnishihara Feb 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 4 additions & 28 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,34 +401,6 @@ def client_table(self):

return parse_client_table(self.redis_client)

def log_files(self):
    """Fetch and return a dictionary of log file names to outputs.

    Log file keys are stored in Redis as "LOGFILE:<ip_address>:<suffix>",
    and each key holds a Redis list whose elements are the lines of that
    log file.

    Returns:
        A dictionary mapping node IP address to a dictionary mapping log
        file name to a list of the (decoded) lines in that log file.
    """
    relevant_files = self.redis_client.keys("LOGFILE*")

    ip_filename_file = {}

    for filename in relevant_files:
        filename = decode(filename)
        # The key has the form "LOGFILE:<ip_address>:<suffix>", so the
        # second colon-separated component is the node's IP address.
        ip_addr = filename.split(":")[1]

        # Decode every line of the log file. Avoid shadowing the builtin
        # `file` and build the list with a comprehension.
        contents = [
            decode(line)
            for line in self.redis_client.lrange(filename, 0, -1)
        ]

        # setdefault replaces the explicit membership check.
        ip_filename_file.setdefault(ip_addr, {})[filename] = contents

    return ip_filename_file

def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.

Expand Down Expand Up @@ -869,6 +841,10 @@ def available_resources(self):
for resource_id, num_available in available_resources.items():
total_available_resources[resource_id] += num_available

# Close the pubsub clients to avoid leaking file descriptors.
for subscribe_client in subscribe_clients:
subscribe_client.close()

return dict(total_available_resources)

def _error_messages(self, job_id):
Expand Down
1 change: 1 addition & 0 deletions python/ray/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
]

FUNCTION_PREFIX = "RemoteFunction:"
LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL"

# xray heartbeats
XRAY_HEARTBEAT_CHANNEL = str(TablePubsub.HEARTBEAT).encode("ascii")
Expand Down
55 changes: 34 additions & 21 deletions python/ray/import_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import threading
import traceback

import redis

import ray
from ray import ray_constants
from ray import cloudpickle as pickle
Expand All @@ -17,29 +15,35 @@
class ImportThread(object):
"""A thread used to import exports from the driver or other workers.

Note:
The driver also has an import thread, which is used only to
import custom class definitions from calls to register_custom_serializer
that happen under the hood on workers.
Note: The driver also has an import thread, which is used only to import
custom class definitions from calls to register_custom_serializer that
happen under the hood on workers.

Attributes:
worker: the worker object in this process.
mode: worker mode
redis_client: the redis client used to query exports.
threads_stopped (threading.Event): A threading event used to signal to
the thread that it should exit.
"""

def __init__(self, worker, mode):
def __init__(self, worker, mode, threads_stopped):
self.worker = worker
self.mode = mode
self.redis_client = worker.redis_client
self.threads_stopped = threads_stopped

def start(self):
"""Start the import thread."""
t = threading.Thread(target=self._run, name="ray_import_thread")
self.t = threading.Thread(target=self._run, name="ray_import_thread")
# Making the thread a daemon causes it to exit
# when the main thread exits.
t.daemon = True
t.start()
self.t.daemon = True
self.t.start()

def join_import_thread(self):
"""Wait for the thread to exit."""
self.t.join()

def _run(self):
import_pubsub_client = self.redis_client.pubsub()
Expand All @@ -50,14 +54,24 @@ def _run(self):
# Keep track of the number of imports that we've imported.
num_imported = 0

# Get the exports that occurred before the call to subscribe.
with self.worker.lock:
export_keys = self.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
num_imported += 1
self._process_key(key)
try:
for msg in import_pubsub_client.listen():
# Get the exports that occurred before the call to subscribe.
with self.worker.lock:
export_keys = self.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
num_imported += 1
self._process_key(key)

while True:
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
return

msg = import_pubsub_client.get_message()
if msg is None:
self.threads_stopped.wait(timeout=0.01)
continue

with self.worker.lock:
if msg["type"] == "subscribe":
continue
Expand All @@ -68,10 +82,9 @@ def _run(self):
num_imported += 1
key = self.redis_client.lindex("Exports", i)
self._process_key(key)
except redis.ConnectionError:
# When Redis terminates the listen call will throw a
# ConnectionError, which we catch here.
pass
finally:
# Close the pubsub client to avoid leaking file descriptors.
import_pubsub_client.close()

def _process_key(self, key):
"""Process the given export key from redis."""
Expand Down
Loading