diff --git a/BUILD.bazel b/BUILD.bazel index 1d5861afeff7..a9f389fe7c4e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1783,11 +1783,6 @@ filegroup( "python/ray/experimental/*.py", "python/ray/util/*.py", "python/ray/internal/*.py", - "python/ray/projects/*.py", - "python/ray/projects/schema.json", - "python/ray/projects/templates/cluster_template.yaml", - "python/ray/projects/templates/project_template.yaml", - "python/ray/projects/templates/requirements.txt", "python/ray/workers/default_worker.py", ]), ) diff --git a/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py b/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py index 94eafa2de9c3..6ab4ea61cacd 100644 --- a/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py +++ b/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py @@ -93,7 +93,7 @@ def optimizer_creator(model, config): momentum=config.get("momentum", 0.9)) -ray.init(address="auto" if not args.smoke_test else None, log_to_driver=True) +ray.init(address="auto" if not args.smoke_test else None, _log_to_driver=True) num_training_workers = 1 if args.smoke_test else 3 executor = FailureInjectorExecutor(queue_trials=True) diff --git a/ci/long_running_tests/workloads/serve.py b/ci/long_running_tests/workloads/serve.py index 0f401c83621a..f3db5f7ffbde 100644 --- a/ci/long_running_tests/workloads/serve.py +++ b/ci/long_running_tests/workloads/serve.py @@ -38,7 +38,8 @@ @serve.accept_batch def echo(_): time.sleep(0.01) # Sleep for 10ms - ray.show_in_webui(str(serve.context.batch_size), key="Current batch size") + ray.show_in_dashboard( + str(serve.context.batch_size), key="Current batch size") return ["hi {}".format(i) for i in range(serve.context.batch_size)] diff --git a/ci/long_running_tests/workloads/serve_failure.py b/ci/long_running_tests/workloads/serve_failure.py index 406d35e55cfe..04577b6dc0a0 100644 --- a/ci/long_running_tests/workloads/serve_failure.py +++ 
b/ci/long_running_tests/workloads/serve_failure.py @@ -26,7 +26,7 @@ dashboard_host="0.0.0.0") ray.init( - address=cluster.address, dashboard_host="0.0.0.0", log_to_driver=False) + address=cluster.address, dashboard_host="0.0.0.0", _log_to_driver=False) serve.init() diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index e9a95289bc46..8ad938525c7c 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -153,7 +153,6 @@ test_python() { -python/ray/tests:test_multiprocessing # test_connect_to_ray() fails to connect to raylet -python/ray/tests:test_node_manager -python/ray/tests:test_object_manager - -python/ray/tests:test_projects -python/ray/tests:test_ray_init # test_redis_port() seems to fail here, but pass in isolation -python/ray/tests:test_resource_demand_scheduler -python/ray/tests:test_stress # timeout @@ -279,12 +278,12 @@ build_wheels() { # caused timeouts in the past. See the "cache: false" line below. local MOUNT_BAZEL_CACHE=( -v "${HOME}/ray-bazel-cache":/root/ray-bazel-cache - -e TRAVIS=true - -e TRAVIS_PULL_REQUEST="${TRAVIS_PULL_REQUEST:-false}" - -e encrypted_1c30b31fe1ee_key="${encrypted_1c30b31fe1ee_key-}" - -e encrypted_1c30b31fe1ee_iv="${encrypted_1c30b31fe1ee_iv-}" - -e TRAVIS_COMMIT="${TRAVIS_COMMIT}" - -e CI="${CI}" + -e "TRAVIS=true" + -e "TRAVIS_PULL_REQUEST=${TRAVIS_PULL_REQUEST:-false}" + -e "encrypted_1c30b31fe1ee_key=${encrypted_1c30b31fe1ee_key-}" + -e "encrypted_1c30b31fe1ee_iv=${encrypted_1c30b31fe1ee_iv-}" + -e "TRAVIS_COMMIT=${TRAVIS_COMMIT}" + -e "CI=${CI}" ) # This command should be kept in sync with ray/python/README-building-wheels.md, diff --git a/doc/source/actors.rst b/doc/source/actors.rst index 09cd6fdf41f3..26d43fd2453a 100644 --- a/doc/source/actors.rst +++ b/doc/source/actors.rst @@ -72,7 +72,7 @@ Resources with Actors You can specify that an actor requires CPUs or GPUs in the decorator. While Ray has built-in support for CPUs and GPUs, Ray can also handle custom resources. 
When using GPUs, Ray will automatically set the environment variable ``CUDA_VISIBLE_DEVICES`` for the actor after instantiated. The actor will have access to a list of the IDs of the GPUs -that it is allowed to use via ``ray.get_gpu_ids(as_str=True)``. This is a list of strings, +that it is allowed to use via ``ray.get_gpu_ids()``. This is a list of strings, like ``[]``, or ``['1']``, or ``['2', '5', '6']``. Under some circumstances, the IDs of GPUs could be given as UUID strings instead of indices (see the `CUDA programming guide `__). .. code-block:: python diff --git a/doc/source/configure.rst b/doc/source/configure.rst index 98c9f8e55f47..5b152b4e8535 100644 --- a/doc/source/configure.rst +++ b/doc/source/configure.rst @@ -159,7 +159,7 @@ To add authentication via the Python API, start Ray using: .. code-block:: python - ray.init(redis_password="password") + ray.init(_redis_password="password") To add authentication via the CLI or to connect to an existing Ray instance with password-protected Redis ports: @@ -182,48 +182,4 @@ to localhost when the ray is started using ``ray.init``. See the `Redis security documentation `__ for more information. - -Using the Object Store with Huge Pages --------------------------------------- - -Plasma is a high-performance shared memory object store originally developed in -Ray and now being developed in `Apache Arrow`_. See the `relevant -documentation`_. - -On Linux, it is possible to increase the write throughput of the Plasma object -store by using huge pages. You first need to create a file system and activate -huge pages as follows. - -.. code-block:: shell - - sudo mkdir -p /mnt/hugepages - gid=`id -g` - uid=`id -u` - sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages - sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group" - # This typically corresponds to 20000 2MB pages (about 40GB), but this - # depends on the platform. 
- sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages" - -**Note:** Once you create the huge pages, they will take up memory which will -never be freed unless you remove the huge pages. If you run into memory issues, -that may be the issue. - -You need root access to create the file system, but not for running the object -store. - -You can then start Ray with huge pages on a single machine as follows. - -.. code-block:: python - - ray.init(huge_pages=True, plasma_directory="/mnt/hugepages") - -In the cluster case, you can do it by passing ``--huge-pages`` and -``--plasma-directory=/mnt/hugepages`` into ``ray start`` on any machines where -huge pages should be enabled. - -See the relevant `Arrow documentation for huge pages`_. - .. _`Apache Arrow`: https://arrow.apache.org/ -.. _`relevant documentation`: https://arrow.apache.org/docs/python/plasma.html#the-plasma-in-memory-object-store -.. _`Arrow documentation for huge pages`: https://arrow.apache.org/docs/python/plasma.html#using-plasma-with-huge-pages diff --git a/doc/source/index.rst b/doc/source/index.rst index d2acec412e27..fb47bce94f02 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -206,7 +206,6 @@ Academic Papers joblib.rst iter.rst pandas_on_ray.rst - projects.rst .. toctree:: :hidden: diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index 2fa9e55f6f9b..23c827f8a841 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -74,12 +74,12 @@ ray.get_resource_ids .. autofunction:: ray.get_resource_ids -.. _ray-get_webui_url-ref: +.. _ray-get_dashboard_url-ref: -ray.get_webui_url -~~~~~~~~~~~~~~~~~ +ray.get_dashboard_url +~~~~~~~~~~~~~~~~~~~~~ -.. autofunction:: ray.get_webui_url +.. autofunction:: ray.get_dashboard_url .. _ray-shutdown-ref: @@ -88,21 +88,6 @@ ray.shutdown .. autofunction:: ray.shutdown - -.. _ray-register_custom_serializer-ref: - -ray.register_custom_serializer -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. 
autofunction:: ray.register_custom_serializer - -.. _ray-profile-ref: - -ray.profile -~~~~~~~~~~~ - -.. autofunction:: ray.profile - .. _ray-method-ref: ray.method @@ -123,13 +108,6 @@ ray.nodes .. autofunction:: ray.nodes -.. _ray-objects-ref: - -ray.objects -~~~~~~~~~~~ - -.. autofunction:: ray.objects - .. _ray-timeline-ref: ray.timeline @@ -137,13 +115,6 @@ ray.timeline .. autofunction:: ray.timeline -.. _ray-object_transfer_timeline-ref: - -ray.object_transfer_timeline -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. autofunction:: ray.object_transfer_timeline - .. _ray-cluster_resources-ref: ray.cluster_resources @@ -221,24 +192,12 @@ The Ray Command Line API :prog: ray stack :show-nested: -.. _ray-stat-doc: - -.. click:: ray.scripts.scripts:statistics - :prog: ray statistics - :show-nested: - .. _ray-memory-doc: .. click:: ray.scripts.scripts:memory :prog: ray memory :show-nested: -.. _ray-globalgc-doc: - -.. click:: ray.scripts.scripts:globalgc - :prog: ray globalgc - :show-nested: - .. _ray-timeline-doc: .. click:: ray.scripts.scripts:timeline diff --git a/doc/source/projects.rst b/doc/source/projects.rst deleted file mode 100644 index be8900452840..000000000000 --- a/doc/source/projects.rst +++ /dev/null @@ -1,189 +0,0 @@ -Ray Projects (Experimental) -=========================== - -Ray projects make it easy to package a Ray application so it can be -rerun later in the same environment. They allow for the sharing and -reliable reuse of existing code. - -Quick start (CLI) ------------------ - -.. code-block:: bash - - # Creates a project in the current directory. It will create a - # project.yaml defining the code and environment and a cluster.yaml - # describing the cluster configuration. Both will be created in the - # ray-project subdirectory of the current directory. - $ ray project create - - # Create a new session from the given project. Launch a cluster and run - # the command, which must be specified in the project.yaml file. 
If no - # command is specified, the "default" command in ray-project/project.yaml - # will be used. Alternatively, use --shell to run a raw shell command. - $ ray session start [arguments] [--shell] - - # Open a console for the given session. - $ ray session attach - - # Stop the given session and terminate all of its worker nodes. - $ ray session stop - -Examples --------- -See `the readme `__ -for instructions on how to run these examples: - -- `Open Tacotron `__: - A TensorFlow implementation of Google's Tacotron speech synthesis with pre-trained model (unofficial) -- `PyTorch Transformers `__: - A library of state-of-the-art pretrained models for Natural Language Processing (NLP) - -Tutorial --------- - -We will walk through how to use projects by executing the `streaming MapReduce example `_. -Commands always apply to the project in the current directory. -Let us switch into the project directory with - -.. code-block:: bash - - cd ray/doc/examples/streaming - - -A session represents a running instance of a project. Let's start one with - -.. code-block:: bash - - ray session start - - -The ``ray session start`` command -will bring up a new cluster and initialize the environment of the cluster -according to the `environment` section of the `project.yaml`, installing all -dependencies of the project. - -Now we can execute a command in the session. To see a list of all available -commands of the project, run - -.. code-block:: bash - - ray session commands - - -which produces the following output: - -.. code-block:: - - Active project: ray-example-streaming - - Command "run": - usage: run [--num-mappers NUM_MAPPERS] [--num-reducers NUM_REDUCERS] - - Start the streaming example. - - optional arguments: - --num-mappers NUM_MAPPERS - Number of mapper actors used - --num-reducers NUM_REDUCERS - Number of reducer actors used - - -As you see, in this project there is only a single ``run`` command which has arguments -``--num-mappers`` and ``--num-reducers``. 
We can execute the streaming -wordcount with the default parameters by running - -.. code-block:: bash - - ray session execute run - - -You can interrupt the command with ``-c`` and attach to the running session by executing - -.. code-block:: bash - - ray session attach --tmux - - -Inside the session you can for example edit the streaming applications with - -.. code-block:: bash - - cd ray-example-streaming - emacs streaming.py - - -Try for example to add the following lines after the ``for count in counts:`` loop: - -.. code-block:: python - - if "million" in wordcounts: - print("Found the word!") - - -and re-run the application from outside the session with - -.. code-block:: bash - - ray session execute run - - -The session can be terminated from outside the session with - -.. code-block:: bash - - ray session stop - - -Project file format (project.yaml) ----------------------------------- - -A project file contains everything required to run a project. -This includes a cluster configuration, the environment and dependencies -for the application, and the specific inputs used to run the project. - -Here is an example for a minimal project format: - -.. code-block:: yaml - - name: test-project - description: "This is a simple test project" - repo: https://github.com/ray-project/ray - - # Cluster to be instantiated by default when starting the project. - cluster: - config: ray-project/cluster.yaml - - # Commands/information to build the environment, once the cluster is - # instantiated. This can include the versions of python libraries etc. - # It can be specified as a Python requirements.txt, a conda environment, - # a Dockerfile, or a shell script to run to set up the libraries. - environment: - requirements: requirements.txt - - # List of commands that can be executed once the cluster is instantiated - # and the environment is set up. - # A command can also specify a cluster that overwrites the default cluster. 
- commands: - - name: default - command: python default.py - help: "The command that will be executed if no command name is specified" - - name: test - command: python test.py --param1={{param1}} --param2={{param2}} - help: "A test command" - params: - - name: "param1" - help: "The first parameter" - # The following line indicates possible values this parameter can take. - choices: ["1", "2"] - - name: "param2" - help: "The second parameter" - -Project files have to adhere to the following schema: - -.. jsonschema:: ../../python/ray/projects/schema.json - -Cluster file format (cluster.yaml) ----------------------------------- - -This is the same as for the autoscaler, see -:ref:`Cluster Launch page `. diff --git a/doc/source/serialization.rst b/doc/source/serialization.rst index 732e547976c1..e1cadf7cb60e 100644 --- a/doc/source/serialization.rst +++ b/doc/source/serialization.rst @@ -10,7 +10,7 @@ Since Ray processes do not share memory space, data transferred between workers Plasma Object Store ------------------- -Plasma is an in-memory object store that is being developed as part of `Apache Arrow`_. Ray uses Plasma to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are **immutable** and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node. +Plasma is an in-memory object store that is being developed as part of Apache Arrow. Ray uses Plasma to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are **immutable** and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node. Each node has its own object store. When data is put into the object store, it does not get automatically broadcasted to other nodes. Data remains local to the writer until requested by another task or actor on another node. 
@@ -64,49 +64,6 @@ Serialization notes - Lock objects are mostly unserializable, because copying a lock is meaningless and could cause serious concurrency problems. You may have to come up with a workaround if your object contains a lock. -Last resort: Custom Serialization -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If none of these options work, you can try registering a custom serializer with ``ray.register_custom_serializer`` (:ref:`docstring `): - -.. code-block:: python - - import ray - - ray.init() - - class Foo(object): - def __init__(self, value): - self.value = value - - def custom_serializer(obj): - return obj.value - - def custom_deserializer(value): - object = Foo() - object.value = value - return object - - ray.register_custom_serializer( - Foo, serializer=custom_serializer, deserializer=custom_deserializer) - - object_ref = ray.put(Foo(100)) - assert ray.get(object_ref).value == 100 - - -If you find cases where Ray serialization doesn't work or does something unexpected, please `let us know`_ so we can fix it. - -.. _`let us know`: https://github.com/ray-project/ray/issues - -Advanced: Huge Pages -~~~~~~~~~~~~~~~~~~~~ - -On Linux, it is possible to increase the write throughput of the Plasma object store by using huge pages. See the `Configuration page `_ for information on how to use huge pages in Ray. - - -.. _`Apache Arrow`: https://arrow.apache.org/ - - Known Issues ------------ diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index d2c8ffb3ef77..ddbf58256644 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -472,8 +472,7 @@ decide between the two options. Redirecting stdout and stderr to files -------------------------------------- The stdout and stderr streams are usually printed to the console. For remote actors, -Ray collects these logs and prints them to the head process, as long as it -has been initialized with ``log_to_driver=True``, which is the default. 
+Ray collects these logs and prints them to the head process. However, if you would like to collect the stream outputs in files for later analysis or troubleshooting, Tune offers an utility parameter, ``log_to_file``, @@ -508,17 +507,6 @@ too. If ``log_to_file`` is set, Tune will automatically register a new logging handler for Ray's base logger and log the output to the specified stderr output file. -Setting ``log_to_file`` does not disable logging to the driver. If you would -like to disable the logs showing up in the driver output (i.e. they should only -show up in the logfiles), initialize Ray accordingly: - -.. code-block:: python - - ray.init(log_to_driver=False) - tune.run( - trainable, - log_to_file=True) - .. _tune-debugging: Debugging diff --git a/doc/source/using-ray-with-gpus.rst b/doc/source/using-ray-with-gpus.rst index 05c1b9ab9198..8e189c4ae696 100644 --- a/doc/source/using-ray-with-gpus.rst +++ b/doc/source/using-ray-with-gpus.rst @@ -33,7 +33,7 @@ remote decorator. print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids())) print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"])) -Inside of the remote function, a call to ``ray.get_gpu_ids(as_str=True)`` will return a +Inside of the remote function, a call to ``ray.get_gpu_ids()`` will return a list of strings indicating which GPUs the remote function is allowed to use. Typically, it is not necessary to call ``ray.get_gpu_ids()`` because Ray will automatically set the ``CUDA_VISIBLE_DEVICES`` environment variable. 
diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 6b0bdd1b25a3..251a7453672f 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -80,12 +80,11 @@ available_resources) # noqa: E402 from ray.worker import ( # noqa: F401 LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, IO_WORKER_MODE, cancel, connect, - disconnect, get, get_actor, get_gpu_ids, get_resource_ids, get_webui_url, - init, is_initialized, put, kill, register_custom_serializer, remote, - shutdown, show_in_webui, wait, + disconnect, get, get_actor, get_gpu_ids, get_resource_ids, + get_dashboard_url, init, is_initialized, put, kill, remote, shutdown, + show_in_dashboard, wait, ) # noqa: E402 import ray.internal # noqa: E402 -import ray.projects # noqa: E402 # We import ray.actor because some code is run in actor.py which initializes # some functions in the worker. import ray.actor # noqa: F401 @@ -113,7 +112,7 @@ "get_actor", "get_gpu_ids", "get_resource_ids", - "get_webui_url", + "get_dashboard_url", "init", "internal", "is_initialized", @@ -127,12 +126,10 @@ "objects", "object_transfer_timeline", "profile", - "projects", "put", - "register_custom_serializer", "remote", "shutdown", - "show_in_webui", + "show_in_dashboard", "timeline", "util", "wait", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index d5f4c1bd1ecc..4043e2598322 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -360,7 +360,7 @@ cdef execute_task( CFiberEvent task_done_event # Automatically restrict the GPUs available to this task. 
- ray.utils.set_cuda_visible_devices(ray.get_gpu_ids(as_str=True)) + ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) function_descriptor = CFunctionDescriptorToPython( ray_function.GetFunctionDescriptor()) diff --git a/python/ray/actor.py b/python/ray/actor.py index 9ae43d77b47f..e0661d93a7ea 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -406,7 +406,6 @@ def _remote(self, memory=None, object_store_memory=None, resources=None, - is_direct_call=None, max_concurrency=None, max_restarts=None, max_task_retries=None, @@ -430,7 +429,6 @@ def _remote(self, this actor when creating objects. resources: The custom resources required by the actor creation task. - is_direct_call: Use direct actor calls. max_concurrency: The max number of concurrent calls to allow for this actor. This only works with direct actor calls. The max concurrency defaults to 1 for threaded execution, and 1000 for @@ -456,8 +454,6 @@ def _remote(self, args = [] if kwargs is None: kwargs = {} - if is_direct_call is not None and not is_direct_call: - raise ValueError("Non-direct call actors are no longer supported.") meta = self.__ray_metadata__ actor_has_async_methods = len( inspect.getmembers( diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index 21a2f40fde92..a8cdf949732e 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -53,7 +53,7 @@ def connect(self): output_info = ray.init( ignore_reinit_error=True, address=self.redis_address, - redis_password=self.redis_password) + _redis_password=self.redis_password) logger.info(output_info) self.connected = True diff --git a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index 3fe6bdcff81b..ebfc50d9ede9 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -202,7 +202,7 @@ class Actor 
extends React.Component, State> { .sort() .map((key, _, __) => { // Construct the value from actor. - // Please refer to worker.py::show_in_webui for schema. + // Please refer to worker.py::show_in_dashboard for schema. const valueEncoded = actor.webuiDisplay![key]; const valueParsed = JSON.parse(valueEncoded); let valueRendered = valueParsed["message"]; diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 4ef40a716b41..9ffab24e8560 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -1,10 +1,8 @@ -from .api import get, wait from .dynamic_resources import set_resource from .object_spilling import force_spill_objects, force_restore_spilled_objects from .placement_group import (placement_group, placement_group_table, remove_placement_group) __all__ = [ - "get", "wait", "set_resource", "force_spill_objects", - "force_restore_spilled_objects", "placement_group", - "placement_group_table", "remove_placement_group" + "set_resource", "force_spill_objects", "force_restore_spilled_objects", + "placement_group", "placement_group_table", "remove_placement_group" ] diff --git a/python/ray/experimental/api.py b/python/ray/experimental/api.py deleted file mode 100644 index 4fab64d03f3d..000000000000 --- a/python/ray/experimental/api.py +++ /dev/null @@ -1,59 +0,0 @@ -import ray -import numpy as np - - -def get(object_refs): - """Get a single or a collection of remote objects from the object store. - - This method is identical to `ray.get` except it adds support for tuples, - ndarrays and dictionaries. - - Args: - object_refs: Object ref of the object to get, a list, tuple, ndarray of - object refs to get or a dict of {key: object ref}. - - Returns: - A Python object, a list of Python objects or a dict of {key: object}. 
- """ - if isinstance(object_refs, (tuple, np.ndarray)): - return ray.get(list(object_refs)) - elif isinstance(object_refs, dict): - keys_to_get = [ - k for k, v in object_refs.items() if isinstance(v, ray.ObjectRef) - ] - ids_to_get = [ - v for k, v in object_refs.items() if isinstance(v, ray.ObjectRef) - ] - values = ray.get(ids_to_get) - - result = object_refs.copy() - for key, value in zip(keys_to_get, values): - result[key] = value - return result - else: - return ray.get(object_refs) - - -def wait(object_refs, num_returns=1, timeout=None): - """Return a list of IDs that are ready and a list of IDs that are not. - - This method is identical to `ray.wait` except it adds support for tuples - and ndarrays. - - Args: - object_refs (List[ObjectRef], Tuple(ObjectRef), np.array(ObjectRef)): - List like of object refs for objects that may or may not be ready. - Note that these IDs must be unique. - num_returns (int): The number of object refs that should be returned. - timeout (float): The maximum amount of time in seconds to wait before - returning. - - Returns: - A list of object refs that are ready and a list of the remaining object - IDs. - """ - if isinstance(object_refs, (tuple, np.ndarray)): - return ray.wait( - list(object_refs), num_returns=num_returns, timeout=timeout) - - return ray.wait(object_refs, num_returns=num_returns, timeout=timeout) diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 67284a6a2c1d..29737fa2e539 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -18,10 +18,6 @@ class ImportThread: """A thread used to import exports from the driver or other workers. - Note: The driver also has an import thread, which is used only to import - custom class definitions from calls to _register_custom_serializer that - happen under the hood on workers. - Attributes: worker: the worker object in this process. 
mode: worker mode diff --git a/python/ray/projects/__init__.py b/python/ray/projects/__init__.py deleted file mode 100644 index 82d7e0766d58..000000000000 --- a/python/ray/projects/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from ray.projects.projects import ProjectDefinition - -__all__ = [ - "ProjectDefinition", -] diff --git a/python/ray/projects/examples/README.md b/python/ray/projects/examples/README.md deleted file mode 100644 index cee7ece16b9b..000000000000 --- a/python/ray/projects/examples/README.md +++ /dev/null @@ -1,41 +0,0 @@ -Ray Projects -============ - -To run these example projects, we first have to make sure the full -repository is checked out into the project directory. - -Open Tacotron -------------- - -```shell -cd open-tacotron -# Check out the original repository -git init -git remote add origin https://github.com/keithito/tacotron.git -git fetch -git checkout -t origin/master - -# Serve the model -ray session start serve - -# Terminate the session -ray session stop -``` - -PyTorch Transformers --------------------- - -```shell -cd python-transformers -# Check out the original repository -git init -git remote add origin https://github.com/huggingface/pytorch-transformers.git -git fetch -git checkout -t origin/master - -# Now we can start the training -ray session start train --dataset SST-2 - -# Terminate the session -ray session stop -``` diff --git a/python/ray/projects/examples/open-tacotron/ray-project/cluster.yaml b/python/ray/projects/examples/open-tacotron/ray-project/cluster.yaml deleted file mode 100644 index 2dbfb452a6bd..000000000000 --- a/python/ray/projects/examples/open-tacotron/ray-project/cluster.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# This file is generated by `ray project create` - -# A unique identifier for the head node and workers of this cluster. -cluster_name: open-tacotron - -# The maximum number of workers nodes to launch in addition to the head -# node. This takes precedence over min_workers. min_workers defaults to 0. 
-max_workers: 1 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - availability_zone: us-west-2a - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu diff --git a/python/ray/projects/examples/open-tacotron/ray-project/project.yaml b/python/ray/projects/examples/open-tacotron/ray-project/project.yaml deleted file mode 100644 index 4a2c1dfb5ce8..000000000000 --- a/python/ray/projects/examples/open-tacotron/ray-project/project.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# This file is generated by `ray project create` - -name: open-tacotron -description: "A TensorFlow implementation of Google's Tacotron speech synthesis with pre-trained model (unofficial)" -repo: https://github.com/keithito/tacotron - -cluster: - config: ray-project/cluster.yaml - -environment: - requirements: ray-project/requirements.txt - - shell: - - curl http://data.keithito.com/data/speech/tacotron-20180906.tar.gz | tar xzC /tmp - -commands: - - name: serve - command: python demo_server.py --checkpoint /tmp/tacotron-20180906/model.ckpt diff --git a/python/ray/projects/examples/open-tacotron/ray-project/requirements.txt b/python/ray/projects/examples/open-tacotron/ray-project/requirements.txt deleted file mode 100644 index 100979f0823f..000000000000 --- a/python/ray/projects/examples/open-tacotron/ray-project/requirements.txt +++ /dev/null @@ -1,11 +0,0 @@ -# Adapted from https://github.com/keithito/tacotron/blob/master/requirements.txt -# Note: this doesn't include tensorflow or tensorflow-gpu because the package you need to install -# depends on your platform. It is assumed you have already installed tensorflow. 
-falcon==1.2.0 -inflect==0.2.5 -librosa==0.5.1 -matplotlib==2.0.2 -numpy==1.14.3 -scipy==0.19.0 -tqdm==4.11.2 -Unidecode==0.4.20 diff --git a/python/ray/projects/examples/pytorch-transformers/ray-project/cluster.yaml b/python/ray/projects/examples/pytorch-transformers/ray-project/cluster.yaml deleted file mode 100644 index b2143d8c458e..000000000000 --- a/python/ray/projects/examples/pytorch-transformers/ray-project/cluster.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# This file is generated by `ray project create` - -# An unique identifier for the head node and workers of this cluster. -cluster_name: pytorch-transformers - -# The maximum number of workers nodes to launch in addition to the head -# node. This takes precedence over min_workers. min_workers default to 0. -max_workers: 1 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - availability_zone: us-west-2a - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu diff --git a/python/ray/projects/examples/pytorch-transformers/ray-project/project.yaml b/python/ray/projects/examples/pytorch-transformers/ray-project/project.yaml deleted file mode 100644 index 7bc27ff95ac1..000000000000 --- a/python/ray/projects/examples/pytorch-transformers/ray-project/project.yaml +++ /dev/null @@ -1,35 +0,0 @@ -# This file is generated by `ray project create` - -name: pytorch-transformers -description: "A library of state-of-the-art pretrained models for Natural Language Processing (NLP)" -repo: https://github.com/huggingface/pytorch-transformers - -cluster: - config: ray-project/cluster.yaml - -environment: - requirements: ray-project/requirements.txt - -commands: - - name: train - command: | - wget https://raw.githubusercontent.com/ray-project/project-data/master/download_glue_data.py && \ - python download_glue_data.py -d /tmp -t {{dataset}} && \ - python ./examples/run_glue.py \ - --model_type bert \ - --model_name_or_path bert-base-uncased \ - --task_name 
{{dataset}} \ - --do_train \ - --do_eval \ - --do_lower_case \ - --data_dir /tmp/{{dataset}} \ - --max_seq_length 128 \ - --per_gpu_eval_batch_size=8 \ - --per_gpu_train_batch_size=8 \ - --learning_rate 2e-5 \ - --num_train_epochs 3.0 \ - --output_dir /tmp/output/ - params: - - name: "dataset" - help: "The GLUE dataset to fine-tune on" - choices: ["CoLA", "SST-2", "MRPC", "STS-B", "QQP", "MNLI", "QNLI", "RTE", "WNLI"] diff --git a/python/ray/projects/examples/pytorch-transformers/ray-project/requirements.txt b/python/ray/projects/examples/pytorch-transformers/ray-project/requirements.txt deleted file mode 100644 index 0ebd68f98eb7..000000000000 --- a/python/ray/projects/examples/pytorch-transformers/ray-project/requirements.txt +++ /dev/null @@ -1,17 +0,0 @@ -# Adapted from https://github.com/huggingface/pytorch-transformers/blob/master/requirements.txt -# PyTorch -torch>=1.0.0 -# progress bars in model download and training scripts -tqdm -# Accessing files from S3 directly. -boto3 -# Used for downloading models over HTTP -requests -# For OpenAI GPT -regex -# For XLNet -sentencepiece -# TensorBoard visualization -tensorboardX -# Pytorch transformers -pytorch_transformers diff --git a/python/ray/projects/projects.py b/python/ray/projects/projects.py deleted file mode 100644 index add817c8754e..000000000000 --- a/python/ray/projects/projects.py +++ /dev/null @@ -1,213 +0,0 @@ -import argparse -import copy -import json -import jsonschema -import os -import yaml - - -def make_argument_parser(name, params, wildcards): - """Build argument parser dynamically to parse parameter arguments. - - Args: - name (str): Name of the command to parse. - params (dict): Parameter specification used to construct - the argparse parser. - wildcards (bool): Whether wildcards are allowed as arguments. - - Returns: - The argparse parser. - A dictionary from argument name to list of valid choices. 
- """ - - parser = argparse.ArgumentParser(prog=name) - # For argparse arguments that have a 'choices' list associated - # with them, save it in the following dictionary. - choices = {} - for param in params: - # Construct arguments to pass into argparse's parser.add_argument. - argparse_kwargs = copy.deepcopy(param) - name = argparse_kwargs.pop("name") - if wildcards and "choices" in param: - choices[name] = param["choices"] - argparse_kwargs["choices"] = param["choices"] + ["*"] - if "type" in param: - types = {"int": int, "str": str, "float": float} - if param["type"] in types: - argparse_kwargs["type"] = types[param["type"]] - else: - raise ValueError( - "Parameter {} has type {} which is not supported. " - "Type must be one of {}".format(name, param["type"], - list(types.keys()))) - parser.add_argument("--" + name, dest=name, **argparse_kwargs) - - return parser, choices - - -class ProjectDefinition: - def __init__(self, current_dir): - """Finds ray-project folder for current project, parse and validates it. - - Args: - current_dir (str): Path from which to search for ray-project. - - Raises: - jsonschema.exceptions.ValidationError: This exception is raised - if the project file is not valid. - ValueError: This exception is raised if there are other errors in - the project definition (e.g. files not existing). - """ - root = find_root(current_dir) - if root is None: - raise ValueError("No project root found") - # Add an empty pathname to the end so that rsync will copy the project - # directory to the correct target. - self.root = os.path.join(root, "") - - # Parse the project YAML. 
- project_file = os.path.join(self.root, "ray-project", "project.yaml") - if not os.path.exists(project_file): - raise ValueError("Project file {} not found".format(project_file)) - with open(project_file) as f: - self.config = yaml.safe_load(f) - - check_project_config(self.root, self.config) - - def cluster_yaml(self): - """Return the project's cluster configuration filename.""" - return self.config["cluster"]["config"] - - def working_directory(self): - """Return the project's working directory on a cluster session.""" - # Add an empty pathname to the end so that rsync will copy the project - # directory to the correct target. - directory = os.path.join("~", self.config["name"], "") - return directory - - def get_command_info(self, command_name, args, shell, wildcards=False): - """Get the shell command, parsed arguments and config for a command. - - Args: - command_name (str): Name of the command to run. The command - definition should be available in project.yaml. - args (tuple): Tuple containing arguments to format the command - with. - wildcards (bool): If True, enable wildcards as arguments. - - Returns: - The raw shell command to run with placeholders for the arguments. - The parsed argument dictonary, parsed with argparse. - The config dictionary of the command. - - Raises: - ValueError: This exception is raised if the given command is not - found in project.yaml. 
- """ - if shell or not command_name: - return command_name, {}, {} - - command_to_run = None - params = None - config = None - - for command_definition in self.config["commands"]: - if command_definition["name"] == command_name: - command_to_run = command_definition["command"] - params = command_definition.get("params", []) - config = command_definition.get("config", {}) - if not command_to_run: - raise ValueError( - "Cannot find the command named '{}' in commmands section " - "of the project file.".format(command_name)) - - parser, choices = make_argument_parser(command_name, params, wildcards) - parsed_args = vars(parser.parse_args(list(args))) - - if wildcards: - for key, val in parsed_args.items(): - if val == "*": - parsed_args[key] = choices[key] - - return command_to_run, parsed_args, config - - def git_repo(self): - return self.config.get("repo", None) - - -def find_root(directory): - """Find root directory of the ray project. - - Args: - directory (str): Directory to start the search in. - - Returns: - Path of the parent directory containing the ray-project or - None if no such project is found. - """ - prev, directory = None, os.path.abspath(directory) - while prev != directory: - if os.path.isdir(os.path.join(directory, "ray-project")): - return directory - prev, directory = directory, os.path.abspath( - os.path.join(directory, os.pardir)) - return None - - -def validate_project_schema(project_config): - """Validate a project config against the official ray project schema. - - Args: - project_config (dict): Parsed project yaml. - - Raises: - jsonschema.exceptions.ValidationError: This exception is raised - if the project file is not valid. - """ - dir = os.path.dirname(os.path.abspath(__file__)) - with open(os.path.join(dir, "schema.json")) as f: - schema = json.load(f) - - jsonschema.validate(instance=project_config, schema=schema) - - -def check_project_config(project_root, project_config): - """Checks if the project definition is valid. 
- - Args: - project_root (str): Path containing the ray-project - project_config (dict): Project config definition - - Raises: - jsonschema.exceptions.ValidationError: This exception is raised - if the project file is not valid. - ValueError: This exception is raised if there are other errors in - the project definition (e.g. files not existing). - """ - validate_project_schema(project_config) - - # Make sure the cluster yaml file exists - cluster_file = os.path.join(project_root, - project_config["cluster"]["config"]) - if not os.path.exists(cluster_file): - raise ValueError("'cluster' file does not exist " - "in {}".format(project_root)) - - if "environment" in project_config: - env = project_config["environment"] - - if sum(["dockerfile" in env, "dockerimage" in env]) > 1: - raise ValueError("Cannot specify both 'dockerfile' and " - "'dockerimage' in environment.") - - if "requirements" in env: - requirements_file = os.path.join(project_root, env["requirements"]) - if not os.path.exists(requirements_file): - raise ValueError("'requirements' file in 'environment' does " - "not exist in {}".format(project_root)) - - if "dockerfile" in env: - docker_file = os.path.join(project_root, env["dockerfile"]) - if not os.path.exists(docker_file): - raise ValueError("'dockerfile' file in 'environment' does " - "not exist in {}".format(project_root)) diff --git a/python/ray/projects/schema.json b/python/ray/projects/schema.json deleted file mode 100644 index 2040835bf05f..000000000000 --- a/python/ray/projects/schema.json +++ /dev/null @@ -1,183 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "name": { - "description": "The name of the project", - "type": "string" - }, - "description": { - "description": "A short description of the project", - "type": "string" - }, - "repo": { - "description": "The URL of the repo this project is part of", - "type": "string" - }, - "documentation": { - "description": "Link to the 
documentation of this project", - "type": "string" - }, - "tags": { - "description": "Relevant tags for this project", - "type": "array", - "items": { - "type": "string" - } - }, - "cluster": { - "type": "object", - "properties": { - "config": { - "type": "string", - "description": "Path to a .yaml cluster configuration file (relative to the project root)" - }, - "params": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "help": { - "type": "string" - }, - "choices": { - "type": "array" - }, - "default": { - }, - "type": { - "type": "string", - "enum": [ - "int", - "float", - "str" - ] - } - }, - "required": [ - "name" - ], - "additionalProperties": false - } - } - }, - "required": [ - "config" - ], - "additionalProperties": false - }, - "environment": { - "description": "The environment that needs to be set up to run the project", - "type": "object", - "properties": { - "dockerimage": { - "description": "URL to a docker image that can be pulled to run the project in", - "type": "string" - }, - "dockerfile": { - "description": "Path to a Dockerfile to set up an image the project can run in (relative to the project root)", - "type": "string" - }, - "requirements": { - "description": "Path to a Python requirements.txt file to set up project dependencies (relative to the project root)", - "type": "string" - }, - "shell": { - "description": "A sequence of shell commands to run to set up the project environment", - "type": "array", - "items": { - "type": "string" - } - } - }, - "additionalProperties": false - }, - "commands": { - "type": "array", - "items": { - "description": "Possible commands to run to start a session", - "type": "object", - "properties": { - "name": { - "description": "Name of the command", - "type": "string" - }, - "help": { - "description": "Help string for the command", - "type": "string" - }, - "command": { - "description": "Shell command to run on the cluster", - "type": "string" - }, 
- "params": { - "type": "array", - "items": { - "description": "Possible parameters in the command", - "type": "object", - "properties": { - "name": { - "description": "Name of the parameter", - "type": "string" - }, - "help": { - "description": "Help string for the parameter", - "type": "string" - }, - "choices": { - "description": "Possible values the parameter can take", - "type": "array" - }, - "default": { - }, - "type": { - "description": "Required type for the parameter", - "type": "string", - "enum": [ - "int", - "float", - "str" - ] - } - }, - "required": [ - "name" - ], - "additionalProperties": false - } - }, - "config": { - "description": "Configuration options for the command", - "type": "object", - "properties": { - "tmux": { - "description": "If true, the command will be run inside of tmux", - "type": "boolean" - } - }, - "additionalProperties": false - } - }, - "required": [ - "name", - "command" - ], - "additionalProperties": false - } - }, - "output_files": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": [ - "name", - "cluster" - ], - "additionalProperties": false -} diff --git a/python/ray/projects/scripts.py b/python/ray/projects/scripts.py deleted file mode 100644 index 31f566a9e8ac..000000000000 --- a/python/ray/projects/scripts.py +++ /dev/null @@ -1,445 +0,0 @@ -import argparse -import click -import copy -import jsonschema -import logging -import os -from shutil import copyfile -import subprocess -import sys -import time - -import ray -from ray.autoscaler.commands import ( - attach_cluster, - exec_cluster, - create_or_update_cluster, - rsync, - teardown_cluster, -) - -logging.basicConfig(format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO) -logger = logging.getLogger(__file__) - -# File layout for generated project files -# user-dir/ -# ray-project/ -# project.yaml -# cluster.yaml -# requirements.txt -PROJECT_DIR = "ray-project" -PROJECT_YAML = os.path.join(PROJECT_DIR, "project.yaml") -CLUSTER_YAML = 
os.path.join(PROJECT_DIR, "cluster.yaml") -REQUIREMENTS_TXT = os.path.join(PROJECT_DIR, "requirements.txt") - -# File layout for templates file -# RAY/.../projects/ -# templates/ -# cluster_template.yaml -# project_template.yaml -# requirements.txt -_THIS_FILE_DIR = os.path.split(os.path.abspath(__file__))[0] -_TEMPLATE_DIR = os.path.join(_THIS_FILE_DIR, "templates") -PROJECT_TEMPLATE = os.path.join(_TEMPLATE_DIR, "project_template.yaml") -CLUSTER_TEMPLATE = os.path.join(_TEMPLATE_DIR, "cluster_template.yaml") -REQUIREMENTS_TXT_TEMPLATE = os.path.join(_TEMPLATE_DIR, "requirements.txt") - - -@click.group( - "project", help="[Experimental] Commands working with ray project") -def project_cli(): - pass - - -@project_cli.command(help="Validate current project spec") -@click.option( - "--verbose", help="If set, print the validated file", is_flag=True) -def validate(verbose): - try: - project = ray.projects.ProjectDefinition(os.getcwd()) - print("Project files validated!", file=sys.stderr) - if verbose: - print(project.config) - except (jsonschema.exceptions.ValidationError, ValueError) as e: - print("Validation failed for the following reason", file=sys.stderr) - raise click.ClickException(e) - - -@project_cli.command(help="Create a new project within current directory") -@click.argument("project_name") -@click.option( - "--cluster-yaml", - help="Path to autoscaler yaml. Created by default", - default=None) -@click.option( - "--requirements", - help="Path to requirements.txt. 
Created by default", - default=None) -def create(project_name, cluster_yaml, requirements): - if os.path.exists(PROJECT_DIR): - raise click.ClickException( - "Project directory {} already exists.".format(PROJECT_DIR)) - os.makedirs(PROJECT_DIR) - - if cluster_yaml is None: - logger.warning("Using default autoscaler yaml") - - with open(CLUSTER_TEMPLATE) as f: - template = f.read().replace(r"{{name}}", project_name) - with open(CLUSTER_YAML, "w") as f: - f.write(template) - - cluster_yaml = CLUSTER_YAML - - if requirements is None: - logger.warning("Using default requirements.txt") - # no templating required, just copy the file - copyfile(REQUIREMENTS_TXT_TEMPLATE, REQUIREMENTS_TXT) - - requirements = REQUIREMENTS_TXT - - repo = None - if os.path.exists(".git"): - try: - repo = subprocess.check_output( - "git remote get-url origin".split(" ")).strip() - logger.info("Setting repo URL to %s", repo) - except subprocess.CalledProcessError: - pass - - with open(PROJECT_TEMPLATE) as f: - project_template = f.read() - # NOTE(simon): - # We could use jinja2, which will make the templating part easier. 
- project_template = project_template.replace(r"{{name}}", project_name) - project_template = project_template.replace(r"{{cluster}}", - cluster_yaml) - project_template = project_template.replace(r"{{requirements}}", - requirements) - if repo is None: - project_template = project_template.replace( - r"{{repo_string}}", "# repo: {}".format("...")) - else: - project_template = project_template.replace( - r"{{repo_string}}", "repo: {}".format(repo)) - with open(PROJECT_YAML, "w") as f: - f.write(project_template) - - -@click.group( - "session", - help="[Experimental] Commands working with sessions, which are " - "running instances of a project.") -def session_cli(): - pass - - -def load_project_or_throw(): - # Validate the project file - try: - return ray.projects.ProjectDefinition(os.getcwd()) - except (jsonschema.exceptions.ValidationError, ValueError): - raise click.ClickException( - "Project file validation failed. Please run " - "`ray project validate` to inspect the error.") - - -class SessionRunner: - """Class for setting up a session and executing commands in it.""" - - def __init__(self, session_name=None): - """Initialize session runner and try to parse the command arguments. - - Args: - session_name (str): Name of the session. - - Raises: - click.ClickException: This exception is raised if any error occurs. 
- """ - self.project_definition = load_project_or_throw() - self.session_name = session_name - - # Check for features we don't support right now - project_environment = self.project_definition.config.get( - "environment", {}) - need_docker = ("dockerfile" in project_environment - or "dockerimage" in project_environment) - if need_docker: - raise click.ClickException( - "Docker support in session is currently not implemented.") - - def create_cluster(self, no_config_cache): - """Create a cluster that will run the session.""" - create_or_update_cluster( - config_file=self.project_definition.cluster_yaml(), - override_min_workers=None, - override_max_workers=None, - no_restart=False, - restart_only=False, - yes=True, - override_cluster_name=self.session_name, - no_config_cache=no_config_cache, - ) - - def sync_files(self): - """Synchronize files with the session.""" - rsync( - self.project_definition.cluster_yaml(), - source=self.project_definition.root, - target=self.project_definition.working_directory(), - override_cluster_name=self.session_name, - down=False, - ) - - def setup_environment(self): - """Set up the environment of the session.""" - project_environment = self.project_definition.config.get( - "environment", {}) - - if "requirements" in project_environment: - requirements_txt = project_environment["requirements"] - - # Create a temporary requirements_txt in the head node. - remote_requirements_txt = os.path.join( - ray.utils.get_user_temp_dir(), - "ray_project_requirements_txt_{}".format(time.time())) - - rsync( - self.project_definition.cluster_yaml(), - source=requirements_txt, - target=remote_requirements_txt, - override_cluster_name=self.session_name, - down=False, - ) - self.execute_command( - "pip install -r {}".format(remote_requirements_txt)) - - if "shell" in project_environment: - for cmd in project_environment["shell"]: - self.execute_command(cmd) - - def execute_command(self, cmd, config={}): - """Execute a shell command in the session. 
- - Args: - cmd (str): Shell command to run in the session. It will be - run in the working directory of the project. - """ - cwd = self.project_definition.working_directory() - cmd = "cd {cwd}; {cmd}".format(cwd=cwd, cmd=cmd) - exec_cluster( - config_file=self.project_definition.cluster_yaml(), - cmd=cmd, - run_env=config.get("run_env", "auto"), - screen=False, - tmux=config.get("tmux", False), - stop=False, - start=False, - override_cluster_name=self.session_name, - port_forward=config.get("port_forward", None), - ) - - -def format_command(command, parsed_args): - """Substitute arguments into command. - - Args: - command (str): Shell comand with argument placeholders. - parsed_args (dict): Dictionary that maps from argument names - to their value. - - Returns: - Shell command with parameters from parsed_args substituted. - """ - for key, val in parsed_args.items(): - command = command.replace("{{" + key + "}}", str(val)) - return command - - -def get_session_runs(name, command, parsed_args): - """Get a list of sessions to start. - - Args: - command (str): Shell command with argument placeholders. - parsed_args (dict): Dictionary that maps from argument names - to their values. - - Returns: - List of sessions to start, which are dictionaries with keys: - "name": Name of the session to start, - "command": Command to run after starting the session, - "params": Parameters for this run, - "num_steps": 4 if a command should be run, 3 if not. - """ - if not command: - return [{"name": name, "command": None, "params": {}, "num_steps": 3}] - - # Try to find a wildcard argument (i.e. one that has a list of values) - # and give an error if there is more than one (currently unsupported). 
- wildcard_arg = None - for key, val in parsed_args.items(): - if isinstance(val, list): - if not wildcard_arg: - wildcard_arg = key - else: - raise click.ClickException( - "More than one wildcard is not supported at the moment") - - if not wildcard_arg: - session_run = { - "name": name, - "command": format_command(command, parsed_args), - "params": parsed_args, - "num_steps": 4 - } - return [session_run] - else: - session_runs = [] - for val in parsed_args[wildcard_arg]: - parsed_args = copy.deepcopy(parsed_args) - parsed_args[wildcard_arg] = val - session_run = { - "name": "{}-{}-{}".format(name, wildcard_arg, val), - "command": format_command(command, parsed_args), - "params": parsed_args, - "num_steps": 4 - } - session_runs.append(session_run) - return session_runs - - -@session_cli.command(help="Attach to an existing cluster") -@click.option( - "--screen", is_flag=True, default=False, help="Run the command in screen.") -@click.option("--tmux", help="Attach to tmux session", is_flag=True) -def attach(screen, tmux): - project_definition = load_project_or_throw() - attach_cluster( - project_definition.cluster_yaml(), - start=False, - use_screen=screen, - use_tmux=tmux, - override_cluster_name=None, - new=False, - ) - - -@session_cli.command(help="Stop a session based on current project config") -@click.option("--name", help="Name of the session to stop", default=None) -def stop(name): - project_definition = load_project_or_throw() - - if not name: - name = project_definition.config["name"] - - teardown_cluster( - project_definition.cluster_yaml(), - yes=True, - workers_only=False, - override_cluster_name=name) - - -@session_cli.command( - name="start", - context_settings=dict(ignore_unknown_options=True, ), - help="Start a session based on current project config") -@click.argument("command", required=False) -@click.argument("args", nargs=-1, type=click.UNPROCESSED) -@click.option( - "--shell", - help=( - "If set, run the command as a raw shell command instead of 
looking up " - "the command in the project config"), - is_flag=True) -@click.option("--name", help="A name to tag the session with.", default=None) -@click.option( - "--no-config-cache", - is_flag=True, - default=False, - help="Disable the local cluster config cache.") -def session_start(command, args, shell, name, no_config_cache): - project_definition = load_project_or_throw() - - if not name: - name = project_definition.config["name"] - - # Get the actual command to run. This also validates the command, - # which should be done before the cluster is started. - try: - command, parsed_args, config = project_definition.get_command_info( - command, args, shell, wildcards=True) - except ValueError as e: - raise click.ClickException(e) - session_runs = get_session_runs(name, command, parsed_args) - - if len(session_runs) > 1 and not config.get("tmux", False): - logging.info("Using wildcards with tmux = False would not create " - "sessions in parallel, so we are overriding it with " - "tmux = True.") - config["tmux"] = True - - for run in session_runs: - runner = SessionRunner(session_name=run["name"]) - logger.info("[1/{}] Creating cluster".format(run["num_steps"])) - runner.create_cluster(no_config_cache) - logger.info("[2/{}] Syncing the project".format(run["num_steps"])) - runner.sync_files() - logger.info("[3/{}] Setting up environment".format(run["num_steps"])) - runner.setup_environment() - - if run["command"]: - # Run the actual command. 
- logger.info("[4/4] Running command") - runner.execute_command(run["command"], config) - - -@session_cli.command( - name="commands", - help="Print available commands for sessions of this project.") -def session_commands(): - project_definition = load_project_or_throw() - print("Active project: " + project_definition.config["name"]) - print() - - commands = project_definition.config["commands"] - - for command in commands: - print("Command \"{}\":".format(command["name"])) - parser = argparse.ArgumentParser( - command["name"], description=command.get("help"), add_help=False) - params = command.get("params", []) - for param in params: - name = param.pop("name") - if "type" in param: - param.pop("type") - parser.add_argument("--" + name, **param) - help_string = parser.format_help() - # Indent the help message by two spaces and print it. - print("\n".join([" " + line for line in help_string.split("\n")])) - - -@session_cli.command( - name="execute", - context_settings=dict(ignore_unknown_options=True, ), - help="Execute a command in a session") -@click.argument("command", required=False) -@click.argument("args", nargs=-1, type=click.UNPROCESSED) -@click.option( - "--shell", - help=( - "If set, run the command as a raw shell command instead of looking up " - "the command in the project config"), - is_flag=True) -@click.option( - "--name", help="Name of the session to run this command on", default=None) -def session_execute(command, args, shell, name): - project_definition = load_project_or_throw() - try: - command, parsed_args, config = project_definition.get_command_info( - command, args, shell, wildcards=False) - except ValueError as e: - raise click.ClickException(e) - - runner = SessionRunner(session_name=name) - command = format_command(command, parsed_args) - runner.execute_command(command) diff --git a/python/ray/projects/templates/cluster_template.yaml b/python/ray/projects/templates/cluster_template.yaml deleted file mode 100644 index 
962dd1768313..000000000000 --- a/python/ray/projects/templates/cluster_template.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# This file is generated by `ray project create`. - -# A unique identifier for the head node and workers of this cluster. -cluster_name: {{name}} - -# The maximum number of workers nodes to launch in addition to the head -# node. This takes precedence over min_workers. min_workers defaults to 0. -max_workers: 1 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - availability_zone: us-west-2a - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu diff --git a/python/ray/projects/templates/project_template.yaml b/python/ray/projects/templates/project_template.yaml deleted file mode 100644 index f0c4d1b8fa27..000000000000 --- a/python/ray/projects/templates/project_template.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# This file is generated by `ray project create`. - -name: {{name}} - -# description: A short description of the project. -# The URL of the repo this project is part of. -{{repo_string}} - -cluster: - config: {{cluster}} - -environment: - # dockerfile: The dockerfile to be built and ran the commands with. - # dockerimage: The docker image to be used to run the project in, e.g. ubuntu:18.04. - requirements: {{requirements}} - - shell: # Shell commands to be ran for environment setup. 
- - echo "Setting up the environment" - -commands: - - name: default - command: echo "Starting ray job" diff --git a/python/ray/projects/templates/requirements.txt b/python/ray/projects/templates/requirements.txt deleted file mode 100644 index 0f026d879f17..000000000000 --- a/python/ray/projects/templates/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -ray[debug] \ No newline at end of file diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 1696212aa5c0..a79eb0163f47 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -145,7 +145,6 @@ def _remote(self, args=None, kwargs=None, num_return_vals=None, - is_direct_call=None, num_cpus=None, num_gpus=None, memory=None, @@ -185,8 +184,6 @@ def _remote(self, if num_return_vals is None: num_return_vals = self._num_return_vals - if is_direct_call is not None and not is_direct_call: - raise ValueError("Non-direct call tasks are no longer supported.") if max_retries is None: max_retries = self._max_retries diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b53f00b10bb3..afc14a63e621 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -20,7 +20,6 @@ debug_status, RUN_ENV_TYPES) import ray.ray_constants as ray_constants import ray.utils -from ray.projects.scripts import project_cli, session_cli from ray.autoscaler.cli_logger import cli_logger import colorful as cf @@ -1009,7 +1008,7 @@ def down(cluster_config_file, yes, workers_only, cluster_name, keep_min_workers) -@cli.command() +@cli.command(hidden=True) @click.argument("cluster_config_file", required=True, type=str) @click.option( "--yes", @@ -1454,36 +1453,6 @@ def timeline(address): "You can open this with chrome://tracing in the Chrome browser.") -@cli.command() -@click.option( - "--address", - required=False, - type=str, - help="Override the address to connect to.") -def statistics(address): - """Get the current metrics protobuf from a Ray cluster 
(developer tool).""" - if not address: - address = services.find_redis_address_or_die() - logger.info(f"Connecting to Ray instance at {address}.") - ray.init(address=address) - - import grpc - from ray.core.generated import node_manager_pb2 - from ray.core.generated import node_manager_pb2_grpc - - for raylet in ray.nodes(): - raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], - ray.nodes()[0]["NodeManagerPort"]) - logger.info(f"Querying raylet {raylet_address}") - - channel = grpc.insecure_channel(raylet_address) - stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) - reply = stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest(include_memory_info=False), - timeout=2.0) - print(reply) - - @cli.command() @click.option( "--address", @@ -1520,13 +1489,13 @@ def status(address): print(debug_status()) -@cli.command() +@cli.command(hidden=True) @click.option( "--address", required=False, type=str, help="Override the address to connect to.") -def globalgc(address): +def global_gc(address): """Trigger Python garbage collection on all cluster workers.""" if not address: address = services.find_redis_address_or_die() @@ -1614,13 +1583,10 @@ def add_command_alias(command, name, hidden): cli.add_command(get_worker_ips) cli.add_command(microbenchmark) cli.add_command(stack) -cli.add_command(statistics) cli.add_command(status) cli.add_command(memory) -cli.add_command(globalgc) +cli.add_command(global_gc) cli.add_command(timeline) -cli.add_command(project_cli) -cli.add_command(session_cli) cli.add_command(install_nightly) try: diff --git a/python/ray/serve/benchmarks/microbenchmark.py b/python/ray/serve/benchmarks/microbenchmark.py index 1274c60b9b47..718d3cf30576 100644 --- a/python/ray/serve/benchmarks/microbenchmark.py +++ b/python/ray/serve/benchmarks/microbenchmark.py @@ -77,7 +77,7 @@ async def many_clients(): async def main(): - ray.init(log_to_driver=False) + ray.init(_log_to_driver=False) serve.init() serve.create_backend("backend", backend) 
diff --git a/python/ray/services.py b/python/ray/services.py index 4adbb1f85292..a52e32ec4873 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -256,35 +256,18 @@ def remaining_processes_alive(): return ray.worker._global_node.remaining_processes_alive() -def validate_redis_address(address, redis_address): - """Validates redis address parameter and splits it into host/ip components. - - We temporarily support both 'address' and 'redis_address', so both are - handled here. +def validate_redis_address(address): + """Validates address parameter. Returns: redis_address: string containing the full address. redis_ip: string representing the host portion of the address. redis_port: integer representing the port portion of the address. - - Raises: - ValueError: if both address and redis_address were specified or the - address was malformed. """ - if redis_address == "auto": - raise ValueError("auto address resolution not supported for " - "redis_address parameter. Please use address.") - - if address: - if redis_address: - raise ValueError( - "Both address and redis_address specified. Use only address.") - if address == "auto": - address = find_redis_address_or_die() - redis_address = address - - redis_address = address_to_ip(redis_address) + if address == "auto": + address = find_redis_address_or_die() + redis_address = address_to_ip(address) redis_address_parts = redis_address.split(":") if len(redis_address_parts) != 2: diff --git a/python/ray/state.py b/python/ray/state.py index 005ab1a92ce0..add1b21aed82 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -844,7 +844,7 @@ def actor_checkpoint_info(self, actor_id): def jobs(): - """Get a list of the jobs in the cluster. + """Get a list of the jobs in the cluster (for debugging only). Returns: Information from the job table, namely a list of dicts with keys: @@ -858,7 +858,7 @@ def jobs(): def nodes(): - """Get a list of the nodes in the cluster. 
+ """Get a list of the nodes in the cluster (for debugging only). Returns: Information about the Ray clients in the cluster. @@ -899,7 +899,7 @@ def node_ids(): def actors(actor_id=None): - """Fetch and parse the actor info for one or more actor IDs. + """Fetch actor info for one or more actor IDs (for debugging only). Args: actor_id: A hex string of the actor ID to fetch information about. If diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index a597e2e446a7..3d9c59318fa9 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -88,7 +88,6 @@ py_test_module_list( "test_multi_tenancy.py", "test_node_manager.py", "test_numba.py", - "test_projects.py", "test_ray_init.py", "test_serialization.py", "test_tempfile.py", diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 7985050ad599..63f8f91066cb 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -30,7 +30,7 @@ def ray_init_with_task_retry_delay(): @pytest.mark.parametrize( "ray_start_regular", [{ "object_store_memory": 150 * 1024 * 1024, - "lru_evict": True, + "_lru_evict": True, }], indirect=True) def test_actor_eviction(ray_start_regular): diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index f7f75bcb45ca..66fb5fe09d1a 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -95,10 +95,10 @@ def test_actor_gpus(ray_start_cluster): @ray.remote(num_gpus=1) class Actor1: def __init__(self): - self.gpu_ids = ray.get_gpu_ids(as_str=True) + self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): - assert ray.get_gpu_ids(as_str=True) == self.gpu_ids + assert ray.get_gpu_ids() == self.gpu_ids return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index 9d4c328995c8..08dd168fa505 100644 --- 
a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -49,27 +49,6 @@ def sample_big(self): ray.get(big_id) -def test_wait_iterables(ray_start_regular): - @ray.remote - def f(delay): - time.sleep(delay) - return 1 - - object_refs = (f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)) - ready_ids, remaining_ids = ray.experimental.wait(object_refs) - assert len(ready_ids) == 1 - assert len(remaining_ids) == 3 - - object_refs = np.array( - [f.remote(1.0), - f.remote(0.5), - f.remote(0.5), - f.remote(0.5)]) - ready_ids, remaining_ids = ray.experimental.wait(object_refs) - assert len(ready_ids) == 1 - assert len(remaining_ids) == 3 - - def test_multiple_waits_and_gets(shutdown_only): # It is important to use three workers here, so that the three tasks # launched in this experiment can run at the same time. diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 19f400534202..da20a974d65c 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -633,25 +633,6 @@ def save_gpu_ids_shutdown_only(): del os.environ["CUDA_VISIBLE_DEVICES"] -@pytest.mark.parametrize("as_str", [False, True]) -def test_gpu_ids_as_str(save_gpu_ids_shutdown_only, as_str): - allowed_gpu_ids = [4, 5, 6] - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join( - str(i) for i in allowed_gpu_ids) - ray.init() - - @ray.remote - def get_gpu_ids(as_str): - gpu_ids = ray.get_gpu_ids(as_str) - for gpu_id in gpu_ids: - if as_str: - assert isinstance(gpu_id, str) - else: - assert isinstance(gpu_id, int) - - ray.get([get_gpu_ids.remote(as_str) for _ in range(10)]) - - def test_specific_gpus(save_gpu_ids_shutdown_only): allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join( diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 1ec1d78218ff..07defb9672ae 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -208,7 
+208,7 @@ def __exit__(self, exc_type, exc_value, traceback): def test_logging_to_driver(shutdown_only): - ray.init(num_cpus=1, log_to_driver=True) + ray.init(num_cpus=1, _log_to_driver=True) @ray.remote def f(): @@ -233,7 +233,7 @@ def f(): def test_not_logging_to_driver(shutdown_only): - ray.init(num_cpus=1, log_to_driver=False) + ray.init(num_cpus=1, _log_to_driver=False) @ray.remote def f(): @@ -272,23 +272,6 @@ def f(): worker_ids = set(ray.get([f.remote() for _ in range(10)])) -def test_specific_job_id(): - dummy_driver_id = ray.JobID.from_int(1) - ray.init(num_cpus=1, job_id=dummy_driver_id) - - # in driver - assert dummy_driver_id == ray.worker.global_worker.current_job_id - - # in worker - @ray.remote - def f(): - return ray.worker.global_worker.current_job_id - - assert dummy_driver_id == ray.get(f.remote()) - - ray.shutdown() - - def test_object_ref_properties(): id_bytes = b"00112233445566778899" object_ref = ray.ObjectRef(id_bytes) @@ -397,23 +380,6 @@ def unique_name_3(): "'ray stack'") -def test_socket_dir_not_existing(shutdown_only): - if sys.platform != "win32": - random_name = ray.ObjectRef.from_random().hex() - temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(), - "tests", random_name) - temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir, - "raylet_socket") - ray.init(num_cpus=2, raylet_socket_name=temp_raylet_socket_name) - - @ray.remote - def foo(x): - time.sleep(1) - return 2 * x - - ray.get([foo.remote(i) for i in range(2)]) - - def test_raylet_is_robust_to_random_messages(ray_start_regular): node_manager_address = None node_manager_port = None @@ -465,13 +431,6 @@ def test_put_pins_object(ray_start_object_store_memory): assert not ray.worker.global_worker.core_worker.object_exists( ray.ObjectRef(x_binary)) - # weakref put - y_id = ray.put(obj, weakref=True) - for _ in range(10): - ray.put(np.zeros(10 * 1024 * 1024)) - with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(y_id) - def 
test_decorated_function(ray_start_regular): def function_invocation_decorator(f): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c4b564f72622..be879caf02d4 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -456,36 +456,6 @@ def method(self): ray.put(f) -def test_custom_serializers(ray_start_shared_local_modes): - class Foo: - def __init__(self): - self.x = 3 - - def custom_serializer(obj): - return 3, "string1", type(obj).__name__ - - def custom_deserializer(serialized_obj): - return serialized_obj, "string2" - - ray.register_custom_serializer( - Foo, serializer=custom_serializer, deserializer=custom_deserializer) - - assert ray.get(ray.put(Foo())) == ((3, "string1", Foo.__name__), "string2") - - class Bar: - def __init__(self): - self.x = 3 - - ray.register_custom_serializer( - Bar, serializer=custom_serializer, deserializer=custom_deserializer) - - @ray.remote - def f(): - return Bar() - - assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2") - - def test_keyword_args(ray_start_shared_local_modes): @ray.remote def keyword_fct1(a, b="hello"): diff --git a/python/ray/tests/test_basic_2.py b/python/ray/tests/test_basic_2.py index e3804b54182d..354eb89cf718 100644 --- a/python/ray/tests/test_basic_2.py +++ b/python/ray/tests/test_basic_2.py @@ -366,25 +366,6 @@ def test_get_multiple(ray_start_regular_shared): assert results == indices -def test_get_multiple_experimental(ray_start_regular_shared): - object_refs = [ray.put(i) for i in range(10)] - - object_refs_tuple = tuple(object_refs) - assert ray.experimental.get(object_refs_tuple) == list(range(10)) - - object_refs_nparray = np.array(object_refs) - assert ray.experimental.get(object_refs_nparray) == list(range(10)) - - -def test_get_dict(ray_start_regular_shared): - d = {str(i): ray.put(i) for i in range(5)} - for i in range(5, 10): - d[str(i)] = i - result = ray.experimental.get(d) - expected = {str(i): i for i in range(10)} - 
assert result == expected - - def test_get_with_timeout(ray_start_regular_shared): signal = ray.test_utils.SignalActor.remote() diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index 17fc14967929..9d336f192297 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -33,7 +33,7 @@ def wait_for(t): obj4 = wait_for.remote([obj3]) assert len(ray.wait([obj1], timeout=.1)[0]) == 0 - ray.cancel(obj1, use_force) + ray.cancel(obj1, force=use_force) for ob in [obj1, obj2, obj3, obj4]: with pytest.raises(valid_exceptions(use_force)): ray.get(ob) @@ -45,7 +45,7 @@ def wait_for(t): obj4 = wait_for.remote([obj3]) assert len(ray.wait([obj3], timeout=.1)[0]) == 0 - ray.cancel(obj3, use_force) + ray.cancel(obj3, force=use_force) for ob in [obj3, obj4]: with pytest.raises(valid_exceptions(use_force)): ray.get(ob) @@ -74,7 +74,7 @@ def wait_for(t): deps.append(wait_for.remote([head])) assert len(ray.wait([head], timeout=.1)[0]) == 0 - ray.cancel(head, use_force) + ray.cancel(head, force=use_force) for d in deps: with pytest.raises(valid_exceptions(use_force)): ray.get(d) @@ -86,7 +86,7 @@ def wait_for(t): deps2.append(wait_for.remote([head])) for d in deps2: - ray.cancel(d, use_force) + ray.cancel(d, force=use_force) for d in deps2: with pytest.raises(valid_exceptions(use_force)): @@ -111,11 +111,11 @@ def wait_for(t): indep = wait_for.remote([signaler.wait.remote()]) assert len(ray.wait([obj3], timeout=.1)[0]) == 0 - ray.cancel(obj3, use_force) + ray.cancel(obj3, force=use_force) with pytest.raises(valid_exceptions(use_force)): ray.get(obj3) - ray.cancel(obj1, use_force) + ray.cancel(obj1, force=use_force) for d in [obj1, obj2]: with pytest.raises(valid_exceptions(use_force)): @@ -145,12 +145,12 @@ def combine(a, b): assert len(ray.wait([a, b, a2, combo], timeout=1)[0]) == 0 - ray.cancel(a, use_force) + ray.cancel(a, force=use_force) with pytest.raises(valid_exceptions(use_force)): - ray.get(a, 10) + ray.get(a, timeout=10) 
with pytest.raises(valid_exceptions(use_force)): - ray.get(a2, 10) + ray.get(a2, timeout=10) signaler.send.remote() @@ -177,10 +177,10 @@ def infinite_sleep(y): cancelled = set() for t in tasks: if random.random() > 0.5: - ray.cancel(t, use_force) + ray.cancel(t, force=use_force) cancelled.add(t) - ray.cancel(first, use_force) + ray.cancel(first, force=use_force) cancelled.add(first) for done in cancelled: @@ -188,7 +188,7 @@ def infinite_sleep(y): ray.get(done) for indx, t in enumerate(tasks): if sleep_or_no[indx]: - ray.cancel(t, use_force) + ray.cancel(t, force=use_force) cancelled.add(t) if t in cancelled: with pytest.raises(valid_exceptions(use_force)): @@ -209,7 +209,7 @@ def fast(y): ids = list() for _ in range(100): x = fast.remote("a") - ray.cancel(x, use_force) + ray.cancel(x, force=use_force) ids.append(x) @ray.remote @@ -223,7 +223,7 @@ def wait_for(y): for idx in range(100, 5100): if random.random() > 0.95: - ray.cancel(ids[idx], use_force) + ray.cancel(ids[idx], force=use_force) signaler.send.remote() for obj_ref in ids: try: @@ -250,12 +250,12 @@ def remote_wait(sg): inner = ray.get(outer)[0] with pytest.raises(RayTimeoutError): - ray.get(inner, 1) + ray.get(inner, timeout=1) - ray.cancel(inner, use_force) + ray.cancel(inner, force=use_force) with pytest.raises(valid_exceptions(use_force)): - ray.get(inner, 10) + ray.get(inner, timeout=10) if __name__ == "__main__": diff --git a/python/ray/tests/test_cli_patterns/test_ray_attach.txt b/python/ray/tests/test_cli_patterns/test_ray_attach.txt index 791a2766161a..cc648a61f64d 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_attach.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_attach.txt @@ -1,3 +1,3 @@ -Bootstraping AWS config +Bootstrapping AWS config Fetched IP: .+ ubuntu@ip-.+:~\$ exit diff --git a/python/ray/tests/test_cli_patterns/test_ray_exec.txt b/python/ray/tests/test_cli_patterns/test_ray_exec.txt index 62179d79c3dd..5496ac18714d 100644 --- 
a/python/ray/tests/test_cli_patterns/test_ray_exec.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_exec.txt @@ -1,3 +1,3 @@ -Bootstraping AWS config +Bootstrapping AWS config Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_submit.txt b/python/ray/tests/test_cli_patterns/test_ray_submit.txt index a7e4be5f6d6c..ef94f13c076b 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_submit.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_submit.txt @@ -1,5 +1,5 @@ -Bootstraping AWS config +Bootstrapping AWS config Fetched IP: .+ -Bootstraping AWS config +Bootstrapping AWS config Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_up.txt b/python/ray/tests/test_cli_patterns/test_ray_up.txt index 31d62671fd33..5f41dfd048af 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up.txt @@ -6,7 +6,7 @@ Cluster configuration valid Cluster: test-cli -Bootstraping AWS config +Bootstrapping AWS config AWS config IAM Profile: .+ \[default\] EC2 Key pair \(head & workers\): .+ \[default\] diff --git a/python/ray/tests/test_cross_language.py b/python/ray/tests/test_cross_language.py index cf655eae7eb8..9ba24a980628 100644 --- a/python/ray/tests/test_cross_language.py +++ b/python/ray/tests/test_cross_language.py @@ -6,7 +6,7 @@ def test_cross_language_raise_kwargs(shutdown_only): - ray.init(load_code_from_local=True, include_java=True) + ray.init(_load_code_from_local=True, _include_java=True) with pytest.raises(Exception, match="kwargs"): ray.java_function("a", "b").remote(x="arg1") @@ -16,7 +16,7 @@ def test_cross_language_raise_kwargs(shutdown_only): def test_cross_language_raise_exception(shutdown_only): - ray.init(load_code_from_local=True, include_java=True) + ray.init(_load_code_from_local=True, _include_java=True) class PythonObject(object): pass diff --git a/python/ray/tests/test_error_ray_not_initialized.py 
b/python/ray/tests/test_error_ray_not_initialized.py index fae3eb25c261..84f1c2ae05cd 100644 --- a/python/ray/tests/test_error_ray_not_initialized.py +++ b/python/ray/tests/test_error_ray_not_initialized.py @@ -23,7 +23,7 @@ class Foo: lambda: ray.get_actor("name"), ray.get_gpu_ids, ray.get_resource_ids, - ray.get_webui_url, + ray.get_dashboard_url, ray.jobs, lambda: ray.kill(None), # Not valid API usage. ray.nodes, diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index a5256175c3db..a3667f5aa906 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1000,7 +1000,7 @@ def test_fill_object_store_lru_fallback(shutdown_only): ray.init( num_cpus=2, object_store_memory=10**8, - lru_evict=True, + _lru_evict=True, _system_config=config) @ray.remote diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index 3167f1354d03..2fcf36d75138 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -47,8 +47,9 @@ def testQuotaTooLarge(self): def testTooLargeAllocation(self): try: - ray.init(num_cpus=1, driver_object_store_memory=100 * MB) - ray.put(np.zeros(50 * MB, dtype=np.uint8), weakref=True) + ray.init(num_cpus=1, _driver_object_store_memory=100 * MB) + ray.worker.global_worker.put_object( + np.zeros(50 * MB, dtype=np.uint8), pin_object=False) self.assertRaises( OBJECT_TOO_LARGE, lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8))) @@ -61,9 +62,9 @@ def _run(self, driver_quota, a_quota, b_quota): ray.init( num_cpus=1, object_store_memory=300 * MB, - driver_object_store_memory=driver_quota) + _driver_object_store_memory=driver_quota) obj = np.ones(200 * 1024, dtype=np.uint8) - z = ray.put(obj, weakref=True) + z = ray.worker.global_worker.put_object(obj, pin_object=False) a = LightActor._remote(object_store_memory=a_quota) b = GreedyActor._remote(object_store_memory=b_quota) for _ in range(5): diff --git 
a/python/ray/tests/test_memory_scheduling.py b/python/ray/tests/test_memory_scheduling.py index 166313c34f9e..fa642a4b754a 100644 --- a/python/ray/tests/test_memory_scheduling.py +++ b/python/ray/tests/test_memory_scheduling.py @@ -34,7 +34,7 @@ def train_oom(config, reporter): class TestMemoryScheduling(unittest.TestCase): def testMemoryRequest(self): try: - ray.init(num_cpus=1, memory=200 * MB) + ray.init(num_cpus=1, _memory=200 * MB) # fits first 2 a = Actor.remote() b = Actor.remote() diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 6bdaa334fcc7..d23348ce0f95 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -50,7 +50,7 @@ def try_get_node_stats(num_retry=5, timeout=2): @ray.remote def f(): - ray.show_in_webui("test") + ray.show_in_dashboard("test") return os.getpid() @ray.remote @@ -59,10 +59,10 @@ def __init__(self): pass def f(self): - ray.show_in_webui("test") + ray.show_in_dashboard("test") return os.getpid() - # Test show_in_webui for remote functions. + # Test show_in_dashboard for remote functions. worker_pid = ray.get(f.remote()) reply = try_get_node_stats() target_worker_present = False @@ -75,7 +75,7 @@ def f(self): assert stats.webui_display[""] == "" # Empty proto assert target_worker_present - # Test show_in_webui for remote actors. + # Test show_in_dashboard for remote actors. 
a = Actor.remote() worker_pid = ray.get(a.f.remote()) reply = try_get_node_stats() diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 99f89486502d..1b376243254b 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -448,7 +448,7 @@ def test_calling_start_ray_head(call_ray_stop_only): ["ray start --head --num-cpus=1 " + "--node-ip-address=localhost"], indirect=True) def test_using_hostnames(call_ray_start): - ray.init(node_ip_address="localhost", address="localhost:6379") + ray.init(_node_ip_address="localhost", address="localhost:6379") @ray.remote def f(): diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 405a667263f1..05bd70a93508 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -11,7 +11,7 @@ def test_spill_objects_manually(shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( object_store_memory=75 * 1024 * 1024, - object_spilling_config={ + _object_spilling_config={ "type": "filesystem", "params": { "directory_path": "/tmp" @@ -58,7 +58,7 @@ def test_spill_objects_manually_from_workers(shutdown_only): # Limit our object store to 100 MiB of memory. ray.init( object_store_memory=100 * 1024 * 1024, - object_spilling_config={ + _object_spilling_config={ "type": "filesystem", "params": { "directory_path": "/tmp" @@ -84,7 +84,7 @@ def test_spill_objects_manually_with_workers(shutdown_only): # Limit our object store to 75 MiB of memory. 
ray.init( object_store_memory=100 * 1024 * 1024, - object_spilling_config={ + _object_spilling_config={ "type": "filesystem", "params": { "directory_path": "/tmp" @@ -111,7 +111,7 @@ def _worker(object_refs): "ray_start_cluster_head", [{ "num_cpus": 0, "object_store_memory": 75 * 1024 * 1024, - "object_spilling_config": { + "_object_spilling_config": { "type": "filesystem", "params": { "directory_path": "/tmp" @@ -127,7 +127,7 @@ def test_spill_remote_object(ray_start_cluster_head): cluster = ray_start_cluster_head cluster.add_node( object_store_memory=75 * 1024 * 1024, - object_spilling_config={ + _object_spilling_config={ "type": "filesystem", "params": { "directory_path": "/tmp" diff --git a/python/ray/tests/test_projects.py b/python/ray/tests/test_projects.py deleted file mode 100644 index 4e306546c514..000000000000 --- a/python/ray/tests/test_projects.py +++ /dev/null @@ -1,256 +0,0 @@ -import jsonschema -import os -import pytest -import subprocess -import yaml -from click.testing import CliRunner -import sys -from unittest.mock import patch, DEFAULT - -from contextlib import contextmanager - -from ray.projects.scripts import (session_start, session_commands, - session_execute) -from ray.test_utils import check_call_ray -import ray - -TEST_DIR = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "project_files") - - -def load_project_description(project_file): - path = os.path.join(TEST_DIR, project_file) - with open(path) as f: - return yaml.safe_load(f) - - -def test_validation(): - project_dirs = ["docker_project", "requirements_project", "shell_project"] - for project_dir in project_dirs: - project_dir = os.path.join(TEST_DIR, project_dir) - ray.projects.ProjectDefinition(project_dir) - - bad_schema_dirs = ["no_project1"] - for project_dir in bad_schema_dirs: - project_dir = os.path.join(TEST_DIR, project_dir) - with pytest.raises(jsonschema.exceptions.ValidationError): - ray.projects.ProjectDefinition(project_dir) - - bad_project_dirs = 
["no_project2", "noproject3"] - for project_dir in bad_project_dirs: - project_dir = os.path.join(TEST_DIR, project_dir) - with pytest.raises(ValueError): - ray.projects.ProjectDefinition(project_dir) - - -def test_project_root(): - path = os.path.join(TEST_DIR, "project1") - project_definition = ray.projects.ProjectDefinition(path) - assert os.path.normpath(project_definition.root) == os.path.normpath(path) - - path2 = os.path.join(TEST_DIR, "project1", "subdir") - project_definition = ray.projects.ProjectDefinition(path2) - assert os.path.normpath(project_definition.root) == os.path.normpath(path) - - path3 = ray.utils.get_user_temp_dir() + os.sep - with pytest.raises(ValueError): - project_definition = ray.projects.ProjectDefinition(path3) - - -def test_project_validation(): - with _chdir_and_back(os.path.join(TEST_DIR, "project1")): - check_call_ray(["project", "validate"]) - - -def test_project_no_validation(): - with _chdir_and_back(TEST_DIR): - with pytest.raises(subprocess.CalledProcessError): - check_call_ray(["project", "validate"]) - - -@contextmanager -def _chdir_and_back(d): - old_dir = os.getcwd() - try: - os.chdir(d) - yield - finally: - os.chdir(old_dir) - - -def run_test_project(project_dir, command, args): - # Run the CLI commands with patching - test_dir = os.path.join(TEST_DIR, project_dir) - with _chdir_and_back(test_dir): - runner = CliRunner() - with patch.multiple( - "ray.projects.scripts", - create_or_update_cluster=DEFAULT, - rsync=DEFAULT, - exec_cluster=DEFAULT, - ) as mock_calls: - result = runner.invoke(command, args) - - return result, mock_calls, test_dir - - -def test_session_start_default_project(): - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "project-pass"), session_start, - ["default"]) - - loaded_project = ray.projects.ProjectDefinition(test_dir) - assert result.exit_code == 0 - - # Part 1/3: Cluster Launching Call - create_or_update_cluster_call = mock_calls["create_or_update_cluster"] - 
assert create_or_update_cluster_call.call_count == 1 - _, kwargs = create_or_update_cluster_call.call_args - assert kwargs["config_file"] == loaded_project.cluster_yaml() - - # Part 2/3: Rsync Calls - rsync_call = mock_calls["rsync"] - # 1 for rsyncing the project directory, 1 for rsyncing the - # requirements.txt. - assert rsync_call.call_count == 2 - _, kwargs = rsync_call.call_args - assert kwargs["source"] == loaded_project.config["environment"][ - "requirements"] - - # Part 3/3: Exec Calls - exec_cluster_call = mock_calls["exec_cluster"] - commands_executed = [] - for _, kwargs in exec_cluster_call.call_args_list: - commands_executed.append(kwargs["cmd"].replace( - "cd {}; ".format(loaded_project.working_directory()), "")) - - expected_commands = loaded_project.config["environment"]["shell"] - expected_commands += [ - command["command"] for command in loaded_project.config["commands"] - ] - - if "requirements" in loaded_project.config["environment"]: - assert any("pip install -r" for cmd in commands_executed) - # pop the `pip install` off commands executed - commands_executed = [ - cmd for cmd in commands_executed if "pip install -r" not in cmd - ] - - assert expected_commands == commands_executed - - -def test_session_execute_default_project(): - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "project-pass"), session_execute, - ["default"]) - - loaded_project = ray.projects.ProjectDefinition(test_dir) - assert result.exit_code == 0 - - assert mock_calls["rsync"].call_count == 0 - assert mock_calls["create_or_update_cluster"].call_count == 0 - - exec_cluster_call = mock_calls["exec_cluster"] - commands_executed = [] - for _, kwargs in exec_cluster_call.call_args_list: - commands_executed.append(kwargs["cmd"].replace( - "cd {}; ".format(loaded_project.working_directory()), "")) - - expected_commands = [ - command["command"] for command in loaded_project.config["commands"] - ] - - assert expected_commands == commands_executed - 
- result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "project-pass"), session_execute, - ["--shell", "uptime"]) - assert result.exit_code == 0 - - -def test_session_start_docker_fail(): - result, _, _ = run_test_project( - os.path.join("session-tests", "with-docker-fail"), session_start, []) - - assert result.exit_code == 1 - assert ("Docker support in session is currently " - "not implemented") in result.output - - -def test_session_invalid_config_errored(): - result, _, _ = run_test_project( - os.path.join("session-tests", "invalid-config-fail"), session_start, - []) - - assert result.exit_code == 1 - assert "validation failed" in result.output - # check that we are displaying actional error message - assert "ray project validate" in result.output - - -def test_session_create_command(): - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "commands-test"), session_start, - ["first", "--a", "1", "--b", "2"]) - - # Verify the project can be loaded. 
- ray.projects.ProjectDefinition(test_dir) - assert result.exit_code == 0 - - exec_cluster_call = mock_calls["exec_cluster"] - found_command = False - for _, kwargs in exec_cluster_call.call_args_list: - if "Starting ray job with 1 and 2" in kwargs["cmd"]: - found_command = True - assert found_command - - -def test_session_create_multiple(): - for args in [{"a": "*", "b": "2"}, {"a": "1", "b": "*"}]: - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "commands-test"), session_start, - ["first", "--a", args["a"], "--b", args["b"]]) - - loaded_project = ray.projects.ProjectDefinition(test_dir) - assert result.exit_code == 0 - - exec_cluster_call = mock_calls["exec_cluster"] - commands_executed = [] - for _, kwargs in exec_cluster_call.call_args_list: - commands_executed.append(kwargs["cmd"].replace( - "cd {}; ".format(loaded_project.working_directory()), "")) - assert commands_executed.count("echo \"Setting up\"") == 2 - if args["a"] == "*": - assert commands_executed.count( - "echo \"Starting ray job with 1 and 2\"") == 1 - assert commands_executed.count( - "echo \"Starting ray job with 2 and 2\"") == 1 - if args["b"] == "*": - assert commands_executed.count( - "echo \"Starting ray job with 1 and 1\"") == 1 - assert commands_executed.count( - "echo \"Starting ray job with 1 and 2\"") == 1 - - # Using multiple wildcards shouldn't work - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "commands-test"), session_start, - ["first", "--a", "*", "--b", "*"]) - assert result.exit_code == 1 - - -def test_session_commands(): - result, mock_calls, test_dir = run_test_project( - os.path.join("session-tests", "commands-test"), session_commands, []) - - assert "This is the first parameter" in result.output - assert "This is the second parameter" in result.output - - assert 'Command "first"' in result.output - assert 'Command "second"' in result.output - - -if __name__ == "__main__": - # Make subprocess happy in 
bazel. - os.environ["LC_ALL"] = "en_US.UTF-8" - os.environ["LANG"] = "en_US.UTF-8" - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 13d16f303ad9..2b9b66f061da 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -23,7 +23,7 @@ def test_redis_password(self, password, shutdown_only): def f(): return 1 - info = ray.init(redis_password=password) + info = ray.init(_redis_password=password) address = info["redis_address"] redis_ip, redis_port = address.split(":") @@ -58,20 +58,6 @@ def f(): object_ref = f.remote() ray.get(object_ref) - def test_redis_port(self, shutdown_only): - @ray.remote - def f(): - return 1 - - info = ray.init(redis_port=1234, redis_password="testpassword") - address = info["redis_address"] - redis_ip, redis_port = address.split(":") - assert redis_port == "1234" - - redis_client = redis.StrictRedis( - host=redis_ip, port=redis_port, password="testpassword") - assert redis_client.ping() - if __name__ == "__main__": import pytest diff --git a/python/ray/tests/test_serialization.py b/python/ray/tests/test_serialization.py index 10a9ccdaa74e..6928981aa91c 100644 --- a/python/ray/tests/test_serialization.py +++ b/python/ray/tests/test_serialization.py @@ -315,30 +315,6 @@ def test_numpy_serialization(ray_start_regular): assert len(buffers) == 1 -def test_numpy_subclass_serialization(ray_start_regular): - class MyNumpyConstant(np.ndarray): - def __init__(self, value): - super().__init__() - self.constant = value - - def __str__(self): - print(self.constant) - - constant = MyNumpyConstant(123) - - def explode(x): - raise RuntimeError("Expected error.") - - ray.register_custom_serializer( - type(constant), serializer=explode, deserializer=explode) - - try: - ray.put(constant) - assert False, "Should never get here!" 
- except (RuntimeError, IndexError): - print("Correct behavior, proof that customer serializer was used.") - - def test_numpy_subclass_serialization_pickle(ray_start_regular): class MyNumpyConstant(np.ndarray): def __init__(self, value): diff --git a/python/ray/tests/test_stress_sharded.py b/python/ray/tests/test_stress_sharded.py index 814348e9b142..8e49c9719d49 100644 --- a/python/ray/tests/test_stress_sharded.py +++ b/python/ray/tests/test_stress_sharded.py @@ -1,25 +1,20 @@ import numpy as np -import os import pytest import ray -@pytest.fixture(params=[1, 4]) +@pytest.fixture(params=[1]) def ray_start_sharded(request): - num_redis_shards = request.param - - if os.environ.get("RAY_USE_NEW_GCS") == "on": - num_redis_shards = 1 - # For now, RAY_USE_NEW_GCS supports 1 shard, and credis supports - # 1-node chain for that shard only. + # TODO(ekl) enable this again once GCS supports sharding. + # num_redis_shards = request.param # Start the Ray processes. ray.init( object_store_memory=int(0.5 * 10**9), num_cpus=10, - num_redis_shards=num_redis_shards, - redis_max_memory=10**7) + # _num_redis_shards=num_redis_shards, + _redis_max_memory=10**7) yield None diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 0a800049034b..8875499d91e5 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -5,7 +5,6 @@ import pytest import ray -from ray.cluster_utils import Cluster from ray.test_utils import check_call_ray @@ -24,43 +23,11 @@ def unix_socket_delete(unix_socket): return os.remove(unix_socket) if unix else None -def test_conn_cluster(): - # plasma_store_socket_name - with pytest.raises(Exception) as exc_info: - ray.init( - address="127.0.0.1:6379", - plasma_store_socket_name=os.path.join( - ray.utils.get_user_temp_dir(), "this_should_fail")) - assert exc_info.value.args[0] == ( - "When connecting to an existing cluster, " - "plasma_store_socket_name must not be provided.") - - # raylet_socket_name - 
with pytest.raises(Exception) as exc_info: - ray.init( - address="127.0.0.1:6379", - raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(), - "this_should_fail")) - assert exc_info.value.args[0] == ( - "When connecting to an existing cluster, " - "raylet_socket_name must not be provided.") - - # temp_dir - with pytest.raises(Exception) as exc_info: - ray.init( - address="127.0.0.1:6379", - temp_dir=os.path.join(ray.utils.get_user_temp_dir(), - "this_should_fail")) - assert exc_info.value.args[0] == ( - "When connecting to an existing cluster, " - "temp_dir must not be provided.") - - def test_tempdir(shutdown_only): shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True) ray.init( - temp_dir=os.path.join(ray.utils.get_user_temp_dir(), - "i_am_a_temp_dir")) + _temp_dir=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_dir")) assert os.path.exists( os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir")), "Specified temp dir not found." @@ -94,47 +61,7 @@ def test_tempdir_long_path(): maxlen = 104 if sys.platform.startswith("darwin") else 108 temp_dir = os.path.join(ray.utils.get_user_temp_dir(), "z" * maxlen) with pytest.raises(OSError): - ray.init(temp_dir=temp_dir) # path should be too long - - -def test_raylet_socket_name(shutdown_only): - sock1 = unix_socket_create_path("i_am_a_temp_socket_1") - ray.init(raylet_socket_name=sock1) - unix_socket_verify(sock1) - ray.shutdown() - try: - unix_socket_delete(sock1) - except OSError: - pass # It could have been removed by Ray. - cluster = Cluster(True) - sock2 = unix_socket_create_path("i_am_a_temp_socket_2") - cluster.add_node(raylet_socket_name=sock2) - unix_socket_verify(sock2) - cluster.shutdown() - try: - unix_socket_delete(sock2) - except OSError: - pass # It could have been removed by Ray. 
- - -def test_temp_plasma_store_socket(shutdown_only): - sock1 = unix_socket_create_path("i_am_a_temp_socket_1") - ray.init(plasma_store_socket_name=sock1) - unix_socket_verify(sock1) - ray.shutdown() - try: - unix_socket_delete(sock1) - except OSError: - pass # It could have been removed by Ray. - cluster = Cluster(True) - sock2 = unix_socket_create_path("i_am_a_temp_socket_2") - cluster.add_node(plasma_store_socket_name=sock2) - unix_socket_verify(sock2) - cluster.shutdown() - try: - unix_socket_delete(sock2) - except OSError: - pass # It could have been removed by Ray. + ray.init(_temp_dir=temp_dir) # path should be too long def test_raylet_tempfiles(shutdown_only): diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index c61fbdd586a9..d1dceca6113f 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -9,13 +9,14 @@ def setUp(self): ray.init( num_cpus=1, object_store_memory=150 * 1024 * 1024, - redis_max_memory=10000000) + _redis_max_memory=10000000) def tearDown(self): ray.shutdown() def testDriverPutEvictedCannotReconstruct(self): - x_id = ray.put(np.zeros(1 * 1024 * 1024), weakref=True) + x_id = ray.worker.global_worker.put_object( + np.zeros(1 * 1024 * 1024), pin_object=False) ray.get(x_id) for _ in range(20): ray.put(np.zeros(10 * 1024 * 1024)) diff --git a/python/ray/tests/test_webui.py b/python/ray/tests/test_webui.py index b02cc9572626..011993af3e77 100644 --- a/python/ray/tests/test_webui.py +++ b/python/ray/tests/test_webui.py @@ -13,7 +13,7 @@ def test_get_webui(shutdown_only): addresses = ray.init(include_dashboard=True, num_cpus=1) webui_url = addresses["webui_url"] - assert ray.get_webui_url() == webui_url + assert ray.get_dashboard_url() == webui_url assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", webui_url) diff --git a/python/ray/tune/examples/pbt_transformers/pbt_transformers.py 
b/python/ray/tune/examples/pbt_transformers/pbt_transformers.py index ee144f6e3983..a3d6c36e5ac3 100644 --- a/python/ray/tune/examples/pbt_transformers/pbt_transformers.py +++ b/python/ray/tune/examples/pbt_transformers/pbt_transformers.py @@ -106,7 +106,7 @@ def tune_transformer(num_samples=8, gpus_per_trial=0, smoke_test=False, ray_address=None): - ray.init(ray_address, log_to_driver=False) + ray.init(ray_address, _log_to_driver=False) data_dir_name = "./data" if not smoke_test else "./test_data" data_dir = os.path.abspath(os.path.join(os.getcwd(), data_dir_name)) if not os.path.exists(data_dir): diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 309c7a8e3b6a..ae2ef163c228 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -396,7 +396,7 @@ def reset_trial(self, trial, new_config, new_experiment_tag): try: reset_val = ray.get( trainable.reset.remote(new_config, trial.logdir), - DEFAULT_GET_TIMEOUT) + timeout=DEFAULT_GET_TIMEOUT) except RayTimeoutError: logger.exception("Trial %s: reset timed out.", trial) return False @@ -465,7 +465,7 @@ def fetch_result(self, trial): raise ValueError("Trial was not running.") self._running.pop(trial_future[0]) with warn_if_slow("fetch_result"): - result = ray.get(trial_future[0], DEFAULT_GET_TIMEOUT) + result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT) # For local mode if isinstance(result, _LocalWrapper): @@ -734,7 +734,7 @@ def export_trial_if_needed(self, trial): with self._change_working_directory(trial): return ray.get( trial.runner.export_model.remote(trial.export_formats), - DEFAULT_GET_TIMEOUT) + timeout=DEFAULT_GET_TIMEOUT) return {} def has_gpus(self): diff --git a/python/ray/tune/tests/test_commands.py b/python/ray/tune/tests/test_commands.py index 179ec5e74e86..1ffc98a8755d 100644 --- a/python/ray/tune/tests/test_commands.py +++ b/python/ray/tune/tests/test_commands.py @@ -31,7 +31,7 @@ def __exit__(self, *args): 
@pytest.fixture def start_ray(): - ray.init(log_to_driver=False, local_mode=True) + ray.init(_log_to_driver=False, local_mode=True) _register_all() yield ray.shutdown() diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index fefcf7665c81..14af9d998df1 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -96,9 +96,9 @@ def stop(self): def pin_in_object_store(obj): - """Deprecated, use ray.put(value, weakref=False) instead.""" + """Deprecated, use ray.put(value) instead.""" - obj_ref = ray.put(obj, weakref=False) + obj_ref = ray.put(obj) _pinned_objects.append(obj_ref) return obj_ref diff --git a/python/ray/util/__init__.py b/python/ray/util/__init__.py index 17b048c16c50..6bf57683f100 100644 --- a/python/ray/util/__init__.py +++ b/python/ray/util/__init__.py @@ -2,13 +2,11 @@ from ray.util.actor_pool import ActorPool from ray.util.debug import log_once, disable_log_once_globally, \ enable_periodic_logging -from ray.util.named_actors import get_actor __all__ = [ "ActorPool", "disable_log_once_globally", "enable_periodic_logging", - "get_actor", "iter", "log_once", ] diff --git a/python/ray/util/named_actors.py b/python/ray/util/named_actors.py deleted file mode 100644 index b1d39fe2d100..000000000000 --- a/python/ray/util/named_actors.py +++ /dev/null @@ -1,27 +0,0 @@ -import logging - -import ray - -logger = logging.getLogger(__name__) - - -def _get_actor(name): - worker = ray.worker.global_worker - handle = worker.core_worker.get_named_actor_handle(name) - return handle - - -def get_actor(name: str) -> ray.actor.ActorHandle: - """Get a named actor which was previously created. - - If the actor doesn't exist, an exception will be raised. - - Args: - name: The name of the named actor. - - Returns: - The ActorHandle object corresponding to the name. 
- """ - logger.warning("ray.util.get_actor has been moved to ray.get_actor and " - "will be removed in the future.") - return _get_actor(name) diff --git a/python/ray/util/sgd/torch/examples/cifar_pytorch_example.py b/python/ray/util/sgd/torch/examples/cifar_pytorch_example.py index 94d286d45e8f..2ecd356f494f 100644 --- a/python/ray/util/sgd/torch/examples/cifar_pytorch_example.py +++ b/python/ray/util/sgd/torch/examples/cifar_pytorch_example.py @@ -102,7 +102,7 @@ def scheduler_creator(optimizer, config): args, _ = parser.parse_known_args() num_cpus = 4 if args.smoke_test else None - ray.init(address=args.address, num_cpus=num_cpus, log_to_driver=True) + ray.init(address=args.address, num_cpus=num_cpus, _log_to_driver=True) trainer1 = TorchTrainer( model_creator=ResNet18, diff --git a/python/ray/util/sgd/torch/examples/cifar_pytorch_pbt.py b/python/ray/util/sgd/torch/examples/cifar_pytorch_pbt.py index f2b6a75c3013..de066c3283f0 100644 --- a/python/ray/util/sgd/torch/examples/cifar_pytorch_pbt.py +++ b/python/ray/util/sgd/torch/examples/cifar_pytorch_pbt.py @@ -98,7 +98,7 @@ def optimizer_creator(model, config): "--tune", action="store_true", default=False, help="Tune training") args, _ = parser.parse_known_args() - ray.init(address=args.address, log_to_driver=True) + ray.init(address=args.address, _log_to_driver=True) TorchTrainable = TorchTrainer.as_trainable( model_creator=ResNet18, diff --git a/python/ray/worker.py b/python/ray/worker.py index 72f72e1d09a4..a0a6ec8498cc 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -373,7 +373,7 @@ def sigterm_handler(signum, frame): sys.exit(0) -def get_gpu_ids(as_str=False): +def get_gpu_ids(): """Get the IDs of the GPUs that are available to the worker. 
If the CUDA_VISIBLE_DEVICES environment variable was set when the worker @@ -407,16 +407,6 @@ def get_gpu_ids(as_str=False): max_gpus = global_worker.node.get_resource_spec().num_gpus assigned_ids = global_worker.original_gpu_ids[:max_gpus] - if not as_str: - from ray.util.debug import log_once - if log_once("ray.get_gpu_ids.as_str"): - logger.warning( - "ray.get_gpu_ids() will return a list of strings by default" - " in a future version of Ray for compatibility with CUDA. " - "To enable the forward-compatible behavior, use " - "`ray.get_gpu_ids(as_str=True)`.") - assigned_ids = [int(assigned_id) for assigned_id in assigned_ids] - return assigned_ids @@ -438,13 +428,13 @@ def get_resource_ids(): return global_worker.core_worker.resource_ids() -def get_webui_url(): - """Get the URL to access the web UI. +def get_dashboard_url(): + """Get the URL to access the Ray dashboard. - Note that the URL does not specify which node the web UI is on. + Note that the URL does not specify which node the dashboard is on. Returns: - The URL of the web UI as a string. + The URL of the dashboard as a string. 
""" worker = global_worker worker.check_connected() @@ -477,48 +467,38 @@ def print_failed_task(task_status): """) -def init(address=None, - redis_address=None, - redis_port=None, - num_cpus=None, - num_gpus=None, - memory=None, - object_store_memory=None, - resources=None, - driver_object_store_memory=None, - redis_max_memory=None, - log_to_driver=True, - node_ip_address=ray_constants.NODE_DEFAULT_IP, - object_ref_seed=None, - local_mode=False, - redirect_worker_output=None, - redirect_output=None, - ignore_reinit_error=False, - num_redis_shards=None, - redis_max_clients=None, - redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, - plasma_directory=None, - huge_pages=False, - include_java=False, - include_dashboard=None, - dashboard_host=ray_constants.DEFAULT_DASHBOARD_IP, - dashboard_port=ray_constants.DEFAULT_DASHBOARD_PORT, - job_id=None, - job_config=None, - configure_logging=True, - logging_level=logging.INFO, - logging_format=ray_constants.LOGGER_FORMAT, - plasma_store_socket_name=None, - raylet_socket_name=None, - temp_dir=None, - load_code_from_local=False, - java_worker_options=None, - use_pickle=True, - _system_config=None, - lru_evict=False, - enable_object_reconstruction=False, - _metrics_export_port=None, - object_spilling_config=None): +def init( + address=None, + *, + num_cpus=None, + num_gpus=None, + resources=None, + object_store_memory=None, + local_mode=False, + ignore_reinit_error=False, + include_dashboard=None, + dashboard_host=ray_constants.DEFAULT_DASHBOARD_IP, + dashboard_port=ray_constants.DEFAULT_DASHBOARD_PORT, + job_config=None, + configure_logging=True, + logging_level=logging.INFO, + logging_format=ray_constants.LOGGER_FORMAT, + enable_object_reconstruction=False, + # The following are unstable parameters and their use is discouraged. 
+ _redis_max_memory=None, + _node_ip_address=ray_constants.NODE_DEFAULT_IP, + _driver_object_store_memory=None, + _log_to_driver=True, + _memory=None, + _redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, + _include_java=False, + _java_worker_options=None, + _temp_dir=None, + _load_code_from_local=False, + _lru_evict=False, + _metrics_export_port=None, + _object_spilling_config=None, + _system_config=None): """ Connect to an existing Ray cluster or start one and connect to it. @@ -551,53 +531,19 @@ def init(address=None, is running on a node in a Ray cluster, using `auto` as the value tells the driver to detect the the cluster, removing the need to specify a specific node address. - redis_address (str): Deprecated; same as address. - redis_port (int): The port that the primary Redis shard should listen - to. If None, then a random port will be chosen. num_cpus (int): Number of CPUs the user wishes to assign to each - raylet. + raylet. By default, this is set based on virtual cores. num_gpus (int): Number of GPUs the user wishes to assign to each - raylet. + raylet. By default, this is set based on detected GPUs. resources: A dictionary mapping the names of custom resources to the quantities for them available. - memory: The amount of memory (in bytes) that is available for use by - workers requesting memory resources. By default, this is - automatically set based on available system memory. object_store_memory: The amount of memory (in bytes) to start the object store with. By default, this is automatically set based on - available system memory, subject to a 20GB cap. - redis_max_memory: The max amount of memory (in bytes) to allow each - redis shard to use. Once the limit is exceeded, redis will start - LRU eviction of entries. This only applies to the sharded redis - tables (task, object, and profile tables). By default, this is - autoset based on available system memory, subject to a 10GB cap. 
- log_to_driver (bool): If true, the output from all of the worker - processes on all nodes will be directed to the driver. - node_ip_address (str): The IP address of the node that we are on. - object_ref_seed (int): Used to seed the deterministic generation of - object refs. The same value can be used across multiple runs of the - same driver in order to generate the object refs in a consistent - manner. However, the same ID should not be used for different - drivers. + available system memory. local_mode (bool): If true, the code will be executed serially. This is useful for debugging. - driver_object_store_memory (int): Limit the amount of memory the driver - can use in the object store for creating objects. By default, this - is autoset based on available system memory, subject to a 20GB cap. ignore_reinit_error: If true, Ray suppresses errors from calling ray.init() a second time. Ray won't be restarted. - num_redis_shards: The number of Redis shards to start in addition to - the primary Redis shard. - redis_max_clients: If provided, attempt to configure Redis with this - maxclients number. - redis_password (str): Prevents external clients without the password - from connecting to Redis if provided. - plasma_directory: A directory where the Plasma memory mapped files - will be created. - huge_pages: Boolean flag indicating whether to start the Object - Store with hugetlbfs support. Requires plasma_directory. - include_java: Boolean flag indicating whether or not to enable java - workers. include_dashboard: Boolean flag indicating whether or not to start the Ray dashboard, which displays the status of the Ray cluster. If this argument is None, then the UI will be started if @@ -608,7 +554,6 @@ def init(address=None, external machines. dashboard_port: The port to bind the dashboard server to. Defaults to 8265. - job_id: The ID of this job. job_config (ray.job_config.JobConfig): The job configuration. 
configure_logging: True (default) if configuration of logging is allowed here. Otherwise, the user may want to configure it @@ -619,37 +564,42 @@ def init(address=None, timestamp, filename, line number, and message. See the source file ray_constants.py for details. Ignored unless "configure_logging" is true. - plasma_store_socket_name (str): If provided, specifies the socket - name used by the plasma store. - raylet_socket_name (str): If provided, specifies the socket path - used by the raylet process. - temp_dir (str): If provided, specifies the root temporary + enable_object_reconstruction (bool): If True, when an object stored in + the distributed plasma store is lost due to node failure, Ray will + attempt to reconstruct the object by re-executing the task that + created the object. Arguments to the task will be recursively + reconstructed. If False, then ray.UnreconstructableError will be + thrown. + _redis_max_memory: Redis max memory. + _node_ip_address (str): The IP address of the node that we are on. + _driver_object_store_memory (int): Limit the amount of memory the + driver can use in the object store for creating objects. + _log_to_driver (bool): If true, the output from all of the worker + processes on all nodes will be directed to the driver. + _memory: Amount of reservable memory resource to create. + _redis_password (str): Prevents external clients without the password + from connecting to Redis if provided. + _include_java: Boolean flag indicating whether or not to enable java + workers. + _temp_dir (str): If provided, specifies the root temporary directory for the Ray process. Defaults to an OS-specific conventional location, e.g., "/tmp/ray". - load_code_from_local: Whether code should be loaded from a local + _load_code_from_local: Whether code should be loaded from a local module or from the GCS. - java_worker_options: Overwrite the options to start Java workers. - use_pickle: Deprecated. 
- _system_config (dict): Configuration for overriding RayConfig - defaults. Used to set system configuration and for experimental Ray - core feature flags. - lru_evict (bool): If True, when an object store is full, it will evict + _java_worker_options: Overwrite the options to start Java workers. + _lru_evict (bool): If True, when an object store is full, it will evict objects in LRU order to make more space and when under memory pressure, ray.UnreconstructableError may be thrown. If False, then reference counting will be used to decide which objects are safe to evict and when under memory pressure, ray.ObjectStoreFullError may be thrown. - enable_object_reconstruction (bool): If True, when an object stored in - the distributed plasma store is lost due to node failure, Ray will - attempt to reconstruct the object by re-executing the task that - created the object. Arguments to the task will be recursively - reconstructed. If False, then ray.UnreconstructableError will be - thrown. _metrics_export_port(int): Port number Ray exposes system metrics through a Prometheus endpoint. It is currently under active development, and the API is subject to change. - object_spilling_config (str): The configuration json string for object + _object_spilling_config (str): The configuration json string for object spilling I/O worker. + _system_config (dict): Configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: Address information about the started processes. @@ -659,15 +609,8 @@ def init(address=None, arguments is passed in. """ - if not use_pickle: - raise DeprecationWarning("The use_pickle argument is deprecated.") - - if redis_address is not None: - raise DeprecationWarning("The redis_address argument is deprecated. 
" - "Please use address instead.") - if "RAY_ADDRESS" in os.environ: - if redis_address is None and (address is None or address == "auto"): + if address is None or address == "auto": address = os.environ["RAY_ADDRESS"] else: raise RuntimeError( @@ -677,9 +620,10 @@ def init(address=None, "please call ray.init() or ray.init(address=\"auto\") on the " "driver.") - if redis_address is not None or address is not None: - redis_address, _, _ = services.validate_redis_address( - address, redis_address) + if address: + redis_address, _, _ = services.validate_redis_address(address) + else: + redis_address = None if configure_logging: setup_logger(logging_level, logging_format) @@ -700,12 +644,6 @@ def init(address=None, "'ignore_reinit_error=True' or by calling " "'ray.shutdown()' prior to 'ray.init()'.") - # Convert hostnames to numerical IP address. - if node_ip_address is not None: - node_ip_address = services.address_to_ip(node_ip_address) - - raylet_ip_address = node_ip_address - _system_config = _system_config or {} if not isinstance(_system_config, dict): raise TypeError("The _system_config must be a dict.") @@ -715,39 +653,37 @@ def init(address=None, # In this case, we need to start a new cluster. 
ray_params = ray.parameter.RayParams( redis_address=redis_address, - redis_port=redis_port, - node_ip_address=node_ip_address, - raylet_ip_address=raylet_ip_address, - object_ref_seed=object_ref_seed, + node_ip_address=None, + raylet_ip_address=None, + object_ref_seed=None, driver_mode=driver_mode, - redirect_worker_output=redirect_worker_output, - redirect_output=redirect_output, + redirect_worker_output=None, + redirect_output=None, num_cpus=num_cpus, num_gpus=num_gpus, resources=resources, - num_redis_shards=num_redis_shards, - redis_max_clients=redis_max_clients, - redis_password=redis_password, - plasma_directory=plasma_directory, - huge_pages=huge_pages, - include_java=include_java, + num_redis_shards=None, + redis_max_clients=None, + redis_password=_redis_password, + plasma_directory=None, + huge_pages=None, + include_java=_include_java, include_dashboard=include_dashboard, dashboard_host=dashboard_host, dashboard_port=dashboard_port, - memory=memory, + memory=_memory, object_store_memory=object_store_memory, - redis_max_memory=redis_max_memory, - plasma_store_socket_name=plasma_store_socket_name, - raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir, - load_code_from_local=load_code_from_local, - java_worker_options=java_worker_options, + redis_max_memory=_redis_max_memory, + plasma_store_socket_name=None, + temp_dir=_temp_dir, + load_code_from_local=_load_code_from_local, + java_worker_options=_java_worker_options, start_initial_python_workers_for_first_job=True, _system_config=_system_config, - lru_evict=lru_evict, + lru_evict=_lru_evict, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=_metrics_export_port, - object_spilling_config=object_spilling_config) + object_spilling_config=_object_spilling_config) # Start the Ray processes. We set shutdown_at_exit=False because we # shutdown the node in the ray.shutdown call that happens in the atexit # handler. 
We still spawn a reaper process in case the atexit handler @@ -766,45 +702,15 @@ def init(address=None, if resources is not None: raise ValueError("When connecting to an existing cluster, " "resources must not be provided.") - if num_redis_shards is not None: - raise ValueError("When connecting to an existing cluster, " - "num_redis_shards must not be provided.") - if redis_max_clients is not None: - raise ValueError("When connecting to an existing cluster, " - "redis_max_clients must not be provided.") - if memory is not None: - raise ValueError("When connecting to an existing cluster, " - "memory must not be provided.") if object_store_memory is not None: raise ValueError("When connecting to an existing cluster, " "object_store_memory must not be provided.") - if redis_max_memory is not None: - raise ValueError("When connecting to an existing cluster, " - "redis_max_memory must not be provided.") - if plasma_directory is not None: - raise ValueError("When connecting to an existing cluster, " - "plasma_directory must not be provided.") - if huge_pages: - raise ValueError("When connecting to an existing cluster, " - "huge_pages must not be provided.") - if temp_dir is not None: - raise ValueError("When connecting to an existing cluster, " - "temp_dir must not be provided.") - if plasma_store_socket_name is not None: - raise ValueError("When connecting to an existing cluster, " - "plasma_store_socket_name must not be provided.") - if raylet_socket_name is not None: - raise ValueError("When connecting to an existing cluster, " - "raylet_socket_name must not be provided.") - if java_worker_options is not None: - raise ValueError("When connecting to an existing cluster, " - "java_worker_options must not be provided.") if _system_config is not None and len(_system_config) != 0: raise ValueError("When connecting to an existing cluster, " "_system_config must not be provided.") - if lru_evict: + if _lru_evict: raise ValueError("When connecting to an existing cluster, " - 
"lru_evict must not be provided.") + "_lru_evict must not be provided.") if enable_object_reconstruction: raise ValueError( "When connecting to an existing cluster, " @@ -812,15 +718,15 @@ def init(address=None, # In this case, we only need to connect the node. ray_params = ray.parameter.RayParams( - node_ip_address=node_ip_address, - raylet_ip_address=raylet_ip_address, + node_ip_address=None, + raylet_ip_address=None, redis_address=redis_address, - redis_password=redis_password, - object_ref_seed=object_ref_seed, - temp_dir=temp_dir, - load_code_from_local=load_code_from_local, + redis_password=_redis_password, + object_ref_seed=None, + temp_dir=_temp_dir, + load_code_from_local=_load_code_from_local, _system_config=_system_config, - lru_evict=lru_evict, + lru_evict=_lru_evict, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=_metrics_export_port) _global_node = ray.node.Node( @@ -833,10 +739,10 @@ def init(address=None, connect( _global_node, mode=driver_mode, - log_to_driver=log_to_driver, + log_to_driver=_log_to_driver, worker=global_worker, - driver_object_store_memory=driver_object_store_memory, - job_id=job_id, + driver_object_store_memory=_driver_object_store_memory, + job_id=None, job_config=job_config) for hook in _post_init_hooks: @@ -849,7 +755,7 @@ def init(address=None, _post_init_hooks = [] -def shutdown(exiting_interpreter=False): +def shutdown(_exiting_interpreter=False): """Disconnect the worker, and terminate processes started by ray.init(). This will automatically run at the end when a Python process that uses Ray @@ -863,16 +769,16 @@ def shutdown(exiting_interpreter=False): will need to reload the module. Args: - exiting_interpreter (bool): True if this is called by the atexit hook + _exiting_interpreter (bool): True if this is called by the atexit hook and false otherwise. If we are exiting the interpreter, we will wait a little while to print any extra error messages. 
""" - if exiting_interpreter and global_worker.mode == SCRIPT_MODE: + if _exiting_interpreter and global_worker.mode == SCRIPT_MODE: # This is a duration to sleep before shutting down everything in order # to make sure that log messages finish printing. time.sleep(0.5) - disconnect(exiting_interpreter) + disconnect(_exiting_interpreter) # We need to destruct the core worker here because after this function, # we will tear down any processes spawned by ray.init() and the background @@ -1422,50 +1328,7 @@ def _changeproctitle(title, next_title): setproctitle.setproctitle(next_title) -def register_custom_serializer(cls, - serializer, - deserializer, - use_pickle=False, - use_dict=False, - class_id=None): - """Registers custom functions for efficient object serialization. - - The serializer and deserializer are used when transferring objects of - `cls` across processes and nodes. This can be significantly faster than - the Ray default fallbacks. Wraps `register_custom_serializer` underneath. - - Args: - cls (type): The class that ray should use this custom serializer for. - serializer: The custom serializer that takes in a cls instance and - outputs a serialized representation. use_pickle and use_dict - must be False if provided. - deserializer: The custom deserializer that takes in a serialized - representation of the cls and outputs a cls instance. use_pickle - and use_dict must be False if provided. - use_pickle: Deprecated. - use_dict: Deprecated. - class_id (str): Unique ID of the class. Autogenerated if None. - """ - worker = global_worker - worker.check_connected() - - if use_pickle: - raise DeprecationWarning( - "`use_pickle` is no longer a valid parameter and will be removed " - "in future versions of Ray. If this breaks your application, " - "see `SerializationContext.register_custom_serializer`.") - if use_dict: - raise DeprecationWarning( - "`use_pickle` is no longer a valid parameter and will be removed " - "in future versions of Ray. 
If this breaks your application, " - "see `SerializationContext.register_custom_serializer`.") - assert serializer is not None and deserializer is not None - context = global_worker.get_serialization_context() - context.register_custom_serializer( - cls, serializer, deserializer, class_id=class_id) - - -def show_in_webui(message, key="", dtype="text"): +def show_in_dashboard(message, key="", dtype="text"): """Display message in dashboard. Display message for the current task or actor in the dashboard. @@ -1497,7 +1360,7 @@ def show_in_webui(message, key="", dtype="text"): blocking_get_inside_async_warned = False -def get(object_refs, timeout=None): +def get(object_refs, *, timeout=None): """Get a remote object or a list of remote objects from the object store. This method blocks until the object corresponding to the object ref is @@ -1570,17 +1433,13 @@ def get(object_refs, timeout=None): return values -def put(value, weakref=False): +def put(value): """Store an object in the object store. The object may not be evicted while a reference to the returned ID exists. Args: value: The Python object to be stored. - weakref: If set, allows the object to be evicted while a reference - to the returned ID exists. You might want to set this if putting - a lot of objects that you might not need in the future. - It allows Ray to more aggressively reclaim memory. Returns: The object ref assigned to this value. 
@@ -1589,7 +1448,7 @@ def put(value, weakref=False): worker.check_connected() with profiling.profile("ray.put"): try: - object_ref = worker.put_object(value, pin_object=not weakref) + object_ref = worker.put_object(value, pin_object=True) except ObjectStoreFullError: logger.info( "Put failed since the value was either too large or the " @@ -1602,7 +1461,7 @@ def put(value, weakref=False): blocking_wait_inside_async_warned = False -def wait(object_refs, num_returns=1, timeout=None): +def wait(object_refs, *, num_returns=1, timeout=None): """Return a list of IDs that are ready and a list of IDs that are not. If timeout is set, the function returns either when the requested number of @@ -1710,11 +1569,11 @@ def get_actor(name): """ worker = global_worker worker.check_connected() - - return ray.util.named_actors._get_actor(name) + handle = worker.core_worker.get_named_actor_handle(name) + return handle -def kill(actor, no_restart=True): +def kill(actor, *, no_restart=True): """Kill an actor forcefully. This will interrupt any running tasks on the actor, causing them to fail @@ -1740,7 +1599,7 @@ def kill(actor, no_restart=True): worker.core_worker.kill_actor(actor._ray_actor_id, no_restart) -def cancel(object_ref, force=False): +def cancel(object_ref, *, force=False): """Cancels a task according to the following conditions. If the specified task is pending execution, it will not be executed. 
If diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 73e69bd55998..58d6d303d0a2 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -420,7 +420,7 @@ def make_env(vector_index): if (ray.is_initialized() and ray.worker._mode() != ray.worker.LOCAL_MODE): # Check available number of GPUs - if not ray.get_gpu_ids(as_str=True): + if not ray.get_gpu_ids(): logger.debug("Creating policy evaluation worker {}".format( worker_index) + " on CPU (please ignore any CUDA init errors)") diff --git a/rllib/train.py b/rllib/train.py index b19268dfd4d4..08c9a4cefc08 100755 --- a/rllib/train.py +++ b/rllib/train.py @@ -69,16 +69,6 @@ def create_parser(parser_creator=None): default=None, type=int, help="Emulate multiple cluster nodes for debugging.") - parser.add_argument( - "--ray-redis-max-memory", - default=None, - type=int, - help="--redis-max-memory to use if starting a new cluster.") - parser.add_argument( - "--ray-memory", - default=None, - type=int, - help="--memory to use if starting a new cluster.") parser.add_argument( "--ray-object-store-memory", default=None, @@ -204,17 +194,13 @@ def run(args, parser): cluster.add_node( num_cpus=args.ray_num_cpus or 1, num_gpus=args.ray_num_gpus or 0, - object_store_memory=args.ray_object_store_memory, - memory=args.ray_memory, - redis_max_memory=args.ray_redis_max_memory) + object_store_memory=args.ray_object_store_memory) ray.init(address=cluster.address) else: ray.init( include_dashboard=not args.no_ray_ui, address=args.ray_address, object_store_memory=args.ray_object_store_memory, - memory=args.ray_memory, - redis_max_memory=args.ray_redis_max_memory, num_cpus=args.ray_num_cpus, num_gpus=args.ray_num_gpus, local_mode=args.local_mode)