diff --git a/.travis.yml b/.travis.yml index e4c3d0d24f04..9bf0c215c3db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,3 +69,8 @@ script: - python src/common/test/test.py - python src/plasma/test/test.py - python src/photon/test/test.py + + - python test/runtest.py + - python test/array_test.py + - python test/failure_test.py + - python test/microbenchmarks.py diff --git a/CMakeLists.txt b/CMakeLists.txt deleted file mode 100644 index 5ffc42cb133e..000000000000 --- a/CMakeLists.txt +++ /dev/null @@ -1,147 +0,0 @@ -cmake_minimum_required(VERSION 2.8) - -project(ray) - -set(THIRDPARTY_DIR "${CMAKE_SOURCE_DIR}/thirdparty") - -list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules) - -set(CMAKE_PREFIX_PATH "${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/" ${CMAKE_PREFIX_PATH}) - -if(NOT APPLE) - find_package(PythonInterp REQUIRED) - find_package(PythonLibs REQUIRED) - set(CUSTOM_PYTHON_EXECUTABLE ${PYTHON_EXECUTABLE}) -else() - find_program(CUSTOM_PYTHON_EXECUTABLE python) - message("-- Found Python program: ${CUSTOM_PYTHON_EXECUTABLE}") - execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c - "import sys; print 'python' + sys.version[0:3]" - OUTPUT_VARIABLE PYTHON_LIBRARY_NAME OUTPUT_STRIP_TRAILING_WHITESPACE) - execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c - "import sys; print sys.exec_prefix" - OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE) - FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} - HINTS "${PYTHON_PREFIX}" - PATH_SUFFIXES "lib" "libs" - NO_DEFAULT_PATH) - execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c - "from distutils.sysconfig import *; print get_python_inc()" - OUTPUT_VARIABLE PYTHON_INCLUDE_DIRS OUTPUT_STRIP_TRAILING_WHITESPACE) - if(PYTHON_LIBRARIES AND PYTHON_INCLUDE_DIRS) - SET(PYTHONLIBS_FOUND TRUE) - message("-- Found PythonLibs: " ${PYTHON_LIBRARIES}) - message("-- -- Used custom search path") - else() - find_package(PythonLibs REQUIRED) - message("-- -- Used find_package(PythonLibs)") - endif() -endif() - -find_package(NumPy REQUIRED) -find_package(Boost REQUIRED) - -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") - -include_directories("${CMAKE_SOURCE_DIR}/include") -include_directories("${CMAKE_SOURCE_DIR}/thirdparty/grpc/include/") -include_directories("${CMAKE_SOURCE_DIR}/thirdparty/grpc/third_party/protobuf/src") -include_directories("${PYTHON_INCLUDE_DIRS}") -include_directories("${NUMPY_INCLUDE_DIR}") -include_directories("/usr/local/include") -include_directories("${Boost_INCLUDE_DIRS}") - -set(PROTO_PATH "${CMAKE_SOURCE_DIR}/protos") - -set(GRAPH_PROTO "${PROTO_PATH}/graph.proto") -set(RAY_PROTO "${PROTO_PATH}/ray.proto") -set(TYPES_PROTO "${PROTO_PATH}/types.proto") -set(GENERATED_PROTOBUF_PATH "${CMAKE_BINARY_DIR}/generated") -file(MAKE_DIRECTORY ${GENERATED_PROTOBUF_PATH}) - -set(GRAPH_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.cc") -set(GRAPH_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.h") - -set(RAY_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.cc") -set(RAY_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.h") -set(RAY_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.cc") -set(RAY_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.h") - -set(TYPES_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.cc") -set(TYPES_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.h") - -add_custom_command( - OUTPUT "${GRAPH_PB_H_FILE}" - "${GRAPH_PB_CPP_FILE}" - COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc - ARGS "--proto_path=${PROTO_PATH}" - 
"--cpp_out=${GENERATED_PROTOBUF_PATH}" - "${GRAPH_PROTO}" - ) - -add_custom_command( - OUTPUT "${RAY_PB_H_FILE}" - "${RAY_PB_CPP_FILE}" - "${RAY_GRPC_PB_H_FILE}" - "${RAY_GRPC_PB_CPP_FILE}" - COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc - ARGS "--proto_path=${PROTO_PATH}" - "--cpp_out=${GENERATED_PROTOBUF_PATH}" - "${RAY_PROTO}" - COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc - ARGS "--proto_path=${PROTO_PATH}" - "--grpc_out=${GENERATED_PROTOBUF_PATH}" - "--plugin=protoc-gen-grpc=${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/grpc_cpp_plugin" - "${RAY_PROTO}" - ) - -add_custom_command( - OUTPUT "${TYPES_PB_H_FILE}" - "${TYPES_PB_CPP_FILE}" - COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc - ARGS "--proto_path=${PROTO_PATH}" - "--cpp_out=${GENERATED_PROTOBUF_PATH}" - "${TYPES_PROTO}" - ) - -set(GENERATED_PROTOBUF_FILES - ${GRAPH_PB_H_FILE} ${GRAPH_PB_CPP_FILE} - ${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE} - ${RAY_GRPC_PB_H_FILE} ${RAY_GRPC_PB_CPP_FILE} - ${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE}) - -include_directories(${GENERATED_PROTOBUF_PATH}) - -link_libraries(${CMAKE_SOURCE_DIR}/thirdparty/grpc/libs/opt/libgrpc++_unsecure.a - ${CMAKE_SOURCE_DIR}/thirdparty/grpc/libs/opt/libgrpc++.a - ${CMAKE_SOURCE_DIR}/thirdparty/grpc/libs/opt/libgrpc.a - ${CMAKE_SOURCE_DIR}/thirdparty/grpc/libs/opt/protobuf/libprotobuf.a - ${CMAKE_SOURCE_DIR}/thirdparty/hiredis/libhiredis.a - pthread) - -if(UNIX AND NOT APPLE) - link_libraries(rt) -endif() - -if(APPLE) - SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") -endif(APPLE) - -set(ARROW_LIB ${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/build/release/libarrow.a) - -add_definitions(-fPIC) - -add_executable(objstore src/objstore.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -add_executable(scheduler src/scheduler.cc src/computation_graph.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(raylib ${PYTHON_LIBRARIES}) - -get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) -if(APPLE) - add_custom_command(TARGET raylib - POST_BUILD COMMAND - ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} libraylib.so) -endif(APPLE) - -install(TARGETS objstore scheduler raylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/python/ray) diff --git a/data/README.md b/data/README.md deleted file mode 100644 index 9f4765b5a220..000000000000 --- a/data/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# Data for Ray - -This folder contains data neccessary to run tests, etc. Only very small amounts -of data should be stored here and if a loader for a large dataset is tested, a -miniature version of this dataset should be created. diff --git a/data/mini.tar b/data/mini.tar deleted file mode 100644 index 73098dfe53e0..000000000000 Binary files a/data/mini.tar and /dev/null differ diff --git a/install-dependencies.sh b/install-dependencies.sh index d944caaf89c5..8c283e3efc41 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -31,14 +31,14 @@ if [[ $platform == "linux" ]]; then # These commands must be kept in sync with the installation instructions. 
sudo apt-get update sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip graphviz - sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz + sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz redis sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. elif [[ $platform == "macosx" ]]; then # These commands must be kept in sync with the installation instructions. brew install git cmake automake autoconf libtool boost graphviz sudo easy_install pip sudo pip install ipython --user - sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz --ignore-installed six + sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz redis --ignore-installed six sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. fi diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 330705ad8f97..b536dc846dfa 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -11,8 +11,6 @@ import config import serialization -from worker import scheduler_info, register_class, visualize_computation_graph, task_info, init, connect, disconnect, get, put, wait, remote, kill_workers, restart_workers_local +from worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote from worker import Reusable, reusables -from libraylib import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE -from libraylib import ObjectID -import internal +from worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE diff --git a/lib/python/ray/default_worker.py b/lib/python/ray/default_worker.py new file mode 100644 index 000000000000..afff118bcc33 --- /dev/null +++ b/lib/python/ray/default_worker.py @@ -0,0 +1,25 @@ +from __future__ import print_function + +import sys +import argparse +import numpy as np + +import ray + +parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") +parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node") +parser.add_argument("--redis-port", required=True, type=int, help="the port to use for Redis") +parser.add_argument("--object-store-name", type=str, help="the object store's name") +parser.add_argument("--object-store-manager-name", type=str, help="the object store manager's name") +parser.add_argument("--local-scheduler-name", type=str, help="the local scheduler's name") + +if __name__ == "__main__": + args = parser.parse_args() + address_info = {"node_ip_address": args.node_ip_address, + "redis_port": args.redis_port, + "object_store_name": args.object_store_name, + "object_store_manager_name": args.object_store_manager_name, + "local_scheduler_name": args.local_scheduler_name} + ray.worker.connect(address_info, ray.WORKER_MODE) + + ray.worker.main_loop() diff --git a/lib/python/ray/graph.py b/lib/python/ray/graph.py deleted file mode 100644 index e11a7824ebb6..000000000000 --- a/lib/python/ray/graph.py +++ /dev/null @@ -1,34 +0,0 @@ -# Utilities to deal with computation graphs - -import graphviz - -def graph_to_graphviz(computation_graph): - """ - Convert the computation graph to graphviz format. 
- - Args: - computation_graph [graph_pb2.CompGraph]: protocol buffer description of - the computation graph - - Returns: - Graphviz description of the computation graph - """ - dot = graphviz.Digraph(format="pdf") - dot.node("op-root", shape="box") - for (i, op) in enumerate(computation_graph.operation): - if op.HasField("task"): - dot.node("op" + str(i), shape="box", label=str(i) + "\n" + op.task.name.split(".")[-1]) - for res in op.task.result: - dot.edge("op" + str(i), str(res)) - elif op.HasField("put"): - dot.node("op" + str(i), shape="box", label=str(i) + "\n" + "put") - dot.edge("op" + str(i), str(op.put.objectid)) - elif op.HasField("get"): - dot.node("op" + str(i), shape="box", label=str(i) + "\n" + "get") - creator_operationid = op.creator_operationid if op.creator_operationid != 2 ** 64 - 1 else "-root" - dot.edge("op" + str(creator_operationid), "op" + str(i), style="dotted", constraint="false") - for arg in op.task.arg: - if len(arg.serialized_arg) == 0: - dot.node(str(arg.objectid)) - dot.edge(str(arg.objectid), "op" + str(i)) - return dot diff --git a/lib/python/ray/internal/__init__.py b/lib/python/ray/internal/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/lib/python/ray/serialization.py b/lib/python/ray/serialization.py index 1f531e674adb..dd6106d95f72 100644 --- a/lib/python/ray/serialization.py +++ b/lib/python/ray/serialization.py @@ -1,89 +1,9 @@ +from __future__ import print_function + import numpy as np import pickling -import libraylib as raylib import numbuf -def is_argument_serializable(value): - """Checks if value is a composition of primitive types. - - This will return True if the argument is one of the following: - - An int - - A float - - A bool - - None - - A list of length at most 100 whose elements are serializable - - A tuple of length at most 100 whose elements are serializable - - A dict of length at most 100 whose keys and values are serializable - - A string of length at most 100. - - A unicode string of length at most 100. - - Args: - value: A Python object. - - Returns: - True if the object can be serialized as a composition of primitive types and - False otherwise. - """ - t = type(value) - if t is int or t is float or t is long or t is bool or value is None: - return True - if t is list: - if len(value) <= 100: - for element in value: - if not is_argument_serializable(element): - return False - return True - else: - return False - if t is tuple: - if len(value) <= 100: - for element in value: - if not is_argument_serializable(element): - return False - return True - else: - return False - if t is dict: - if len(value) <= 100: - for k, v in value.iteritems(): - if not is_argument_serializable(k) or not is_argument_serializable(v): - return False - return True - else: - return False - if t is str: - return len(value) <= 100 - if t is unicode: - return len(value) <= 100 - return False - -def serialize_argument_if_possible(value): - """This method serializes arguments that are passed by value. - - The result will be deserialized by deserialize_argument. - - Returns: - None if value cannot be efficiently serialized or is too big, and otherwise - this returns the serialized value as a string. - """ - if not is_argument_serializable(value): - # The argument is not obviously serializable using __repr__, so we will not - # serialize it. - return None - serialized_value = value.__repr__() - if len(serialized_value) > 1000: - # The argument is too big, so we will not pass it by value. 
- return None - # Return the serialized argument. - return serialized_value - -def deserialize_argument(serialized_value): - """This method deserializes arguments that are passed by value. - - The argument will have been serialized by serialize_argument. - """ - return eval(serialized_value) - def check_serializable(cls): """Throws an exception if Ray cannot serialize this class efficiently. diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 95ed7927ecb6..75a9b7a6dd54 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -1,31 +1,29 @@ +from __future__ import print_function + import os import sys import time -import subprocess32 as subprocess +import subprocess import string import random # Ray modules import config -_services_env = os.environ.copy() -_services_env["PATH"] = os.pathsep.join([os.path.dirname(os.path.abspath(__file__)), _services_env["PATH"]]) -# Make GRPC only print error messages. -_services_env["GRPC_VERBOSITY"] = "ERROR" - # all_processes is a list of the scheduler, object store, and worker processes # that have been started by this services module if Ray is being used in local # mode. all_processes = [] -TIMEOUT_SECONDS = 5 - def address(host, port): return host + ":" + str(port) -def new_scheduler_port(): +def new_port(): return random.randint(10000, 65535) +def random_name(): + return str(random.randint(0, 99999999)) + def cleanup(): """When running in local mode, shutdown the Ray processes. @@ -36,7 +34,8 @@ def cleanup(): """ global all_processes successfully_shut_down = True - for p in all_processes: + # Terminate the processes in reverse order. + for p in all_processes[::-1]: if p.poll() is not None: # process has already terminated continue p.kill() @@ -49,146 +48,112 @@ def cleanup(): continue successfully_shut_down = False if successfully_shut_down: - print "Successfully shut down Ray." + print("Successfully shut down Ray.") else: - print "Ray did not shut down properly." + print("Ray did not shut down properly.") all_processes = [] -def start_scheduler(scheduler_address, cleanup): - """This method starts a scheduler process. +def start_redis(port): + redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/thirdparty/redis-3.2.3/src/redis-server") + p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning"]) + if cleanup: + all_processes.append(p) - Args: - scheduler_address (str): The ip address and port to use for the scheduler. - cleanup (bool): True if using Ray in local mode. If cleanup is true, then - this process will be killed by serices.cleanup() when the Python process - that imported services exits. - """ - scheduler_port = scheduler_address.split(":")[1] - p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler-" + scheduler_port + ".log")], env=_services_env) +def start_local_scheduler(redis_address, plasma_store_name): + local_scheduler_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../photon/build/photon_scheduler") + local_scheduler_name = "/tmp/scheduler{}".format(random_name()) + p = subprocess.Popen([local_scheduler_filepath, "-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name]) if cleanup: all_processes.append(p) + return local_scheduler_name -def start_objstore(scheduler_address, node_ip_address, cleanup): +def start_objstore(node_ip_address, redis_address, cleanup): """This method starts an object store process. 
Args: - scheduler_address (str): The ip address and port of the scheduler to connect - to. node_ip_address (str): The ip address of the node running the object store. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. """ - random_string = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10)) - p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", random_string]) + ".log")], env=_services_env) + plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store") + store_name = "/tmp/ray_plasma_store{}".format(random_name()) + p1 = subprocess.Popen([plasma_store_executable, "-s", store_name]) + + plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager") + manager_name = "/tmp/ray_plasma_manager{}".format(random_name()) + manager_port = new_port() + p2 = subprocess.Popen([plasma_manager_executable, + "-s", store_name, + "-m", manager_name, + "-h", node_ip_address, + "-p", str(manager_port), + "-r", redis_address]) + if cleanup: - all_processes.append(p) + all_processes.append(p1) + all_processes.append(p2) -def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True): + return store_name, manager_name, manager_port + +def start_worker(address_info, worker_path, cleanup=True): """This method starts a worker process. Args: - node_ip_address (str): The IP address of the node that the worker runs on. + address_info (dict): This dictionary contains the node_ip_address, + redis_port, object_store_name, object_store_manager_name, and + local_scheduler_name. worker_path (str): The path of the source code which the worker process will run. - scheduler_address (str): The ip address and port of the scheduler to connect - to. - objstore_address (Optional[str]): The ip address and port of the object - store to connect to. - cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is - true, then this process will be killed by serices.cleanup() when the - Python process that imported services exits. This is True by default. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by services.cleanup() when the Python process + that imported services exits. This is True by default. """ command = ["python", worker_path, - "--node-ip-address=" + node_ip_address, - "--scheduler-address=" + scheduler_address] - if objstore_address is not None: - command.append("--objstore-address=" + objstore_address) + "--node-ip-address=" + address_info["node_ip_address"], + "--object-store-name=" + address_info["object_store_name"], + "--object-store-manager-name=" + address_info["object_store_manager_name"], + "--local-scheduler-name=" + address_info["local_scheduler_name"], + "--redis-port=" + str(address_info["redis_port"])] p = subprocess.Popen(command) if cleanup: all_processes.append(p) -def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, cleanup=False): - """Start an object store and associated workers in the cluster setting. - - This starts an object store and the associated workers when Ray is being used - in the cluster setting. This assumes the scheduler has already been started. 
-
- Args:
- scheduler_address (str): IP address and port of the scheduler (which may run
- on a different node).
- node_ip_address (str): IP address (without port) of the node this function
- is run on.
- num_workers (int): The number of workers to be started on this node.
- worker_path (str): Path of the Python worker script that will be run on the
- worker.
- cleanup (bool): If cleanup is True, then the processes started by this
- command will be killed when the process that imported services exits.
- """
- start_objstore(scheduler_address, node_ip_address, cleanup=cleanup)
- time.sleep(0.2)
- if worker_path is None:
- worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
- for _ in range(num_workers):
- start_worker(node_ip_address, worker_path, scheduler_address, cleanup=cleanup)
- time.sleep(0.5)
-
-def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
- """Start a new set of workers on this node.
-
- Start a new set of workers on this node. This assumes that the scheduler is
- already running and that the object store on this node is already running. The
- intended use case is that a developer wants to update the code running on the
- worker processes so first kills all of the workers and then runs this method.
-
- Args:
- scheduler_address (str): ip address and port of the scheduler (which may run
- on a different node)
- objstore_address (str): ip address and port of the object store (which runs
- on the same node)
- num_workers (int): the number of workers to be started on this node
- worker_path (str): path of the source code that will be run on the worker
- """
- node_ip_address = objstore_address.split(":")[0]
- for _ in range(num_workers):
- start_worker(node_ip_address, worker_path, scheduler_address, cleanup=False)
-
-def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, worker_path=None):
+def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None):
"""Start Ray in local mode.
- This method starts Ray in local mode (as opposed to cluster mode, which is
- handled by cluster.py).
-
Args:
- num_objstores (int): The number of object stores to start. Aside from
- testing, this should be one.
num_workers (int): The number of workers to start.
worker_path (str): The path of the source code that will be run by the
worker.
Returns:
- The address of the scheduler and the addresses of all of the object stores.
+ A dictionary with the address information for the processes that were
+ started: the node IP address, the Redis port, the object store name, the
+ object store manager name, and the local scheduler name.
"""
if worker_path is None:
- worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
- if num_objstores < 1:
- raise Exception("`num_objstores` is {}, but should be at least 1.".format(num_objstores))
- scheduler_address = address(node_ip_address, new_scheduler_port())
- start_scheduler(scheduler_address, cleanup=True)
+ worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default_worker.py")
+ # Start Redis.
+ redis_port = new_port()
+ redis_address = address(node_ip_address, redis_port)
+ start_redis(redis_port)
+ time.sleep(0.1)
+ # Start Plasma.
+ object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True)
+ # Start the local scheduler.
time.sleep(0.1) - # create objstores - for i in range(num_objstores): - start_objstore(scheduler_address, node_ip_address, cleanup=True) - time.sleep(0.2) - if i < num_objstores - 1: - num_workers_to_start = num_workers / num_objstores - else: - # In case num_workers is not divisible by num_objstores, start the correct - # remaining number of workers. - num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores) - for _ in range(num_workers_to_start): - start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True) - time.sleep(0.3) - - return scheduler_address + local_scheduler_name = start_local_scheduler(redis_address, object_store_name) + time.sleep(0.2) + # Aggregate the address information together. + address_info = {"node_ip_address": node_ip_address, + "redis_port": redis_port, + "object_store_name": object_store_name, + "object_store_manager_name": object_store_manager_name, + "local_scheduler_name": local_scheduler_name} + # Start the workers. + for _ in range(num_workers): + start_worker(address_info, worker_path, cleanup=True) + time.sleep(0.3) + # Return the addresses of the relevant processes. + return address_info diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 4f9abadd0d6a..a3370cf9e801 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -1,26 +1,46 @@ +from __future__ import print_function + +import hashlib import os import sys import time import traceback import copy -import logging import funcsigs import numpy as np import colorama import atexit +import random +import redis import threading import string -import weakref # Ray modules import config import pickling import serialization -import internal.graph_pb2 -import graph import services import numbuf -import libraylib as raylib +import photon +import plasma + +SCRIPT_MODE = 0 +WORKER_MODE = 1 +PYTHON_MODE = 2 +SILENT_MODE = 3 + +def random_object_id(): + return photon.ObjectID("".join([chr(random.randint(0, 255)) for _ in range(20)])) + +def random_string(): + return "".join([chr(random.randint(0, 255)) for _ in range(20)]) + +class FunctionID(object): + def __init__(self, function_id): + self.function_id = function_id + + def id(self): + return self.function_id contained_objectids = [] def numbuf_serialize(value): @@ -93,7 +113,7 @@ def __init__(self, objectid, task_error): def __str__(self): """Format a RayGetError as a string.""" - return "Could not get objectid {}. It was created by remote function {}{}{} which failed with:\n\n{}".format(self.objectid, colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error) + return "Could not get objectid {}. It was created by remote function {}{}{} which failed with:\n\n{}".format(self.objectid.id(), colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error) class RayGetArgumentError(Exception): """An exception used when a task's argument was produced by a failed task. @@ -116,7 +136,7 @@ def __init__(self, function_name, argument_index, objectid, task_error): def __str__(self): """Format a RayGetArgumentError as a string.""" - return "Failed to get objectid {} as argument {} for remote function {}{}{}. It was created by remote function {}{}{} which failed with:\n{}".format(self.objectid, self.argument_index, colorama.Fore.RED, self.function_name, colorama.Fore.RESET, colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error) + return "Failed to get objectid {} as argument {} for remote function {}{}{}. 
It was created by remote function {}{}{} which failed with:\n{}".format(self.objectid.id(), self.argument_index, colorama.Fore.RED, self.function_name, colorama.Fore.RESET, colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error) class Reusable(object): @@ -216,7 +236,7 @@ def _create_and_export(self, name, reusable): # Export the reusable variable to the workers if we are on the driver. If # ray.init has not been called yet, then cache the reusable variable to # export later. - if _mode() in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: + if _mode() in [SCRIPT_MODE, SILENT_MODE]: _export_reusable_variable(name, reusable) elif _mode() is None: self._cached_reusables.append((name, reusable)) @@ -224,7 +244,7 @@ def _create_and_export(self, name, reusable): # We create a second copy of the reusable variable on the driver to use # inside of remote functions that run locally. This occurs when we start Ray # in PYTHON_MODE and when we call a remote function locally. - if _mode() in [raylib.SCRIPT_MODE, raylib.SILENT_MODE, raylib.PYTHON_MODE]: + if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: self._local_mode_reusables[name] = reusable.initializer() def _reinitialize(self): @@ -234,7 +254,7 @@ def _reinitialize(self): new_value = self._reinitializers[name](current_value) # If we are on the driver, reset the copy of the reusable variable in the # _local_mode_reusables dictionary. - if _mode() in [raylib.SCRIPT_MODE, raylib.SILENT_MODE, raylib.PYTHON_MODE]: + if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: assert self._running_remote_function_locally self._local_mode_reusables[name] = new_value else: @@ -308,30 +328,16 @@ def __delattr__(self, name): raise Exception("Attempted deletion of attribute {}. Attributes of a RayReusable object may not be deleted.".format(name)) class ObjectFixture(object): - """This is used to handle unmapping objects backed by the object store. - - The object referred to by objectid will get unmaped when the fixture is - deallocated. In addition, the ObjectFixture holds the objectid as a field, - which ensures that the corresponding object will not be deallocated from the - object store while the ObjectFixture is alive. ObjectFixture is used as the - base object for numpy arrays that are contained in the object referred to by - objectid and prevents memory that is used by them from getting unmapped by the - worker or deallocated by the object store. + """This is used to handle releasing objects backed by the object store. + + This keeps a PlasmaBuffer in scope as long as an object that is backed by that + PlasmaBuffer is in scope. This prevents memory in the object store from getting + released while it is still being used to back a Python object. """ - def __init__(self, objectid, segmentid, handle): + def __init__(self, plasma_buffer): """Initialize an ObjectFixture object.""" - self.objectid = objectid - self.segmentid = segmentid - self.handle = handle - - def __del__(self): - """Unmap the segment when the object goes out of scope.""" - # We probably shouldn't have this if statement, but if raylib gets set to - # None before this __del__ call happens, then an exception will be thrown - # at exit. - if raylib is not None: - raylib.unmap_object(self.handle, self.segmentid) + self.plasma_buffer = plasma_buffer class Worker(object): """A class used to define the control flow of a worker process. 
@@ -344,7 +350,7 @@ class Worker(object): functions (Dict[str, Callable]): A dictionary mapping the name of a remote function to the remote function itself. This is the set of remote functions that can be executed by this worker. - handle (worker capsule): A Python object wrapping a C++ Worker object. + connected (bool): True if Ray has been started and False otherwise. mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE, SILENT_MODE, and WORKER_MODE. cached_remote_functions (List[Tuple[str, str]]): A list of pairs @@ -356,15 +362,24 @@ class Worker(object): that connect has been called already. cached_functions_to_run (List): A list of functions to run on all of the workers that should be exported as soon as connect is called. + driver_export_counter (int): The number of exports that the driver has + exported. This is only used on the driver. + worker_import_counter (int): The number of exports that the worker has + imported so far. This is only used on the workers. """ def __init__(self): """Initialize a Worker object.""" self.functions = {} - self.handle = None + self.num_return_vals = {} + self.function_names = {} + self.function_export_counters = {} + self.connected = False self.mode = None self.cached_remote_functions = [] self.cached_functions_to_run = [] + self.driver_export_counter = 0 + self.worker_import_counter = 0 def set_mode(self, mode): """Set the mode of the worker. @@ -397,30 +412,21 @@ def put_object(self, objectid, value): local object store. Args: - objectid (raylib.ObjectID): The object ID of the value to be put. + objectid (object_id.ObjectID): The object ID of the value to be put. value (serializable object): The value to put in the object store. """ - # We put the value into a list here because in arrow the concept of - # "serializing a single object" does not exits. + # Serialize and put the object in the object store. schema, size, serialized = numbuf_serialize(value) + size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary. + buff = self.plasma_client.create(objectid.id(), size, buffer(schema)) + data = np.frombuffer(buff.buffer, dtype="byte")[8:] + metadata_offset = numbuf.write_to_buffer(serialized, memoryview(data)) + np.frombuffer(buff.buffer, dtype="int64", count=1)[0] = metadata_offset + self.plasma_client.seal(objectid.id()) + global contained_objectids - raylib.add_contained_objectids(self.handle, objectid, contained_objectids) + # Optionally do something with the contained_objectids here. contained_objectids = [] - # TODO(pcm): Right now, metadata is serialized twice, change that in the future - # in the following line, the "8" is for storing the metadata size, - # the len(schema) is for storing the metadata and the 8192 is for storing - # the metadata in the batch (see INITIAL_METADATA_SIZE in arrow) - size = size + 8 + len(schema) + 4096 * 4 - buff, segmentid = raylib.allocate_buffer(self.handle, objectid, size) - # write the metadata length - np.frombuffer(buff, dtype="int64", count=1)[0] = len(schema) - # metadata buffer - metadata = np.frombuffer(buff, dtype="byte", offset=8, count=len(schema)) - # write the metadata - metadata[:] = schema - data = np.frombuffer(buff, dtype="byte")[8 + len(schema):] - metadata_offset = numbuf.write_to_buffer(serialized, memoryview(data)) - raylib.finish_buffer(self.handle, objectid, segmentid, metadata_offset) def get_object(self, objectid): """Get the value in the local object store associated with objectid. 
@@ -429,34 +435,24 @@ def get_object(self, objectid): until the value for objectid has been written to the local object store. Args: - objectid (raylib.ObjectID): The object ID of the value to retrieve. + objectid (object_id.ObjectID): The object ID of the value to retrieve. """ - assert raylib.is_arrow(self.handle, objectid), "All objects should be serialized using Arrow." - buff, segmentid, metadata_offset = raylib.get_buffer(self.handle, objectid) - metadata_size = int(np.frombuffer(buff, dtype="int64", count=1)[0]) - metadata = np.frombuffer(buff, dtype="byte", offset=8, count=metadata_size) - data = np.frombuffer(buff, dtype="byte")[8 + metadata_size:] + buff = self.plasma_client.get(objectid.id()) + metadata = self.plasma_client.get_metadata(objectid.id()) + metadata_size = len(metadata) + data = np.frombuffer(buff.buffer, dtype="byte")[8:] + metadata_offset = int(np.frombuffer(buff.buffer, dtype="int64", count=1)[0]) serialized = numbuf.read_from_buffer(memoryview(data), bytearray(metadata), metadata_offset) - # If there is currently no ObjectFixture for this ObjectID, then create a - # new one. The object_fixtures object is a WeakValueDictionary, so entries - # will be discarded when there are no strong references to their values. - # We create object_fixture outside of the assignment because if we created - # it inside the assignement it would immediately go out of scope. - object_fixture = None - if objectid.id not in object_fixtures: - object_fixture = ObjectFixture(objectid, segmentid, self.handle) - object_fixtures[objectid.id] = object_fixture - deserialized = numbuf.deserialize_list(serialized, object_fixtures[objectid.id]) - # Unwrap the object from the list (it was wrapped put_object) + # Create an ObjectFixture. If the object we are getting is backed by the + # PlasmaBuffer, this ObjectFixture will keep the PlasmaBuffer in scope as + # long as the object is in scope. + object_fixture = ObjectFixture(buff) + deserialized = numbuf.deserialize_list(serialized, object_fixture) + # Unwrap the object from the list (it was wrapped put_object). assert len(deserialized) == 1 - result = deserialized[0] - return result - - def alias_objectids(self, alias_objectid, target_objectid): - """Make two object IDs refer to the same object.""" - raylib.alias_objectids(self.handle, alias_objectid, target_objectid) + return deserialized[0] - def submit_task(self, func_name, args): + def submit_task(self, function_id, func_name, args): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with name @@ -469,24 +465,22 @@ def submit_task(self, func_name, args): be object IDs or they can be values. If they are values, they must be serializable objecs. """ - # Convert all of the argumens to object IDs. It is a little strange that we - # are calling put, which is external to this class. - serialized_args = [] + # Put large or complex arguments that are passed by value in the object + # store first. + args_for_photon = [] for arg in args: - if isinstance(arg, raylib.ObjectID): - next_arg = arg + if isinstance(arg, photon.ObjectID): + args_for_photon.append(arg) + elif photon.check_simple_value(arg): + args_for_photon.append(arg) else: - serialized_arg = serialization.serialize_argument_if_possible(arg) - if serialized_arg is not None: - # Serialize the argument and pass it by value. - next_arg = serialized_arg - else: - # Put the objet in the object store under the hood. 
- next_arg = put(arg) - serialized_args.append(next_arg) - task_capsule = raylib.serialize_task(self.handle, func_name, serialized_args) - objectids = raylib.submit_task(self.handle, task_capsule) - return objectids + args_for_photon.append(put(arg)) + + # Submit the task to Photon. + task = photon.Task(photon.ObjectID(function_id.id()), args_for_photon, self.num_return_vals[function_id.id()]) + self.photon_client.submit(task) + + return task.returns() def export_function_to_run_on_all_workers(self, function): """Export this function and run it on all workers. @@ -496,11 +490,11 @@ def export_function_to_run_on_all_workers(self, function): not take any arguments. If it returns anything, its return values will not be used. """ - if self.mode not in [raylib.SCRIPT_MODE, raylib.SILENT_MODE, raylib.PYTHON_MODE]: + if self.mode not in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: raise Exception("run_function_on_all_workers can only be called on a driver.") # Run the function on all of the workers. - if self.mode in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: - raylib.run_function_on_all_workers(self.handle, pickling.dumps(function)) + if self.mode in [SCRIPT_MODE, SILENT_MODE]: + self.run_function_on_all_workers(function) def run_function_on_all_workers(self, function): @@ -516,7 +510,7 @@ def run_function_on_all_workers(self, function): not take any arguments. If it returns anything, its return values will not be used. """ - if self.mode not in [None, raylib.SCRIPT_MODE, raylib.SILENT_MODE, raylib.PYTHON_MODE]: + if self.mode not in [None, SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: raise Exception("run_function_on_all_workers can only be called on a driver.") # First run the function on the driver. function(self) @@ -525,7 +519,12 @@ def run_function_on_all_workers(self, function): if self.mode is None: self.cached_functions_to_run.append(function) else: - self.export_function_to_run_on_all_workers(function) + function_to_run_id = random_string() + key = "FunctionsToRun:{}".format(function_to_run_id) + self.redis_client.hmset(key, {"function_id": function_to_run_id, + "function": pickling.dumps(function)}) + self.redis_client.rpush("Exports", key) + self.driver_export_counter += 1 global_worker = Worker() """Worker: The global Worker object for this worker process. @@ -544,18 +543,6 @@ def run_function_on_all_workers(self, function): made by one task do not affect other tasks. """ -logger = logging.getLogger("ray") -"""Logger: The logging object for the Python worker code.""" - -object_fixtures = weakref.WeakValueDictionary() -"""WeakValueDictionary: The mapping from ObjectID to ObjectFixture object. - -This is to ensure that we have only one ObjectFixture per ObjectID. That way, if -we call get on an object twice, we do not unmap the segment before both of the -results go out of scope. It is a WeakValueDictionary instead of a regular -dictionary so that it does not keep the ObjectFixtures in scope forever. -""" - class RayConnectionError(Exception): pass @@ -565,8 +552,8 @@ def check_connected(worker=global_worker): Raises: Exception: An exception is raised if the worker is not connected. """ - if worker.handle is None and worker.mode != raylib.PYTHON_MODE: - raise RayConnectionError("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.") + if not worker.connected: + raise RayConnectionError("This command cannot be called before Ray has been started. 
You can start Ray with 'ray.init(start_ray_local=True, num_workers=1)'.") def print_failed_task(task_status): """Print information about failed tasks. @@ -575,59 +562,29 @@ def print_failed_task(task_status): task_status (Dict): A dictionary containing the name, operationid, and error message for a failed task. """ - print """ + print(""" Error: Task failed Function Name: {} Task ID: {} Error Message: \n{} - """.format(task_status["function_name"], task_status["operationid"], task_status["error_message"]) - -def scheduler_info(worker=global_worker): - """Return information about the state of the scheduler.""" - check_connected(worker) - return raylib.scheduler_info(worker.handle) + """.format(task_status["function_name"], task_status["operationid"], task_status["error_message"])) -def visualize_computation_graph(file_path=None, view=False, worker=global_worker): - """Write the computation graph to a pdf file. - - Args: - file_path (str): The name of a pdf file that the rendered computation graph - will be written to. If this argument is None, a temporary path will be - used. - view (bool): If true, the result the python graphviz package will try to - open the result in a viewer. - - Examples: - Try the following code. - - >>> import ray.array.distributed as da - >>> x = da.zeros([20, 20]) - >>> y = da.zeros([20, 20]) - >>> z = da.dot(x, y) - >>> ray.visualize_computation_graph(view=True) - """ - check_connected(worker) - if file_path is None: - file_path = config.get_log_file_path("computation-graph.pdf") - - base_path, extension = os.path.splitext(file_path) - if extension != ".pdf": - raise Exception("File path must be a .pdf file") - proto_path = base_path + ".binaryproto" - - raylib.dump_computation_graph(worker.handle, proto_path) - g = internal.graph_pb2.CompGraph() - g.ParseFromString(open(proto_path).read()) - graph.graph_to_graphviz(g).render(base_path, view=view) - - print "Wrote graph dot description to file {}".format(base_path) - print "Wrote graph protocol buffer description to file {}".format(proto_path) - print "Wrote computation graph to file {}.pdf".format(base_path) - -def task_info(worker=global_worker): +def error_info(worker=global_worker): """Return information about failed tasks.""" check_connected(worker) - return raylib.task_info(worker.handle) + result = {"TaskError": [], + "RemoteFunctionImportError": [], + "ReusableVariableImportError": [], + "ReusableVariableReinitializeError": [], + "FunctionToRunError": [] + } + error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) + for error_key in error_keys: + error_type = error_key.split(":", 1)[0] + error_contents = worker.redis_client.hgetall(error_key) + result[error_type].append(error_contents) + + return result def initialize_numbuf(worker=global_worker): """Initialize the serialization library. 
@@ -639,19 +596,19 @@ def initialize_numbuf(worker=global_worker): def objectid_custom_serializer(obj): class_identifier = serialization.class_identifier(type(obj)) contained_objectids.append(obj) - return raylib.serialize_objectid(worker.handle, obj) + return obj.id() def objectid_custom_deserializer(serialized_obj): - return raylib.deserialize_objectid(worker.handle, serialized_obj) - serialization.add_class_to_whitelist(raylib.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) + return photon.ObjectID(serialized_obj) + serialization.add_class_to_whitelist(photon.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) - if worker.mode in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: + if worker.mode in [SCRIPT_MODE, SILENT_MODE]: # These should only be called on the driver because register_class will # export the class to all of the workers. register_class(RayTaskError) register_class(RayGetError) register_class(RayGetArgumentError) -def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_address=None, node_ip_address=None, driver_mode=raylib.SCRIPT_MODE): +def init(start_ray_local=False, num_workers=None, driver_mode=SCRIPT_MODE): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -664,54 +621,37 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_ existing Ray cluster. num_workers (Optional[int]): The number of workers to start if start_ray_local is True. - num_objstores (Optional[int]): The number of object stores to start if - start_ray_local is True. - scheduler_address (Optional[str]): The address of the scheduler to connect - to if start_ray_local is False. - node_ip_address (Optional[str]): The address of the node the worker is - running on. It is required if start_ray_local is False and it cannot be - provided otherwise. driver_mode (Optional[bool]): The mode in which to start the driver. This should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE. Returns: - A string containing the address of the scheduler. + The address of the Redis server. Raises: Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ - # Make GRPC only print error messages. - os.environ["GRPC_VERBOSITY"] = "ERROR" - if driver_mode == raylib.PYTHON_MODE: + if driver_mode == PYTHON_MODE: # If starting Ray in PYTHON_MODE, don't start any other processes. - pass + address_info = {} elif start_ray_local: # In this case, we launch a scheduler, a new object store, and some workers, # and we connect to them. - if (scheduler_address is not None) or (node_ip_address is not None): - raise Exception("If start_ray_local=True, then you cannot pass in a scheduler_address or a node_ip_address.") - if driver_mode not in [raylib.SCRIPT_MODE, raylib.PYTHON_MODE, raylib.SILENT_MODE]: + if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: raise Exception("If start_ray_local=True, then driver_mode must be in [ray.SCRIPT_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].") # Use the address 127.0.0.1 in local mode. - node_ip_address = "127.0.0.1" num_workers = 1 if num_workers is None else num_workers - num_objstores = 1 if num_objstores is None else num_objstores # Start the scheduler, object store, and some workers. 
These will be killed # by the call to cleanup(), which happens when the Python script exits. - scheduler_address = services.start_ray_local(num_objstores=num_objstores, num_workers=num_workers, worker_path=None) + address_info = services.start_ray_local(num_workers=num_workers) else: - # In this case, there is an existing scheduler and object store, and we do - # not need to start any processes. - if (num_workers is not None) or (num_objstores is not None): - raise Exception("The arguments num_workers and num_objstores must not be provided unless start_ray_local=True.") - if (node_ip_address is None) or (scheduler_address is None): - raise Exception("When start_ray_local=False, node_ip_address and scheduler_address must be provided.") - # Connect this driver to the scheduler and object store. The corresponing call - # to disconnect will happen in the call to cleanup() when the Python script - # exits. - connect(node_ip_address, scheduler_address, worker=global_worker, mode=driver_mode) - return scheduler_address + raise Exception("This mode is currently not enabled.") + # Connect this driver to Redis, the object store, and the local scheduler. The + # corresponing call to disconnect will happen in the call to cleanup() when + # the Python script exits. + connect(address_info, driver_mode, worker=global_worker) + if driver_mode != PYTHON_MODE: + return "{}:{}".format(address_info["node_ip_address"], address_info["redis_port"]) def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. @@ -723,99 +663,221 @@ def cleanup(worker=global_worker): """ disconnect(worker) worker.set_mode(None) + worker.driver_export_counter = 0 + worker.worker_import_counter = 0 + if hasattr(worker, "plasma_client"): + worker.plasma_client.shutdown() services.cleanup() atexit.register(cleanup) -def print_error_messages(worker=global_worker): - num_failed_tasks = 0 - num_failed_remote_function_imports = 0 - num_failed_reusable_variable_imports = 0 - num_failed_reusable_variable_reinitializations = 0 - num_failed_function_to_runs = 0 - while True: +def print_error_messages(worker): + """Print error messages in the background on the driver. + + This runs in a separate thread on the driver and prints error messages in the + background. + """ + worker.error_message_pubsub_client = worker.redis_client.pubsub() + # Exports that are published after the call to + # error_message_pubsub_client.psubscribe and before the call to + # error_message_pubsub_client.listen will still be processed in the loop. + worker.error_message_pubsub_client.psubscribe("__keyspace@0__:ErrorKeys") + num_errors_printed = 0 + + # Get the exports that occurred before the call to psubscribe. + try: + worker.lock.acquire() + error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) + for error_key in error_keys: + error_message = worker.redis_client.hget(error_key, "message") + print(error_message) + num_errors_printed += 1 + finally: + worker.lock.release() + + try: + for msg in worker.error_message_pubsub_client.listen(): + try: + worker.lock.acquire() + for error_key in worker.redis_client.lrange("ErrorKeys", num_errors_printed, -1): + error_message = worker.redis_client.hget(error_key, "message") + print(error_message) + num_errors_printed += 1 + finally: + worker.lock.release() + except redis.ConnectionError: + # When Redis terminates the listen call will throw a ConnectionError, which + # we catch here. 
+ pass + +def fetch_and_register_remote_function(key, worker=global_worker): + """Import a remote function.""" + function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter = worker.redis_client.hmget(key, ["function_id", "name", "function", "num_return_vals", "module", "driver_export_counter"]) + function_id = photon.ObjectID(function_id_str) + num_return_vals = int(num_return_vals) + try: + function = pickling.loads(serialized_function) + except: + # If an exception was thrown when the remote function was imported, we + # record the traceback and notify the scheduler of the failure. + traceback_str = format_error_message(traceback.format_exc()) + # Log the error message. + error_key = "RemoteFunctionImportError:{}".format(function_id.id()) + worker.redis_client.hmset(error_key, {"function_id": function_id.id(), + "function_name": function_name, + "message": traceback_str}) + worker.redis_client.rpush("ErrorKeys", error_key) + else: + # TODO(rkn): Why is the below line necessary? + function.__module__ = module + function_name = "{}.{}".format(function.__module__, function.__name__) + worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(function) + worker.function_names[function_id.id()] = function_name + worker.num_return_vals[function_id.id()] = num_return_vals + worker.function_export_counters[function_id.id()] = function_export_counter + # Add the function to the function table. + worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id) + +def fetch_and_register_reusable_variable(key, worker=global_worker): + """Import a reusable variable.""" + reusable_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"]) + try: + initializer = pickling.loads(serialized_initializer) + reinitializer = pickling.loads(serialized_reinitializer) + reusables.__setattr__(reusable_variable_name, Reusable(initializer, reinitializer)) + except: + # If an exception was thrown when the reusable variable was imported, we + # record the traceback and notify the scheduler of the failure. + traceback_str = format_error_message(traceback.format_exc()) + # Log the error message. + error_key = "ReusableVariableImportError:{}".format(random_string()) + worker.redis_client.hmset(error_key, {"name": reusable_variable_name, + "message": traceback_str}) + worker.redis_client.rpush("ErrorKeys", error_key) + +def fetch_and_execute_function_to_run(key, worker=global_worker): + """Run on arbitrary function on the worker.""" + serialized_function, = worker.redis_client.hmget(key, ["function"]) + try: + # Deserialize the function. + function = pickling.loads(serialized_function) + # Run the function. + function(worker) + except: + # If an exception was thrown when the function was run, we record the + # traceback and notify the scheduler of the failure. + traceback_str = traceback.format_exc() + # Log the error message. 
+ name = function.__name__ if "function" in locals() and hasattr(function, "__name__") else "" + error_key = "FunctionToRunError:{}".format(random_string()) + worker.redis_client.hmset(error_key, {"name": name, + "message": traceback_str}) + worker.redis_client.rpush("ErrorKeys", error_key) + +def import_thread(worker): + worker.import_pubsub_client = worker.redis_client.pubsub() + # Exports that are published after the call to import_pubsub_client.psubscribe + # and before the call to import_pubsub_client.listen will still be processed + # in the loop. + worker.import_pubsub_client.psubscribe("__keyspace@0__:Exports") + worker_info_key = "WorkerInfo:{}".format(worker.worker_id) + worker.redis_client.hset(worker_info_key, "export_counter", 0) + worker.worker_import_counter = 0 + + # Get the exports that occurred before the call to psubscribe. + try: + worker.lock.acquire() + export_keys = worker.redis_client.lrange("Exports", 0, -1) + for key in export_keys: + if key.startswith("RemoteFunction"): + fetch_and_register_remote_function(key, worker=worker) + elif key.startswith("ReusableVariables"): + fetch_and_register_reusable_variable(key, worker=worker) + elif key.startswith("FunctionsToRun"): + fetch_and_execute_function_to_run(key, worker=worker) + else: + raise Exception("This code should be unreachable.") + worker.redis_client.hincrby(worker_info_key, "export_counter", 1) + worker.worker_import_counter += 1 + finally: + worker.lock.release() + + for msg in worker.import_pubsub_client.listen(): try: - info = task_info(worker=worker) - # Print failed task errors. - for error in info["failed_tasks"][num_failed_tasks:]: - print error["error_message"] - num_failed_tasks = len(info["failed_tasks"]) - # Print remote function import errors. - for error in info["failed_remote_function_imports"][num_failed_remote_function_imports:]: - print error["error_message"] - num_failed_remote_function_imports = len(info["failed_remote_function_imports"]) - # Print reusable variable import errors. - for error in info["failed_reusable_variable_imports"][num_failed_reusable_variable_imports:]: - print error["error_message"] - num_failed_reusable_variable_imports = len(info["failed_reusable_variable_imports"]) - # Print reusable variable reinitialization errors. - for error in info["failed_reinitialize_reusable_variables"][num_failed_reusable_variable_reinitializations:]: - print error["error_message"] - num_failed_reusable_variable_reinitializations = len(info["failed_reinitialize_reusable_variables"]) - for error in info["failed_function_to_runs"][num_failed_function_to_runs:]: - print error["error_message"] - num_failed_function_to_runs = len(info["failed_function_to_runs"]) - time.sleep(0.2) - except: - # When the driver is exiting, we set worker.handle to None, which will - # cause the check_connected call inside of task_info to raise an - # exception. We use the try block here to suppress that exception. In - # addition, when the script exits, the different names get set to None, - # for example, the time module and the task_info method get set to None, - # and so a TypeError will be thrown when we attempt to call time.sleep or - # task_info. 
- pass - -def connect(node_ip_address, scheduler_address, objstore_address=None, worker=global_worker, mode=raylib.WORKER_MODE): + worker.lock.acquire() + if msg["type"] == "psubscribe": + continue + assert msg["data"] == "rpush" + num_imports = worker.redis_client.llen("Exports") + assert num_imports >= worker.worker_import_counter + for i in range(worker.worker_import_counter, num_imports): + key = worker.redis_client.lindex("Exports", i) + if key.startswith("RemoteFunction"): + fetch_and_register_remote_function(key, worker=worker) + elif key.startswith("ReusableVariables"): + fetch_and_register_reusable_variable(key, worker=worker) + elif key.startswith("FunctionsToRun"): + fetch_and_execute_function_to_run(key, worker=worker) + else: + raise Exception("This code should be unreachable.") + worker.redis_client.hincrby(worker_info_key, "export_counter", 1) + worker.worker_import_counter += 1 + finally: + worker.lock.release() + +def connect(address_info, mode=WORKER_MODE, worker=global_worker): """Connect this worker to the scheduler and an object store. Args: - node_ip_address (str): The ip address of the node the worker runs on. - scheduler_address (str): The ip address and port of the scheduler. - objstore_address (Optional[str]): The ip address and port of the local - object store. Normally, this argument should be omitted and the scheduler - will tell the worker what object store to connect to. + address_info (dict): This contains the entries node_ip_address, + redis_port, object_store_name, object_store_manager_name, and + local_scheduler_name. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. """ - assert worker.handle is None, "When connect is called, worker.handle should be None." + worker.worker_id = random_string() + worker.connected = True + worker.set_mode(mode) # If running Ray in PYTHON_MODE, there is no need to create call create_worker # or to start the worker service. - if mode == raylib.PYTHON_MODE: - worker.mode = raylib.PYTHON_MODE + if mode == PYTHON_MODE: return - - worker.scheduler_address = scheduler_address - random_string = "".join(np.random.choice(list(string.ascii_uppercase + string.digits)) for _ in range(10)) - cpp_log_file_name = config.get_log_file_path("-".join(["worker", random_string, "c++"]) + ".log") - python_log_file_name = config.get_log_file_path("-".join(["worker", random_string]) + ".log") - # Create a worker object. This also creates the worker service, which can - # receive commands from the scheduler. This call also sets up a queue between - # the worker and the worker service. - worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode, cpp_log_file_name) + # Create a Redis client. + worker.redis_client = redis.StrictRedis(host=address_info["node_ip_address"], port=address_info["redis_port"]) + worker.redis_client.config_set("notify-keyspace-events", "AKE") + worker.lock = threading.Lock() + # Create an object store client. + worker.plasma_client = plasma.PlasmaClient(address_info["object_store_name"], address_info["object_store_manager_name"]) + # Create the local scheduler client. + worker.photon_client = photon.PhotonClient(address_info["local_scheduler_name"]) + # Register the worker with Redis.
+ if mode in [SCRIPT_MODE, SILENT_MODE]: + worker.redis_client.rpush("Drivers", worker.worker_id) + elif mode == WORKER_MODE: + worker.redis_client.rpush("Workers", worker.worker_id) + else: + raise Exception("This code should be unreachable.") + # If this is a worker, then start a thread to import exports from the driver. + if mode == WORKER_MODE: + t = threading.Thread(target=import_thread, args=(worker,)) + # Making the thread a daemon causes it to exit when the main thread exits. + t.daemon = True + t.start() # If this is a driver running in SCRIPT_MODE, start a thread to print error # messages asynchronously in the background. Ideally the scheduler would push # messages to the driver's worker service, but we ran into bugs when trying to # properly shutdown the driver's worker service, so we are temporarily using # this implementation which constantly queries the scheduler for new error # messages. - if mode == raylib.SCRIPT_MODE: + if mode == SCRIPT_MODE: t = threading.Thread(target=print_error_messages, args=(worker,)) # Making the thread a daemon causes it to exit when the main thread exits. t.daemon = True t.start() - worker.set_mode(mode) - FORMAT = "%(asctime)-15s %(message)s" - # Configure the Python logging module. Note that if we do not provide our own - # logger, then our logging will interfere with other Python modules that also - # use the logging module. - log_handler = logging.FileHandler(python_log_file_name) - log_handler.setLevel(logging.DEBUG) - log_handler.setFormatter(logging.Formatter(FORMAT)) - _logger().addHandler(log_handler) - _logger().setLevel(logging.DEBUG) - _logger().propagate = False - if mode in [raylib.SCRIPT_MODE, raylib.SILENT_MODE, raylib.PYTHON_MODE]: + # Initialize the serialization library. This registers some classes, and so + # it must be run before we export all of the cached remote functions. + initialize_numbuf() + if mode in [SCRIPT_MODE, SILENT_MODE]: # Add the directory containing the script that is running to the Python # paths of the workers. Also add the current directory. Note that this # assumes that the directory structures on the machines in the clusters are @@ -828,25 +890,21 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, worker=gl for function in worker.cached_functions_to_run: worker.export_function_to_run_on_all_workers(function) # Export cached remote functions to the workers. - for function_name, function_to_export in worker.cached_remote_functions: - raylib.export_remote_function(worker.handle, function_name, function_to_export) - # Export the cached reusable variables. + for function_id, func_name, func, num_return_vals in worker.cached_remote_functions: + export_remote_function(function_id, func_name, func, num_return_vals, worker) + # Export cached reusable variables to the workers. for name, reusable_variable in reusables._cached_reusables: _export_reusable_variable(name, reusable_variable) - # Initialize the serialization library. - initialize_numbuf() worker.cached_functions_to_run = None worker.cached_remote_functions = None reusables._cached_reusables = None def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" - if worker.handle is not None: - raylib.disconnect(worker.handle) - worker.handle = None # Reset the list of cached remote functions so that if more remote functions # are defined and then connect is called again, the remote functions will be # exported. This is mostly relevant for the tests. 
+ worker.connected = False worker.cached_functions_to_run = [] worker.cached_remote_functions = [] reusables._cached_reusables = [] @@ -871,7 +929,7 @@ def register_class(cls, pickle=False, worker=global_worker): # If the worker is not a driver, then return. We do this so that Python # modules can register classes and these modules can be imported on workers # without any trouble. - if worker.mode == raylib.WORKER_MODE: + if worker.mode == WORKER_MODE: return # Raise an exception if cls cannot be serialized efficiently by Ray. if not pickle: @@ -896,16 +954,14 @@ def get(objectid, worker=global_worker): A Python object or a list of Python objects. """ check_connected(worker) - if worker.mode == raylib.PYTHON_MODE: - return objectid # In raylib.PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid) + if worker.mode == PYTHON_MODE: + return objectid # In PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid) if isinstance(objectid, list): - [raylib.request_object(worker.handle, x) for x in objectid] values = [worker.get_object(x) for x in objectid] for i, value in enumerate(values): if isinstance(value, RayTaskError): raise RayGetError(objectid[i], value) return values - raylib.request_object(worker.handle, objectid) value = worker.get_object(objectid) if isinstance(value, RayTaskError): # If the result is a RayTaskError, then the task that created this object @@ -923,13 +979,13 @@ def put(value, worker=global_worker): The object ID assigned to this value. """ check_connected(worker) - if worker.mode == raylib.PYTHON_MODE: - return value # In raylib.PYTHON_MODE, ray.put is the identity operation - objectid = raylib.get_objectid(worker.handle) + if worker.mode == PYTHON_MODE: + return value # In PYTHON_MODE, ray.put is the identity operation + objectid = random_object_id() worker.put_object(objectid, value) return objectid -def wait(objectids, num_returns=1, timeout=None, worker=global_worker): +def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): """Return a list of IDs that are ready and a list of IDs that are not ready. If timeout is set, the function returns either when the requested number of @@ -942,65 +998,20 @@ def wait(objectids, num_returns=1, timeout=None, worker=global_worker): corresponds to the rest of the object IDs (which may or may not be ready). Args: - objectids (List[raylib.ObjectID]): List of object IDs for objects that may + object_ids (List[ObjectID]): List of object IDs for objects that may or may not be ready. num_returns (int): The number of object IDs that should be returned. - timeout (float): The maximum amount of time in seconds that should be spent - polling the scheduler. + timeout (int): The maximum amount of time in milliseconds to wait before + returning. Returns: A list of object IDs that are ready and a list of the remaining object IDs. """ check_connected(worker) - if num_returns < 0: - raise Exception("num_returns cannot be less than 0.") - if num_returns > len(objectids): - raise Exception("num_returns cannot be greater than the length of the input list: num_objects is {}, and the length is {}.".format(num_returns, len(objectids))) - start_time = time.time() - ready_indices = raylib.wait(worker.handle, objectids) - # Polls scheduler until enough objects are ready. 
- while len(ready_indices) < num_returns and (time.time() - start_time < timeout or timeout is None): - ready_indices = raylib.wait(worker.handle, objectids) - time.sleep(0.1) - # Return indices for exactly the requested number of objects. - ready_ids = [objectids[i] for i in ready_indices[:num_returns]] - not_ready_ids = [objectids[i] for i in range(len(objectids)) if i not in ready_indices[:num_returns]] - return ready_ids, not_ready_ids - -def kill_workers(worker=global_worker): - """Kill all of the workers in the cluster. This does not kill drivers. - - Note: - Currently, we only support killing workers if all submitted tasks have been - run. If some workers are still running tasks or if the scheduler still has - tasks in its queue, then this method will not do anything. - - Returns: - True if workers were successfully killed. False otherwise. - """ - success = raylib.kill_workers(worker.handle) - if not success: - print "Could not kill all workers. We currently do not support killing workers when tasks are running." - return success - -def restart_workers_local(num_workers, worker_path, worker=global_worker): - """Restart workers locally. - - This method kills all of the workers and starts new workers locally on the - same node as the driver. This is intended for use in the case where Ray is - being used on a single node. - - Args: - num_workers (int): The number of workers to be started. - worker_path (str): The path of the source code that workers will run. - - Returns: - True if workers were successfully restarted. False otherwise. - """ - if not kill_workers(worker): - return False - services.start_workers(worker.scheduler_address, worker.objstore_address, num_workers, worker_path) - return True + object_id_strs = [object_id.id() for object_id in object_ids] + timeout = timeout if timeout is not None else 2 ** 36 + ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns) + return ready_ids, remaining_ids def format_error_message(exception_message): """Improve the formatting of an exception thrown by a remote function. @@ -1030,10 +1041,6 @@ def main_loop(worker=global_worker): process. The worker executes the command, notifies the scheduler of any errors that occurred while executing the command, and waits for the next command. """ - if not raylib.connected(worker.handle): - raise Exception("Worker is attempting to enter main_loop but has not been connected yet.") - # Notify the scheduler that the worker is ready to start receiving tasks. - raylib.ready_for_new_task(worker.handle) def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts """Execute a task assigned to this worker. @@ -1046,27 +1053,30 @@ def process_task(task): # wrapping these lines in a function should cause the lo After the task executes, the worker resets any reusable variables that were accessed by the task. 
""" - function_name, serialized_args, return_objectids = task + function_id = task.function_id() + args = task.arguments() + return_object_ids = task.returns() + function_name = worker.function_names[function_id.id()] try: - arguments = get_arguments_for_execution(worker.functions[function_name], serialized_args, worker) # get args from objstore - outputs = worker.functions[function_name].executor(arguments) # execute the function - if len(return_objectids) == 1: + arguments = get_arguments_for_execution(worker.functions[function_id.id()], args, worker) # get args from objstore + outputs = worker.functions[function_id.id()].executor(arguments) # execute the function + if len(return_object_ids) == 1: outputs = (outputs,) - store_outputs_in_objstore(return_objectids, outputs, worker) # store output in local object store + store_outputs_in_objstore(return_object_ids, outputs, worker) # store output in local object store except Exception as e: # If the task threw an exception, then record the traceback. We determine # whether the exception was thrown in the task execution by whether the # variable "arguments" is defined. traceback_str = format_error_message(traceback.format_exc()) if "arguments" in locals() else None failure_object = RayTaskError(function_name, e, traceback_str) - failure_objects = [failure_object for _ in range(len(return_objectids))] - store_outputs_in_objstore(return_objectids, failure_objects, worker) - # Notify the scheduler that the task failed. - raylib.notify_failure(worker.handle, function_name, str(failure_object), raylib.FailedTask) - _logger().info("While running function {}, worker threw exception with message: \n\n{}\n".format(function_name, str(failure_object))) - # Notify the scheduler that the task is done. This happens regardless of - # whether the task succeeded or failed. - raylib.ready_for_new_task(worker.handle) + failure_objects = [failure_object for _ in range(len(return_object_ids))] + store_outputs_in_objstore(return_object_ids, failure_objects, worker) + # Log the error message. + error_key = "TaskError:{}".format(random_string()) + worker.redis_client.hmset(error_key, {"function_id": function_id.id(), + "function_name": function_name, + "message": traceback_str}) + worker.redis_client.rpush("ErrorKeys", error_key) try: # Reinitialize the values of reusable variables that were used in the task # above so that changes made to their state do not affect other tasks. @@ -1075,90 +1085,35 @@ def process_task(task): # wrapping these lines in a function should cause the lo # The attempt to reinitialize the reusable variables threw an exception. # We record the traceback and notify the scheduler. traceback_str = format_error_message(traceback.format_exc()) - raylib.notify_failure(worker.handle, function_name, traceback_str, raylib.FailedReinitializeReusableVariable) - _logger().info("While attempting to reinitialize the reusable variables after running function {}, the worker threw exception with message: \n\n{}\n".format(function_name, traceback_str)) - - def process_remote_function(function_name, serialized_function): - """Import a remote function.""" - try: - function, num_return_vals, module = pickling.loads(serialized_function) - except: - # If an exception was thrown when the remote function was imported, we - # record the traceback and notify the scheduler of the failure. - traceback_str = format_error_message(traceback.format_exc()) - _logger().info("Failed to import remote function {}. 
Failed with message: \n\n{}\n".format(function_name, traceback_str)) - # Notify the scheduler that the remote function failed to import. - raylib.notify_failure(worker.handle, function_name, traceback_str, raylib.FailedRemoteFunctionImport) - else: - # TODO(rkn): Why is the below line necessary? - function.__module__ = module - assert function_name == "{}.{}".format(function.__module__, function.__name__), "The remote function name does not match the name that was passed in." - worker.functions[function_name] = remote(num_return_vals=num_return_vals)(function) - _logger().info("Successfully imported remote function {}.".format(function_name)) - # Noify the scheduler that the remote function imported successfully. - # We pass an empty error message string because the import succeeded. - raylib.register_remote_function(worker.handle, function_name, num_return_vals) - - def process_reusable_variable(reusable_variable_name, initializer_str, reinitializer_str): - """Import a reusable variable.""" - try: - initializer = pickling.loads(initializer_str) - reinitializer = pickling.loads(reinitializer_str) - reusables.__setattr__(reusable_variable_name, Reusable(initializer, reinitializer)) - except: - # If an exception was thrown when the reusable variable was imported, we - # record the traceback and notify the scheduler of the failure. - traceback_str = format_error_message(traceback.format_exc()) - _logger().info("Failed to import reusable variable {}. Failed with message: \n\n{}\n".format(reusable_variable_name, traceback_str)) - # Notify the scheduler that the reusable variable failed to import. - raylib.notify_failure(worker.handle, reusable_variable_name, traceback_str, raylib.FailedReusableVariableImport) - else: - _logger().info("Successfully imported reusable variable {}.".format(reusable_variable_name)) - - def process_function_to_run(serialized_function): - """Run on arbitrary function on the worker.""" - try: - # Deserialize the function. - function = pickling.loads(serialized_function) - # Run the function. - function(worker) - except: - # If an exception was thrown when the function was run, we record the - # traceback and notify the scheduler of the failure. - traceback_str = traceback.format_exc() - _logger().info("Failed to run function on worker. Failed with message: \n\n{}\n".format(traceback_str)) - # Notify the scheduler that running the function failed. - name = function.__name__ if "function" in locals() and hasattr(function, "__name__") else "" - raylib.notify_failure(worker.handle, name, traceback_str, raylib.FailedFunctionToRun) - else: - _logger().info("Successfully ran function on worker.") + error_key = "ReusableVariableReinitializeError:{}".format(random_string()) + worker.redis_client.hmset(error_key, {"task_instance_id": "NOTIMPLEMENTED", + "task_id": "NOTIMPLEMENTED", + "function_id": function_id.id(), + "function_name": function_name, + "message": traceback_str}) + worker.redis_client.rpush("ErrorKeys", error_key) while True: - command, command_args = raylib.wait_for_next_message(worker.handle) + task = worker.photon_client.get_task() + function_id = task.function_id() + # Check that the number of imports we have is at least as great as the + # export counter for the task. If not, wait until we have imported enough. 
+ while True: + try: + worker.lock.acquire() + if worker.functions.has_key(function_id.id()) and worker.function_export_counters[function_id.id()] <= worker.worker_import_counter: + break + time.sleep(0.001) + finally: + worker.lock.release() + # Execute the task. try: - if command == "die": - # We use this as a mechanism to allow the scheduler to kill workers. - _logger().info("Received a 'die' command, and will exit now.") - break - elif command == "task": - process_task(command_args) - elif command == "function": - function_name, serialized_function = command_args - process_remote_function(function_name, serialized_function) - elif command == "reusable_variable": - name, initializer_str, reinitializer_str = command_args - process_reusable_variable(name, initializer_str, reinitializer_str) - elif command == "function_to_run": - serialized_function = command_args - process_function_to_run(serialized_function) - else: - _logger().info("Reached the end of the if-else loop in the main loop. This should be unreachable.") - assert False, "This code should be unreachable." + worker.lock.acquire() + process_task(task) finally: - # Allow releasing the variables BEFORE we wait for the next message or exit the block - del command_args + worker.lock.release() -def _submit_task(func_name, args, worker=global_worker): +def _submit_task(function_id, func_name, args, worker=global_worker): """This is a wrapper around worker.submit_task. We use this wrapper so that in the remote decorator, we can call _submit_task @@ -1166,7 +1121,7 @@ def _submit_task(func_name, args, worker=global_worker): serialize remote functions, we don't attempt to serialize the worker object, which cannot be serialized. """ - return worker.submit_task(func_name, args) + return worker.submit_task(function_id, func_name, args) def _mode(worker=global_worker): """This is a wrapper around worker.mode. @@ -1178,15 +1133,6 @@ def _mode(worker=global_worker): """ return worker.mode -def _logger(): - """Return the logger object. - - We use this wrapper because so that functions which do logging can be pickled. - Normally a logger object is specific to a machine (it opens a local file), and - so cannot be pickled. - """ - return logger - def _reusables(): """Return the reusables object. @@ -1203,9 +1149,32 @@ def _export_reusable_variable(name, reusable, worker=global_worker): reusable (Reusable): The reusable object containing code for initializing and reinitializing the variable. 
""" - if _mode(worker) not in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: + if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: raise Exception("_export_reusable_variable can only be called on a driver.") - raylib.export_reusable_variable(worker.handle, name, pickling.dumps(reusable.initializer), pickling.dumps(reusable.reinitializer)) + + reusable_variable_id = name + key = "ReusableVariables:{}".format(reusable_variable_id) + worker.redis_client.hmset(key, {"name": name, + "initializer": pickling.dumps(reusable.initializer), + "reinitializer": pickling.dumps(reusable.reinitializer)}) + worker.redis_client.rpush("Exports", key) + worker.driver_export_counter += 1 + +def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker): + key = "RemoteFunction:{}".format(function_id.id()) + + worker.num_return_vals[function_id.id()] = num_return_vals + + + pickled_func = pickling.dumps(func) + worker.redis_client.hmset(key, {"function_id": function_id.id(), + "name": func_name, + "module": func.__module__, + "function": pickled_func, + "num_return_vals": num_return_vals, + "function_export_counter": worker.driver_export_counter}) + worker.redis_client.rpush("Exports", key) + worker.driver_export_counter += 1 def remote(*args, **kwargs): """This decorator is used to create remote functions. @@ -1215,8 +1184,14 @@ def remote(*args, **kwargs): should return. """ worker = global_worker - def make_remote_decorator(num_return_vals): + def make_remote_decorator(num_return_vals, func_id=None): def remote_decorator(func): + func_name = "{}.{}".format(func.__module__, func.__name__) + if func_id is None: + function_id = FunctionID((hashlib.sha256(func_name).digest())[:20]) + else: + function_id = func_id + def func_call(*args, **kwargs): """This gets run immediately when a worker calls a remote function.""" check_connected() @@ -1224,8 +1199,8 @@ def func_call(*args, **kwargs): args.extend([kwargs[keyword] if kwargs.has_key(keyword) else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments if any([arg is funcsigs._empty for arg in args]): raise Exception("Not enough arguments were provided to {}.".format(func_name)) - if _mode() == raylib.PYTHON_MODE: - # In raylib.PYTHON_MODE, remote calls simply execute the function. We copy the + if _mode() == PYTHON_MODE: + # In PYTHON_MODE, remote calls simply execute the function. We copy the # arguments to prevent the function call from mutating them and to match # the usual behavior of immutable remote objects. 
try: @@ -1235,18 +1210,16 @@ def func_call(*args, **kwargs): _reusables()._reinitialize() _reusables()._running_remote_function_locally = False return result - objectids = _submit_task(func_name, args) + objectids = _submit_task(function_id, func_name, args) if len(objectids) == 1: return objectids[0] elif len(objectids) > 1: return objectids def func_executor(arguments): """This gets run when the remote function is executed.""" - _logger().info("Calling function {}".format(func.__name__)) start_time = time.time() result = func(*arguments) end_time = time.time() - _logger().info("Finished executing function {}, it took {} seconds".format(func.__name__, end_time - start_time)) return result def func_invoker(*args, **kwargs): """This is returned by the decorator and used to invoke the function.""" @@ -1266,7 +1239,7 @@ def func_invoker(*args, **kwargs): check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, func_name) # Everything ready - export the function - if worker.mode in [None, raylib.SCRIPT_MODE, raylib.SILENT_MODE]: + if worker.mode in [None, SCRIPT_MODE, SILENT_MODE]: func_name_global_valid = func.__name__ in func.__globals__ func_name_global_value = func.__globals__.get(func.__name__) # Set the function globally to make it refer to itself @@ -1277,14 +1250,20 @@ def func_invoker(*args, **kwargs): # Undo our changes if func_name_global_valid: func.__globals__[func.__name__] = func_name_global_value else: del func.__globals__[func.__name__] - if worker.mode in [raylib.SCRIPT_MODE, raylib.SILENT_MODE]: - raylib.export_remote_function(worker.handle, func_name, to_export) + if worker.mode in [SCRIPT_MODE, SILENT_MODE]: + export_remote_function(function_id, func_name, func, num_return_vals) elif worker.mode is None: - worker.cached_remote_functions.append((func_name, to_export)) + worker.cached_remote_functions.append((function_id, func_name, func, num_return_vals)) return func_invoker return remote_decorator + if _mode() == WORKER_MODE: + if kwargs.has_key("function_id"): + num_return_vals = kwargs["num_return_vals"] + function_id = kwargs["function_id"] + return make_remote_decorator(num_return_vals, function_id) + if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. num_return_vals = 1 @@ -1293,8 +1272,9 @@ def func_invoker(*args, **kwargs): else: # This is the case where the decorator is something like # @ray.remote(num_return_vals=2). - assert len(args) == 0 and "num_return_vals" in kwargs.keys(), "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'." + assert len(args) == 0 and kwargs.has_key("num_return_vals"), "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'." 
num_return_vals = kwargs["num_return_vals"] + assert not kwargs.has_key("function_id") return make_remote_decorator(num_return_vals) def check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, name): @@ -1346,18 +1326,16 @@ def get_arguments_for_execution(function, serialized_args, worker=global_worker) """ arguments = [] for (i, arg) in enumerate(serialized_args): - if isinstance(arg, raylib.ObjectID): + if isinstance(arg, photon.ObjectID): # get the object from the local object store - _logger().info("Getting argument {} for function {}.".format(i, function.__name__)) argument = worker.get_object(arg) if isinstance(argument, RayTaskError): # If the result is a RayTaskError, then the task that created this # object failed, and we should propagate the error message here. raise RayGetArgumentError(function.__name__, i, arg, argument) - _logger().info("Successfully retrieved argument {} for function {}.".format(i, function.__name__)) else: # pass the argument by value - argument = serialization.deserialize_argument(arg) + argument = arg arguments.append(argument) return arguments @@ -1374,7 +1352,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker): The arguments objectids and outputs should have the same length. Args: - objectids (List[raylib.ObjectID]): The object IDs that were assigned to the + objectids (List[ObjectID]): The object IDs that were assigned to the outputs of the remote function call. outputs (Tuple): The value returned by the remote function. If the remote function was supposed to only return one value, then its output was @@ -1382,7 +1360,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker): function. """ for i in range(len(objectids)): - if isinstance(outputs[i], raylib.ObjectID): + if isinstance(outputs[i], photon.ObjectID): raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i)) for i in range(len(objectids)): worker.put_object(objectids[i], outputs[i]) diff --git a/scripts/default_worker.py b/scripts/default_worker.py deleted file mode 100644 index 54f31d7774ca..000000000000 --- a/scripts/default_worker.py +++ /dev/null @@ -1,16 +0,0 @@ -import sys -import argparse -import numpy as np - -import ray - -parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") -parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node") -parser.add_argument("--scheduler-address", required=True, type=str, help="the scheduler's address") -parser.add_argument("--objstore-address", type=str, help="the objstore's address") - -if __name__ == "__main__": - args = parser.parse_args() - ray.worker.connect(args.node_ip_address, args.scheduler_address) - - ray.worker.main_loop() diff --git a/setup-env.sh b/setup-env.sh deleted file mode 100644 index f86348d2e2b4..000000000000 --- a/setup-env.sh +++ /dev/null @@ -1,19 +0,0 @@ -# NO shebang! Force the user to run this using the 'source' command without spawning a new shell; otherwise, variable exports won't persist. - -echo "Adding Ray to PYTHONPATH" 1>&2 - -ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) - -export PYTHONPATH="$ROOT_DIR/lib/python/:$ROOT_DIR/thirdparty/numbuf/build:$PYTHONPATH" - -# Print instructions for adding Ray to your bashrc. 
-unamestr="$(uname)" -if [[ "$unamestr" == "Linux" ]]; then - BASH_RC="~/.bashrc" -elif [[ "$unamestr" == "Darwin" ]]; then - BASH_RC="~/.bash_profile" -fi -echo "To permanently add Ray to your Python path, run, - -echo 'export PYTHONPATH=$ROOT_DIR/lib/python/:$ROOT_DIR/thirdparty/numbuf/build:\$PYTHONPATH' >> $BASH_RC -" diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 8f2d6c1b5b22..e0af933227b3 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -169,7 +169,7 @@ void handle_worker_available(scheduler_info *info, /* Add client_sock to a list of available workers. This struct will be freed * when a task is assigned to this worker. */ utarray_push_back(state->available_workers, &worker_index); - LOG_INFO("Adding worker_index %d to available workers.\n", worker_index); + LOG_DEBUG("Adding worker_index %d to available workers.\n", worker_index); } } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 1b3474e62514..7fbf4fede5eb 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -148,7 +148,7 @@ void new_client_connection(event_loop *loop, int listener_sock, void *context, local_scheduler_state *s = context; int new_socket = accept_client(listener_sock); event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, s); - LOG_INFO("new connection with fd %d", new_socket); + LOG_DEBUG("new connection with fd %d", new_socket); /* Add worker to list of workers. */ /* TODO(pcm): Where shall we free this? */ worker_index *new_worker_index = malloc(sizeof(worker_index)); diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index af197df74b7c..a869d136a5a7 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -40,8 +40,12 @@ def __init__(self, buff, plasma_id, plasma_client): self.plasma_client = plasma_client def __del__(self): - """Notify Plasma that the object is no longer needed.""" - self.plasma_client.client.plasma_release(self.plasma_client.plasma_conn, self.plasma_id) + """Notify Plasma that the object is no longer needed. + + If the plasma client has been shut down, then don't do anything. + """ + if self.plasma_client.alive: + self.plasma_client.client.plasma_release(self.plasma_client.plasma_conn, self.plasma_id) def __getitem__(self, index): """Read from the PlasmaBuffer as if it were just a regular buffer.""" @@ -73,7 +77,7 @@ def __init__(self, store_socket_name, manager_socket_name=None): store_socket_name (str): Name of the socket the plasma store is listening at. manager_socket_name (str): Name of the socket the plasma manager is listening at. 
""" - + self.alive = True plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") self.client = ctypes.cdll.LoadLibrary(plasma_client_library) @@ -85,6 +89,7 @@ def __init__(self, store_socket_name, manager_socket_name=None): self.client.plasma_seal.restype = None self.client.plasma_delete.restype = None self.client.plasma_subscribe.restype = ctypes.c_int + self.client.plasma_wait.restype = ctypes.c_int self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] @@ -101,6 +106,15 @@ def __init__(self, store_socket_name, manager_socket_name=None): self.has_manager_conn = False self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, None)) + def shutdown(self): + """Shutdown the client so that it does not send messages. + + If we kill the Plasma store and Plasma manager that this client is connected + to, then we can use this method to prevent the client from trying to send + messages to the killed processes. + """ + self.alive = False + def create(self, object_id, size, metadata=None): """Create a new buffer in the PlasmaStore for a particular object ID. @@ -233,6 +247,12 @@ def wait(self, object_ids, timeout, num_returns): """ if not self.has_manager_conn: raise Exception("Not connected to the plasma manager socket") + if num_returns < 0: + raise Exception("The argument num_returns cannot be less than one.") + if num_returns > len(object_ids): + raise Exception("The argument num_returns cannot be greater than len(object_ids): num_returns is {}, len(object_ids) is {}.".format(num_returns, len(object_ids))) + if timeout > 2 ** 36: + raise Exception("The method wait currently cannot be used with a timeout greater than 2 ** 36.") object_id_array = (len(object_ids) * PlasmaID)() for i, object_id in enumerate(object_ids): object_id_array[i] = make_plasma_id(object_id) @@ -240,7 +260,9 @@ def wait(self, object_ids, timeout, num_returns): num_return_objects = self.client.plasma_wait(self.plasma_conn, object_id_array._length_, object_id_array, - timeout, num_returns, return_id_array) + ctypes.c_int64(timeout), + num_returns, + return_id_array) ready_ids = map(plasma_id_to_str, return_id_array[num_returns-num_return_objects:]) return ready_ids, list(set(object_ids) - set(ready_ids)) diff --git a/test/array_test.py b/test/array_test.py index 5d00cbe61e81..c09f804b3bf0 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -58,7 +58,7 @@ def testAssemble(self): def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True, num_objstores=2, num_workers=10) + ray.init(start_ray_local=True, num_workers=10) x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) diff --git a/test/failure_test.py b/test/failure_test.py index 78e6c6227749..2e251767d042 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -4,16 +4,24 @@ import test_functions +def wait_for_errors(error_type, num_errors, timeout=10): + start_time = time.time() + while time.time() - start_time < timeout: + error_info = ray.error_info() + if len(error_info[error_type]) >= num_errors: + return + time.sleep(0.1) + print("Timing out of wait.") + class FailureTest(unittest.TestCase): def testUnknownSerialization(self): reload(test_functions) ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) 
test_functions.test_unknown_type.remote() - time.sleep(0.2) - task_info = ray.task_info() - self.assertEqual(len(task_info["failed_tasks"]), 1) - self.assertEqual(len(task_info["running_tasks"]), 0) + wait_for_errors("TaskError", 1) + error_info = ray.error_info() + self.assertEqual(len(error_info["TaskError"]), 1) ray.worker.cleanup() @@ -45,19 +53,11 @@ def testFailedTask(self): test_functions.throw_exception_fct1.remote() test_functions.throw_exception_fct1.remote() - for _ in range(100): # Retry if we need to wait longer. - if len(ray.task_info()["failed_tasks"]) >= 2: - break - time.sleep(0.1) - result = ray.task_info() - self.assertEqual(len(result["failed_tasks"]), 2) - task_ids = set() - for task in result["failed_tasks"]: - self.assertTrue(task.has_key("worker_address")) - self.assertTrue(task.has_key("operationid")) - self.assertTrue("Test function 1 intentionally failed." in task.get("error_message")) - self.assertTrue(task["operationid"] not in task_ids) - task_ids.add(task["operationid"]) + wait_for_errors("TaskError", 2) + result = ray.error_info() + self.assertEqual(len(result["TaskError"]), 2) + for task in result["TaskError"]: + self.assertTrue("Test function 1 intentionally failed." in task.get("message")) x = test_functions.throw_exception_fct2.remote() try: @@ -96,11 +96,8 @@ def __reduce__(self): def __call__(self): return ray.remote(Foo()) - for _ in range(100): # Retry if we need to wait longer. - if len(ray.task_info()["failed_remote_function_imports"]) >= 1: - break - time.sleep(0.1) - self.assertTrue("There is a problem here." in ray.task_info()["failed_remote_function_imports"][0]["error_message"]) + wait_for_errors("RemoteFunctionImportError", 1) + self.assertTrue("There is a problem here." in ray.error_info()["RemoteFunctionImportError"][0]["message"]) ray.worker.cleanup() @@ -114,12 +111,9 @@ def initializer(): raise Exception("The initializer failed.") return 0 ray.reusables.foo = ray.Reusable(initializer) - for _ in range(100): # Retry if we need to wait longer. - if len(ray.task_info()["failed_reusable_variable_imports"]) >= 1: - break - time.sleep(0.1) + wait_for_errors("ReusableVariableImportError", 1) # Check that the error message is in the task info. - self.assertTrue("The initializer failed." in ray.task_info()["failed_reusable_variable_imports"][0]["error_message"]) + self.assertTrue("The initializer failed." in ray.error_info()["ReusableVariableImportError"][0]["message"]) ray.worker.cleanup() @@ -135,12 +129,9 @@ def reinitializer(foo): def use_foo(): ray.reusables.foo use_foo.remote() - for _ in range(100): # Retry if we need to wait longer. - if len(ray.task_info()["failed_reinitialize_reusable_variables"]) >= 1: - break - time.sleep(0.1) + wait_for_errors("ReusableVariableReinitializeError", 1) # Check that the error message is in the task info. - self.assertTrue("The reinitializer failed." in ray.task_info()["failed_reinitialize_reusable_variables"][0]["error_message"]) + self.assertTrue("The reinitializer failed." in ray.error_info()["ReusableVariableReinitializeError"][0]["message"]) ray.worker.cleanup() @@ -151,14 +142,11 @@ def f(worker): if ray.worker.global_worker.mode == ray.WORKER_MODE: raise Exception("Function to run failed.") ray.worker.global_worker.run_function_on_all_workers(f) - for _ in range(100): # Retry if we need to wait longer. - if len(ray.task_info()["failed_function_to_runs"]) >= 2: - break - time.sleep(0.1) + wait_for_errors("FunctionToRunError", 2) # Check that the error message is in the task info. 
- self.assertEqual(len(ray.task_info()["failed_function_to_runs"]), 2) - self.assertTrue("Function to run failed." in ray.task_info()["failed_function_to_runs"][0]["error_message"]) - self.assertTrue("Function to run failed." in ray.task_info()["failed_function_to_runs"][1]["error_message"]) + self.assertEqual(len(ray.error_info()["FunctionToRunError"]), 2) + self.assertTrue("Function to run failed." in ray.error_info()["FunctionToRunError"][0]["message"]) + self.assertTrue("Function to run failed." in ray.error_info()["FunctionToRunError"][1]["message"]) ray.worker.cleanup() diff --git a/test/runtest.py b/test/runtest.py index a02405a02433..445a398fa973 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1,3 +1,5 @@ +from __future__ import print_function + import unittest import ray import numpy as np @@ -142,64 +144,6 @@ class ClassA(object): ray.worker.cleanup() -class ObjStoreTest(unittest.TestCase): - - # Test setting up object stores, transfering data between them and retrieving data to a client - def testObjStore(self): - node_ip_address = "127.0.0.1" - scheduler_address = ray.services.start_ray_local(num_objstores=2, num_workers=0, worker_path=None) - ray.connect(node_ip_address, scheduler_address, mode=ray.SCRIPT_MODE) - objstore_addresses = [objstore_info["address"] for objstore_info in ray.scheduler_info()["objstores"]] - w1 = ray.worker.Worker() - w2 = ray.worker.Worker() - ray.reusables._cached_reusables = [] # This is a hack to make the test run. - ray.connect(node_ip_address, scheduler_address, objstore_address=objstore_addresses[0], mode=ray.SCRIPT_MODE, worker=w1) - ray.reusables._cached_reusables = [] # This is a hack to make the test run. - ray.connect(node_ip_address, scheduler_address, objstore_address=objstore_addresses[1], mode=ray.SCRIPT_MODE, worker=w2) - - for cls in [Foo, Bar, Baz, Qux, SubQux, Exception, CustomError, Point, NamedTupleExample]: - ray.register_class(cls) - - # putting and getting an object shouldn't change it - for data in RAY_TEST_OBJECTS: - objectid = ray.put(data, w1) - result = ray.get(objectid, w1) - assert_equal(result, data) - - # putting an object, shipping it to another worker, and getting it shouldn't change it - for data in RAY_TEST_OBJECTS: - objectid = ray.put(data, w1) - result = ray.get(objectid, w2) - assert_equal(result, data) - - # putting an object, shipping it to another worker, and getting it shouldn't change it - for data in RAY_TEST_OBJECTS: - objectid = ray.put(data, w2) - result = ray.get(objectid, w1) - assert_equal(result, data) - - # This test fails. See https://github.com/ray-project/ray/issues/159. 
- # getting multiple times shouldn't matter - # for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: - # objectid = worker.put(data, w1) - # result = worker.get(objectid, w2) - # result = worker.get(objectid, w2) - # result = worker.get(objectid, w2) - # assert_equal(result, data) - - # Getting a buffer after modifying it before it finishes should return updated buffer - objectid = ray.libraylib.get_objectid(w1.handle) - buf = ray.libraylib.allocate_buffer(w1.handle, objectid, 100) - buf[0][0] = 1 - ray.libraylib.finish_buffer(w1.handle, objectid, buf[1], 0) - completedbuffer = ray.libraylib.get_buffer(w1.handle, objectid) - self.assertEqual(completedbuffer[0][0], 1) - - # We started multiple drivers manually, so we will disconnect them manually. - ray.disconnect(worker=w1) - ray.disconnect(worker=w2) - ray.worker.cleanup() - class WorkerTest(unittest.TestCase): def testPutGet(self): @@ -233,29 +177,6 @@ def testPutGet(self): class APITest(unittest.TestCase): - def testPassingArgumentsByValue(self): - ray.init(start_ray_local=True, num_workers=0) - - # The types that can be passed by value are defined by - # is_argument_serializable in serialization.py. - class Foo(object): - pass - CAN_PASS_BY_VALUE = [1, 1L, 1.0, True, False, None, [1L, 1.0, True, None], - ([1, 2, 3], {False: [1.0, u"hi", ()]}), 100 * ["a"]] - CANNOT_PASS_BY_VALUE = [int, np.int64(0), np.float64(0), Foo(), [Foo()], - (Foo()), {0: Foo()}, [[[int]]], 101 * [1], - np.zeros(10)] - - for obj in CAN_PASS_BY_VALUE: - self.assertTrue(ray.serialization.is_argument_serializable(obj)) - self.assertEqual(obj, ray.serialization.deserialize_argument(ray.serialization.serialize_argument_if_possible(obj))) - - for obj in CANNOT_PASS_BY_VALUE: - self.assertFalse(ray.serialization.is_argument_serializable(obj)) - self.assertEqual(None, ray.serialization.serialize_argument_if_possible(obj)) - - ray.worker.cleanup() - def testRegisterClass(self): ray.init(start_ray_local=True, num_workers=0) @@ -328,11 +249,7 @@ def testNoArgs(self): reload(test_functions) ray.init(start_ray_local=True, num_workers=1) - test_functions.no_op.remote() - time.sleep(0.2) - task_info = ray.task_info() - self.assertEqual(len(task_info["failed_tasks"]), 0) - self.assertEqual(len(task_info["running_tasks"]), 0) + ray.get(test_functions.no_op.remote()) ray.worker.cleanup() @@ -400,22 +317,22 @@ def f(delay): objectids = [f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)] ready_ids, remaining_ids = ray.wait(objectids) - self.assertTrue(len(ready_ids) == 1) - self.assertTrue(len(remaining_ids) == 3) + self.assertEqual(len(ready_ids), 1) + self.assertEqual(len(remaining_ids), 3) ready_ids, remaining_ids = ray.wait(objectids, num_returns=4) - self.assertEqual(ready_ids, objectids) + self.assertEqual(set(ready_ids), set([object_id.id() for object_id in objectids])) self.assertEqual(remaining_ids, []) objectids = [f.remote(0.5), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=1.75, num_returns=4) - self.assertTrue(time.time() - start_time < 2) + ready_ids, remaining_ids = ray.wait(objectids, timeout=1750, num_returns=4) + self.assertLess(time.time() - start_time, 2) self.assertEqual(len(ready_ids), 3) self.assertEqual(len(remaining_ids), 1) ray.wait(objectids) objectids = [f.remote(1.0), 
f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=5) + ready_ids, remaining_ids = ray.wait(objectids, timeout=5000) self.assertTrue(time.time() - start_time < 5) self.assertEqual(len(ready_ids), 1) self.assertEqual(len(remaining_ids), 3) @@ -504,150 +421,6 @@ def f(worker): ray.worker.cleanup() - def testComputationGraph(self): - ray.init(start_ray_local=True, num_workers=1) - - @ray.remote - def f(x): - return x - @ray.remote - def g(x, y): - return x, y - a = f.remote(1) - b = f.remote(1) - c = g.remote(a, b) - c = g.remote(a, 1) - # Make sure that we can produce a computation_graph visualization. - ray.visualize_computation_graph(view=False) - - ray.worker.cleanup() - -class ReferenceCountingTest(unittest.TestCase): - - def testDeallocation(self): - reload(test_functions) - for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: - reload(module) - ray.init(start_ray_local=True, num_workers=1) - - def check_not_deallocated(object_ids): - reference_counts = ray.scheduler_info()["reference_counts"] - for object_id in object_ids: - self.assertGreater(reference_counts[object_id.id], 0) - - def check_everything_deallocated(): - reference_counts = ray.scheduler_info()["reference_counts"] - self.assertEqual(reference_counts, len(reference_counts) * [-1]) - - z = da.zeros.remote([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE]) - time.sleep(0.1) - objectid_val = z.id - time.sleep(0.1) - check_not_deallocated([z]) - del z - time.sleep(0.1) - check_everything_deallocated() - - x = ra.zeros.remote([10, 10]) - y = ra.zeros.remote([10, 10]) - z = ra.dot.remote(x, y) - objectid_val = x.id - time.sleep(0.1) - check_not_deallocated([x, y, z]) - del x - time.sleep(0.1) - check_not_deallocated([y, z]) - del y - time.sleep(0.1) - check_not_deallocated([z]) - del z - time.sleep(0.1) - check_everything_deallocated() - - z = da.zeros.remote([4 * da.BLOCK_SIZE]) - time.sleep(0.1) - check_not_deallocated(ray.get(z).objectids.tolist()) - del z - time.sleep(0.1) - check_everything_deallocated() - - ray.worker.cleanup() - - def testGet(self): - ray.init(start_ray_local=True, num_workers=3) - - for cls in [Foo, Bar, Baz, Qux, SubQux, Exception, CustomError, Point, NamedTupleExample]: - ray.register_class(cls) - - # Remote objects should be deallocated when the corresponding ObjectID goes - # out of scope, and all results of ray.get called on the ID go out of scope. - for val in RAY_TEST_OBJECTS: - x = ray.put(val) - objectid = x.id - xval = ray.get(x) - del x, xval - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], -1) - - # Remote objects that do not contain numpy arrays should be deallocated when - # the corresponding ObjectID goes out of scope, even if ray.get has been - # called on the ObjectID. - for val in [True, False, None, 1, 1.0, 1L, "hi", u"hi", [1, 2, 3], (1, 2, 3), [(), {(): ()}]]: - x = ray.put(val) - objectid = x.id - xval = ray.get(x) - del x - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], -1) - - # Remote objects that contain numpy arrays should not be deallocated when - # the corresponding ObjectID goes out of scope, if ray.get has been called - # on the ObjectID and the result of that call is still in scope. 
- for val in [np.zeros(10), [np.zeros(10)], (((np.zeros(10)),),), {(): np.zeros(10)}, [1, 2, 3, np.zeros(1)]]: - x = ray.put(val) - objectid = x.id - xval = ray.get(x) - del x - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], 1) - - # Getting an object multiple times should not be a problem. And the remote - # object should not be deallocated until both of the results are out of scope. - for val in [np.zeros(10), [np.zeros(10)], (((np.zeros(10)),),), {(): np.zeros(10)}, [1, 2, 3, np.zeros(1)]]: - x = ray.put(val) - objectid = x.id - xval1 = ray.get(x) - xval2 = ray.get(x) - del xval1 - # Make sure we can still access xval2. - xval2 - del xval2 - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], 1) - xval3 = ray.get(x) - xval4 = ray.get(x) - xval5 = ray.get(x) - del x - del xval4, xval5 - # Make sure we can still access xval3. - xval3 - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], 1) - del xval3 - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], -1) - - # Getting an object multiple times and assigning it to the same name should - # work. This was a problem in https://github.com/ray-project/ray/issues/159. - for val in [np.zeros(10), [np.zeros(10)], (((np.zeros(10)),),), {(): np.zeros(10)}, [1, 2, 3, np.zeros(1)]]: - x = ray.put(val) - objectid = x.id - xval = ray.get(x) - xval = ray.get(x) - xval = ray.get(x) - xval = ray.get(x) - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], 1) - del x - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], 1) - del xval - self.assertEqual(ray.scheduler_info()["reference_counts"][objectid], -1) - - ray.worker.cleanup() - class PythonModeTest(unittest.TestCase): def testPythonMode(self): @@ -712,18 +485,18 @@ def use_l(): class PythonCExtensionTest(unittest.TestCase): - def testReferenceCountNone(self): - ray.init(start_ray_local=True, num_workers=1) - - # Make sure that we aren't accidentally messing up Python's reference counts. - @ray.remote - def f(): - return sys.getrefcount(None) - first_count = ray.get(f.remote()) - second_count = ray.get(f.remote()) - self.assertEqual(first_count, second_count) - - ray.worker.cleanup() + # def testReferenceCountNone(self): + # ray.init(start_ray_local=True, num_workers=1) + # + # # Make sure that we aren't accidentally messing up Python's reference counts. 
+ # @ray.remote + # def f(): + # return sys.getrefcount(None) + # first_count = ray.get(f.remote()) + # second_count = ray.get(f.remote()) + # self.assertEqual(first_count, second_count) + # + # ray.worker.cleanup() def testReferenceCountTrue(self): ray.init(start_ray_local=True, num_workers=1) @@ -867,43 +640,5 @@ def use_foo(): ray.worker.cleanup() -class ClusterAttachingTest(unittest.TestCase): - - def testAttachingToCluster(self): - node_ip_address = "127.0.0.1" - scheduler_port = np.random.randint(40000, 50000) - scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) - ray.services.start_scheduler(scheduler_address, cleanup=True) - time.sleep(0.1) - ray.services.start_node(scheduler_address, node_ip_address, num_workers=1, cleanup=True) - - ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) - - @ray.remote - def f(x): - return x + 1 - self.assertEqual(ray.get(f.remote(0)), 1) - - ray.worker.cleanup() - - def testAttachingToClusterWithMultipleObjectStores(self): - node_ip_address = "127.0.0.1" - scheduler_port = np.random.randint(40000, 50000) - scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) - ray.services.start_scheduler(scheduler_address, cleanup=True) - time.sleep(0.1) - ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) - ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) - ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) - - ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) - - @ray.remote - def f(x): - return x + 1 - self.assertEqual(ray.get(f.remote(0)), 1) - - ray.worker.cleanup() - if __name__ == "__main__": unittest.main(verbosity=2)
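For orientation, here is a minimal driver-side sketch of the API these tests exercise. It is a sketch only, assuming a Python 2 build of this tree; every call in it appears in test/runtest.py or test/failure_test.py above, and the function name slow_identity is purely illustrative. Two behavioral points are worth keeping in mind: ray.wait now interprets its timeout in milliseconds (the tests pass 1750 and 5000), and failures are inspected through ray.error_info(), which groups errors by type such as "TaskError", rather than through the removed ray.task_info().

from __future__ import print_function

import time

import ray

ray.init(start_ray_local=True, num_workers=2)

@ray.remote
def slow_identity(x):
  # Sleep briefly so that some tasks may still be pending when ray.wait times out.
  time.sleep(0.5)
  return x

object_ids = [slow_identity.remote(i) for i in range(4)]

# The timeout is in milliseconds; wait up to one second for all four results.
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=4, timeout=1000)
# In this version the ready entries come back as raw ID strings (see testWait).
print(len(ready_ids), len(remaining_ids))

# ray.get still accepts a single ObjectID or a list of them.
print(ray.get(object_ids))

# Errors recorded under "ErrorKeys" are grouped by type in ray.error_info().
print(len(ray.error_info()["TaskError"]))

ray.worker.cleanup()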
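The export path that replaces the old scheduler-mediated export calls is the other piece worth spelling out: remote functions, reusable variables, and functions-to-run are stored in Redis under RemoteFunction:*, ReusableVariables:*, and FunctionsToRun:* keys, the keys are appended to the "Exports" list, and each worker's import_thread picks them up through keyspace notifications. The snippet below is a stripped-down sketch of that handshake against a bare redis-py client; the host, port, and placeholder payload are assumptions for illustration, while the key names, the notify-keyspace-events setting, and the psubscribe pattern are the ones used in the patch.

from __future__ import print_function

import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379)
# Keyspace notifications must be enabled so that rpush events are published.
r.config_set("notify-keyspace-events", "AKE")

# Worker side: subscribe first, then replay anything already in "Exports",
# mirroring the ordering used by import_thread.
p = r.pubsub()
p.psubscribe("__keyspace@0__:Exports")

# Driver side: store the export under its own key and append that key to the
# "Exports" list; the rpush is what fires the keyspace notification.
key = "FunctionsToRun:example"
r.hmset(key, {"function": "<a pickled function would go here>"})
r.rpush("Exports", key)

# Worker side: process the backlog; later pushes arrive as "rpush" messages
# on the pubsub object p.
for export_key in r.lrange("Exports", 0, -1):
  print("processing export:", export_key)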