From 0213c7840a9b99368076a79ad981c8d989061bb1 Mon Sep 17 00:00:00 2001 From: Leon Sievers Date: Fri, 22 Mar 2019 14:00:54 +0100 Subject: [PATCH 1/4] Moved clip_action into policy_graph; Clip actions in compute_single_action --- python/ray/rllib/agents/agent.py | 6 ++-- python/ray/rllib/evaluation/policy_graph.py | 31 +++++++++++++++++++++ python/ray/rllib/evaluation/sampler.py | 27 +----------------- python/ray/rllib/rollout.py | 7 +---- 4 files changed, 37 insertions(+), 34 deletions(-) diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 8c0a1bb7f1ab..8222876e130d 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -433,9 +433,11 @@ def compute_action(self, preprocessed, update=False) if state: return self.get_policy(policy_id).compute_single_action( - filtered_obs, state, prev_action, prev_reward, info) + filtered_obs, state, prev_action, prev_reward, info, + clip_actions=self.config["clip_actions"]) return self.get_policy(policy_id).compute_single_action( - filtered_obs, state, prev_action, prev_reward, info)[0] + filtered_obs, state, prev_action, prev_reward, info, + clip_actions=self.config["clip_actions"])[0] @property def iteration(self): diff --git a/python/ray/rllib/evaluation/policy_graph.py b/python/ray/rllib/evaluation/policy_graph.py index ecd80662a6c0..f2383d52eb26 100644 --- a/python/ray/rllib/evaluation/policy_graph.py +++ b/python/ray/rllib/evaluation/policy_graph.py @@ -2,6 +2,8 @@ from __future__ import division from __future__ import print_function +import gym + from ray.rllib.utils.annotations import DeveloperAPI @@ -81,6 +83,7 @@ def compute_single_action(self, prev_reward=None, info=None, episode=None, + clip_actions=False, **kwargs): """Unbatched version of compute_actions. @@ -93,6 +96,7 @@ def compute_single_action(self, episode (MultiAgentEpisode): this provides access to all of the internal episode state, which may be useful for model-based or multi-agent algorithms. + clip_actions (bool): should the action be clipped kwargs: forward compatibility placeholder Returns: @@ -119,6 +123,8 @@ def compute_single_action(self, prev_reward_batch=prev_reward_batch, info_batch=info_batch, episodes=episodes) + if clip_actions: + action = clip_action(action, self.action_space) return action, [s[0] for s in state_out], \ {k: v[0] for k, v in info.items()} @@ -263,3 +269,28 @@ def export_checkpoint(self, export_dir): export_dir (str): Local writable directory. """ raise NotImplementedError + + +def clip_action(action, space): + """Called to clip actions to the specified range of this policy. + + Arguments: + action: Single action. + space: Action space the actions should be present in. + + Returns: + Clipped batch of actions. 
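As a quick illustrative aside, this is how the relocated helper behaves once the whole series is applied (patch 2 below adds the numpy import it needs). The space and action values are invented for the sketch; the import path is the same one this commit adds to sampler.py:

    import gym
    import numpy as np

    from ray.rllib.evaluation.policy_graph import clip_action

    # Bounded continuous space: out-of-range components are clipped
    # element-wise to [low, high].
    box = gym.spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)
    print(clip_action(np.array([2.3, -0.4]), box))        # roughly [ 1.  -0.4]

    # Tuple spaces are clipped recursively, sub-space by sub-space;
    # Discrete components fall through the final else and pass unchanged.
    tup = gym.spaces.Tuple([box, gym.spaces.Discrete(4)])
    print(clip_action((np.array([-5.0, 0.2]), 3), tup))   # roughly [array([-1., 0.2]), 3]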
+ """ + + if isinstance(space, gym.spaces.Box): + return np.clip(action, space.low, space.high) + elif isinstance(space, gym.spaces.Tuple): + if type(action) not in (tuple, list): + raise ValueError("Expected tuple space for actions {}: {}".format( + action, space)) + out = [] + for a, s in zip(action, space.spaces): + out.append(clip_action(a, s)) + return out + else: + return action \ No newline at end of file diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index cd6935f8aa96..3e578c328e52 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -2,7 +2,6 @@ from __future__ import division from __future__ import print_function -import gym from collections import defaultdict, namedtuple import logging import numpy as np @@ -19,6 +18,7 @@ from ray.rllib.offline import InputReader from ray.rllib.utils.annotations import override from ray.rllib.utils.tf_run_builder import TFRunBuilder +from ray.rllib.evaluation.policy_graph import clip_action logger = logging.getLogger(__name__) _large_batch_warned = False @@ -200,31 +200,6 @@ def get_extra_batches(self): return extra -def clip_action(action, space): - """Called to clip actions to the specified range of this policy. - - Arguments: - action: Single action. - space: Action space the actions should be present in. - - Returns: - Clipped batch of actions. - """ - - if isinstance(space, gym.spaces.Box): - return np.clip(action, space.low, space.high) - elif isinstance(space, gym.spaces.Tuple): - if type(action) not in (tuple, list): - raise ValueError("Expected tuple space for actions {}: {}".format( - action, space)) - out = [] - for a, s in zip(action, space.spaces): - out.append(clip_action(a, s)) - return out - else: - return action - - def _env_runner(base_env, extra_batch_callback, policies, diff --git a/python/ray/rllib/rollout.py b/python/ray/rllib/rollout.py index 0bd3645837b9..70b00eb63fdc 100755 --- a/python/ray/rllib/rollout.py +++ b/python/ray/rllib/rollout.py @@ -12,7 +12,6 @@ import gym import ray from ray.rllib.agents.registry import get_agent_class -from ray.rllib.evaluation.sampler import clip_action from ray.tune.util import merge_dicts EXAMPLE_USAGE = """ @@ -154,11 +153,7 @@ def rollout(agent, env_name, num_steps, out=None, no_render=True): else: action = agent.compute_action(state) - if agent.config["clip_actions"]: - clipped_action = clip_action(action, env.action_space) - next_state, reward, done, _ = env.step(clipped_action) - else: - next_state, reward, done, _ = env.step(action) + next_state, reward, done, _ = env.step(action) if multiagent: done = done["__all__"] From 0bbc91d8629ad21507f9a36f7e97c3f425a6d238 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 22 Mar 2019 12:04:51 -0700 Subject: [PATCH 2/4] Update policy_graph.py --- python/ray/rllib/evaluation/policy_graph.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/rllib/evaluation/policy_graph.py b/python/ray/rllib/evaluation/policy_graph.py index f2383d52eb26..20290fb1cf56 100644 --- a/python/ray/rllib/evaluation/policy_graph.py +++ b/python/ray/rllib/evaluation/policy_graph.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import numpy as np import gym from ray.rllib.utils.annotations import DeveloperAPI @@ -293,4 +294,4 @@ def clip_action(action, space): out.append(clip_action(a, s)) return out else: - return action \ No newline at end of file + return action From 
4c6ec670f06c3c69522f3bdb6e91c915f040af72 Mon Sep 17 00:00:00 2001 From: Leon Sievers Date: Wed, 27 Mar 2019 18:07:36 +0100 Subject: [PATCH 3/4] Changed formatting --- python/ray/rllib/agents/agent.py | 41 ++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 8222876e130d..5dfd6494abeb 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -17,7 +17,8 @@ from ray.rllib.offline import NoopOutput, JsonReader, MixedInput, JsonWriter, \ ShuffledInput from ray.rllib.models import MODEL_DEFAULTS -from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator +from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator, \ + _validate_multiagent_config from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.utils.annotations import override, PublicAPI, DeveloperAPI @@ -40,7 +41,10 @@ # === Debugging === # Whether to write episode stats and videos to the agent log dir "monitor": False, - # Set the ray.rllib.* log level for the agent process and its evaluators + # Set the ray.rllib.* log level for the agent process and its evaluators. + # Should be one of DEBUG, INFO, WARN, or ERROR. The DEBUG level will also + # periodically print out summaries of relevant internal dataflow (this is + # also printed out once at startup at the INFO level). "log_level": "INFO", # Callbacks that will be run during various phases of training. These all # take a single "info" dict as an argument. For episode callbacks, custom @@ -406,7 +410,7 @@ def compute_action(self, prev_action=None, prev_reward=None, info=None, - policy_id="default"): + policy_id=DEFAULT_POLICY_ID): """Computes an action for the specified policy. 
Note that you can also access the policy object through @@ -433,10 +437,18 @@ def compute_action(self, preprocessed, update=False) if state: return self.get_policy(policy_id).compute_single_action( - filtered_obs, state, prev_action, prev_reward, info, + filtered_obs, + state, + prev_action, + prev_reward, + info, clip_actions=self.config["clip_actions"]) return self.get_policy(policy_id).compute_single_action( - filtered_obs, state, prev_action, prev_reward, info, + filtered_obs, + state, + prev_action, + prev_reward, + info, clip_actions=self.config["clip_actions"])[0] @property @@ -655,12 +667,12 @@ def session_creator(): input_creator = (lambda ioctx: ioctx.default_sampler_input()) elif isinstance(config["input"], dict): input_creator = (lambda ioctx: ShuffledInput( - MixedInput(config["input"], ioctx), - config["shuffle_buffer_size"])) + MixedInput(config["input"], ioctx), config[ + "shuffle_buffer_size"])) else: input_creator = (lambda ioctx: ShuffledInput( - JsonReader(config["input"], ioctx), - config["shuffle_buffer_size"])) + JsonReader(config["input"], ioctx), config[ + "shuffle_buffer_size"])) if isinstance(config["output"], FunctionType): output_creator = config["output"] @@ -684,9 +696,18 @@ def session_creator(): else: input_evaluation = config["input_evaluation"] + # Fill in the default policy graph if 'None' is specified in multiagent + if self.config["multiagent"]["policy_graphs"]: + tmp = self.config["multiagent"]["policy_graphs"] + _validate_multiagent_config(tmp, allow_none_graph=True) + for k, v in tmp.items(): + if v[0] is None: + tmp[k] = (policy_graph, v[1], v[2], v[3]) + policy_graph = tmp + return cls( env_creator, - self.config["multiagent"]["policy_graphs"] or policy_graph, + policy_graph, policy_mapping_fn=self.config["multiagent"]["policy_mapping_fn"], policies_to_train=self.config["multiagent"]["policies_to_train"], tf_session_creator=(session_creator From 516ecf144b48c74119786ad97f83aecfb85c821b Mon Sep 17 00:00:00 2001 From: Leon Sievers Date: Wed, 27 Mar 2019 18:18:02 +0100 Subject: [PATCH 4/4] Updated codebase for convencience --- .bazelrc | 2 + README.rst | 2 +- build.sh | 5 +- ci/jenkins_tests/run_rllib_tests.sh | 2 +- ci/long_running_tests/config.yaml | 2 +- .../workloads/many_actor_tasks.py | 8 +- .../application_cluster_template.yaml | 4 +- ci/stress_tests/stress_testing_config.yaml | 2 +- doc/source/installation.rst | 16 +-- doc/source/rllib-env.rst | 36 +++--- doc/source/rllib-training.rst | 4 +- docker/stress_test/Dockerfile | 2 +- docker/tune_test/Dockerfile | 2 +- .../functionmanager/FunctionManager.java | 21 +++- .../ray/runtime/raylet/MockRayletClient.java | 14 +-- java/test.sh | 4 +- .../org/ray/api/test/MultiThreadingTest.java | 23 +++- python/ray/__init__.py | 2 +- python/ray/actor.py | 41 +++++-- python/ray/autoscaler/aws/example-full.yaml | 6 +- .../autoscaler/aws/example-gpu-docker.yaml | 6 +- python/ray/autoscaler/gcp/example-full.yaml | 6 +- .../autoscaler/gcp/example-gpu-docker.yaml | 6 +- python/ray/node.py | 4 +- python/ray/ray_constants.py | 11 ++ python/ray/rllib/agents/ars/ars.py | 7 +- python/ray/rllib/agents/dqn/dqn.py | 3 +- python/ray/rllib/agents/es/es.py | 7 +- python/ray/rllib/env/base_env.py | 2 +- python/ray/rllib/evaluation/metrics.py | 4 + .../ray/rllib/evaluation/policy_evaluator.py | 94 ++++++++++----- python/ray/rllib/evaluation/sample_batch.py | 7 +- .../rllib/evaluation/sample_batch_builder.py | 10 ++ python/ray/rllib/evaluation/sampler.py | 28 ++++- .../ray/rllib/evaluation/tf_policy_graph.py | 10 ++ 
.../rllib/examples/hierarchical_training.py | 6 +- .../ray/rllib/examples/multiagent_cartpole.py | 3 +- python/ray/rllib/models/preprocessors.py | 42 +++++-- .../optimizers/async_samples_optimizer.py | 14 ++- python/ray/rllib/optimizers/multi_gpu_impl.py | 13 +- python/ray/rllib/tests/test_catalog.py | 4 +- .../ray/rllib/tests/test_multi_agent_env.py | 2 +- .../ray/rllib/tests/test_policy_evaluator.py | 15 +-- python/ray/rllib/utils/debug.py | 111 ++++++++++++++++++ python/ray/rllib/utils/memory.py | 51 ++++++++ python/ray/services.py | 3 +- python/ray/tests/test_actor.py | 83 +++++++++++++ python/ray/tests/test_basic.py | 2 +- python/ray/tests/test_object_manager.py | 7 +- python/ray/tune/examples/mnist_pytorch.py | 1 - .../tune/examples/mnist_pytorch_trainable.py | 1 - python/ray/tune/examples/tune_mnist_keras.py | 1 - python/ray/tune/tests/test_commands.py | 76 +++++++----- python/ray/worker.py | 28 +---- 54 files changed, 640 insertions(+), 226 deletions(-) create mode 100644 python/ray/rllib/utils/debug.py create mode 100644 python/ray/rllib/utils/memory.py diff --git a/.bazelrc b/.bazelrc index 55bcd0242bd9..488b33101594 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,2 +1,4 @@ # build config build --compilation_mode=opt +build --action_env=PATH +build --action_env=PYTHON_BIN_PATH diff --git a/README.rst b/README.rst index 49301f2bd433..8425744bb4a5 100644 --- a/README.rst +++ b/README.rst @@ -6,7 +6,7 @@ .. image:: https://readthedocs.org/projects/ray/badge/?version=latest :target: http://ray.readthedocs.io/en/latest/?badge=latest -.. image:: https://img.shields.io/badge/pypi-0.6.4-blue.svg +.. image:: https://img.shields.io/badge/pypi-0.6.5-blue.svg :target: https://pypi.org/project/ray/ | diff --git a/build.sh b/build.sh index 08a404288b63..a67170722bdb 100755 --- a/build.sh +++ b/build.sh @@ -121,14 +121,15 @@ else $PYTHON_EXECUTABLE -m pip install \ --target=$ROOT_DIR/python/ray/pyarrow_files pyarrow==0.12.0.RAY \ --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/9357dc130789ee42f8181d8724bee1d5d1509060/index.html + export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" if [ "$RAY_BUILD_JAVA" == "YES" ]; then bazel run //java:bazel_deps -- generate -r $ROOT_DIR -s java/third_party/workspace.bzl -d java/dependencies.yaml - bazel build //java:all --verbose_failures --action_env=PATH + bazel build //java:all --verbose_failures fi if [ "$RAY_BUILD_PYTHON" == "YES" ]; then - bazel build //:ray_pkg --verbose_failures --action_env=PYTHON_BIN_PATH=$PYTHON_EXECUTABLE + bazel build //:ray_pkg --verbose_failures fi fi diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index f6c88811a01c..b582839b4bab 100644 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -405,7 +405,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --run IMPALA \ --stop='{"timesteps_total": 40000}' \ --ray-object-store-memory=500000000 \ - --config '{"num_workers": 1, "num_gpus": 0, "num_envs_per_worker": 64, "sample_batch_size": 50, "train_batch_size": 50, "learner_queue_size": 1}' + --config '{"num_workers": 1, "num_gpus": 0, "num_envs_per_worker": 32, "sample_batch_size": 50, "train_batch_size": 50, "learner_queue_size": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/python/ray/rllib/agents/impala/vtrace_test.py diff --git a/ci/long_running_tests/config.yaml b/ci/long_running_tests/config.yaml index 8112bd522e87..e15fee7fe6a9 100644 --- 
a/ci/long_running_tests/config.yaml +++ b/ci/long_running_tests/config.yaml @@ -51,7 +51,7 @@ setup_commands: # - git clone https://github.com/ray-project/ray || true # - cd ray/python; git checkout master; git pull; pip install -e . --verbose # Install nightly Ray wheels. - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl - pip install ray[rllib] ray[debug] tensorflow - pip install -U dask # fix error importing lz4 diff --git a/ci/long_running_tests/workloads/many_actor_tasks.py b/ci/long_running_tests/workloads/many_actor_tasks.py index c37b22ccc3d1..2dd395c0e84f 100644 --- a/ci/long_running_tests/workloads/many_actor_tasks.py +++ b/ci/long_running_tests/workloads/many_actor_tasks.py @@ -36,9 +36,7 @@ # Run the workload. -# TODO (williamma12): Remove the num_cpus argument once -# https://github.com/ray-project/ray/issues/4312 gets resolved -@ray.remote(num_cpus=0.1) +@ray.remote class Actor(object): def __init__(self): self.value = 0 @@ -47,10 +45,8 @@ def method(self): self.value += 1 -# TODO (williamma12): Update the actors to each have only 0.1 of a cpu once -# https://github.com/ray-project/ray/issues/4312 gets resolved. actors = [ - Actor._remote([], {}, resources={str(i % num_nodes): 0.1}) + Actor._remote([], {}, num_cpus=0.1, resources={str(i % num_nodes): 0.1}) for i in range(num_nodes * 5) ] diff --git a/ci/stress_tests/application_cluster_template.yaml b/ci/stress_tests/application_cluster_template.yaml index 0d7baedb78fb..e8fc0efa2bcd 100644 --- a/ci/stress_tests/application_cluster_template.yaml +++ b/ci/stress_tests/application_cluster_template.yaml @@ -90,8 +90,8 @@ file_mounts: { # List of shell commands to run to set up nodes. setup_commands: - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_<<>>/bin:$PATH"' >> ~/.bashrc - - ray || wget https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-<<>>-manylinux1_x86_64.whl - - rllib || pip install -U ray-0.7.0.dev1-<<>>-manylinux1_x86_64.whl[rllib] + - ray || wget https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-<<>>-manylinux1_x86_64.whl + - rllib || pip install -U ray-0.7.0.dev2-<<>>-manylinux1_x86_64.whl[rllib] - pip install tensorflow-gpu==1.12.0 - echo "sudo halt" | at now + 60 minutes # Consider uncommenting these if you also want to run apt-get commands during setup diff --git a/ci/stress_tests/stress_testing_config.yaml b/ci/stress_tests/stress_testing_config.yaml index 1bee8248eacc..47c7dbf3e8d2 100644 --- a/ci/stress_tests/stress_testing_config.yaml +++ b/ci/stress_tests/stress_testing_config.yaml @@ -100,7 +100,7 @@ setup_commands: # - git clone https://github.com/ray-project/ray || true - pip install boto3==1.4.8 cython==0.29.0 # - cd ray/python; git checkout master; git pull; pip install -e . --verbose - - pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + - pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl - echo "sudo halt" | at now + 60 minutes # Custom commands that will be run on the head node after common setup. 
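The many_actor_tasks.py change above now requests num_cpus=0.1 at creation time through Actor._remote rather than in the decorator, relying on the reworked actor resource defaults later in this patch. A minimal sketch of the same call pattern, with the per-node custom resources dropped for brevity (class and variable names are invented):

    import ray

    ray.init(num_cpus=1)

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.value = 0

        def method(self):
            self.value += 1
            return self.value

    # _remote(args, kwargs, ...) accepts resource overrides at creation time;
    # each actor here reserves a tenth of a CPU for its lifetime.
    actors = [Counter._remote([], {}, num_cpus=0.1) for _ in range(5)]
    print(ray.get([actor.method.remote() for actor in actors]))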
diff --git a/doc/source/installation.rst b/doc/source/installation.rst index 6b56143a04bd..45ec5069bf00 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -33,14 +33,14 @@ Here are links to the latest wheels (which are built off of master). To install =================== =================== -.. _`Linux Python 3.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp37-cp37m-manylinux1_x86_64.whl -.. _`Linux Python 3.6`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl -.. _`Linux Python 3.5`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-manylinux1_x86_64.whl -.. _`Linux Python 2.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl -.. _`MacOS Python 3.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp37-cp37m-macosx_10_6_intel.whl -.. _`MacOS Python 3.6`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-macosx_10_6_intel.whl -.. _`MacOS Python 3.5`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-macosx_10_6_intel.whl -.. _`MacOS Python 2.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27m-macosx_10_6_intel.whl +.. _`Linux Python 3.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp37-cp37m-manylinux1_x86_64.whl +.. _`Linux Python 3.6`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl +.. _`Linux Python 3.5`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-manylinux1_x86_64.whl +.. _`Linux Python 2.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl +.. _`MacOS Python 3.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp37-cp37m-macosx_10_6_intel.whl +.. _`MacOS Python 3.6`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-macosx_10_6_intel.whl +.. _`MacOS Python 3.5`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-macosx_10_6_intel.whl +.. _`MacOS Python 2.7`: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27m-macosx_10_6_intel.whl Building Ray from source diff --git a/doc/source/rllib-env.rst b/doc/source/rllib-env.rst index 7180099fca04..f2a6c5051026 100644 --- a/doc/source/rllib-env.rst +++ b/doc/source/rllib-env.rst @@ -166,9 +166,10 @@ If all the agents will be using the same algorithm class to train, then you can trainer = pg.PGAgent(env="my_multiagent_env", config={ "multiagent": { "policy_graphs": { - "car1": (PGPolicyGraph, car_obs_space, car_act_space, {"gamma": 0.85}), - "car2": (PGPolicyGraph, car_obs_space, car_act_space, {"gamma": 0.99}), - "traffic_light": (PGPolicyGraph, tl_obs_space, tl_act_space, {}), + # the first tuple value is None -> uses default policy graph + "car1": (None, car_obs_space, car_act_space, {"gamma": 0.85}), + "car2": (None, car_obs_space, car_act_space, {"gamma": 0.99}), + "traffic_light": (None, tl_obs_space, tl_act_space, {}), }, "policy_mapping_fn": lambda agent_id: @@ -232,9 +233,9 @@ This can be implemented as a multi-agent environment with three types of agents. 
"multiagent": { "policy_graphs": { - "top_level": (some_policy_graph, ...), - "mid_level": (some_policy_graph, ...), - "low_level": (some_policy_graph, ...), + "top_level": (custom_policy_graph or None, ...), + "mid_level": (custom_policy_graph or None, ...), + "low_level": (custom_policy_graph or None, ...), }, "policy_mapping_fn": lambda agent_id: @@ -248,17 +249,6 @@ In this setup, the appropriate rewards for training lower-level agents must be p See this file for a runnable example: `hierarchical_training.py `__. - -Grouping Agents -~~~~~~~~~~~~~~~ - -It is common to have groups of agents in multi-agent RL. RLlib treats agent groups like a single agent with a Tuple action and observation space. The group agent can then be assigned to a single policy for centralized execution, or to specialized multi-agent policies such as `Q-Mix `__ that implement centralized training but decentralized execution. You can use the ``MultiAgentEnv.with_agent_groups()`` method to define these groups: - -.. literalinclude:: ../../python/ray/rllib/env/multi_agent_env.py - :language: python - :start-after: __grouping_doc_begin__ - :end-before: __grouping_doc_end__ - Variable-Sharing Between Policies ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -296,6 +286,18 @@ Implementing a centralized critic that takes as input the observations and actio 2. Updating the critic: the centralized critic loss can be added to the loss of the custom policy graph, the same as with any other value function. For an example of defining loss inputs, see the `PGPolicyGraph example `__. +Grouping Agents +~~~~~~~~~~~~~~~ + +It is common to have groups of agents in multi-agent RL. RLlib treats agent groups like a single agent with a Tuple action and observation space. The group agent can then be assigned to a single policy for centralized execution, or to specialized multi-agent policies such as `Q-Mix `__ that implement centralized training but decentralized execution. You can use the ``MultiAgentEnv.with_agent_groups()`` method to define these groups: + +.. literalinclude:: ../../python/ray/rllib/env/multi_agent_env.py + :language: python + :start-after: __grouping_doc_begin__ + :end-before: __grouping_doc_end__ + +For environments with multiple groups, or mixtures of agent groups and individual agents, you can use grouping in conjunction with the policy mapping API described in prior sections. + Interfacing with External Agents -------------------------------- diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index a95ffc3c596f..c192b674fbe7 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -184,7 +184,7 @@ Accessing Policy State ~~~~~~~~~~~~~~~~~~~~~~ It is common to need to access an agent's internal state, e.g., to set or get internal weights. In RLlib an agent's state is replicated across multiple *policy evaluators* (Ray actors) in the cluster. However, you can easily get and update this state between calls to ``train()`` via ``agent.optimizer.foreach_evaluator()`` or ``agent.optimizer.foreach_evaluator_with_index()``. These functions take a lambda function that is applied with the evaluator as an arg. You can also return values from these functions and those will be returned as a list. -You can also access just the "master" copy of the agent state through ``agent.get_policy()`` or ``agent.local_evaluator``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. 
For example, to access the weights of a local TF policy, you can run ``agent.get_policy().get_weights()``. This is also equivalent to ``agent.local_evaluator.policy_map["default"].get_weights()``: +You can also access just the "master" copy of the agent state through ``agent.get_policy()`` or ``agent.local_evaluator``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. For example, to access the weights of a local TF policy, you can run ``agent.get_policy().get_weights()``. This is also equivalent to ``agent.local_evaluator.policy_map["default_policy"].get_weights()``: .. code-block:: python @@ -192,7 +192,7 @@ You can also access just the "master" copy of the agent state through ``agent.ge agent.get_policy().get_weights() # Same as above - agent.local_evaluator.policy_map["default"].get_weights() + agent.local_evaluator.policy_map["default_policy"].get_weights() # Get list of weights of each evaluator, including remote replicas agent.optimizer.foreach_evaluator(lambda ev: ev.get_policy().get_weights()) diff --git a/docker/stress_test/Dockerfile b/docker/stress_test/Dockerfile index 2716670e7705..0891ac02c8f9 100644 --- a/docker/stress_test/Dockerfile +++ b/docker/stress_test/Dockerfile @@ -4,7 +4,7 @@ FROM ray-project/base-deps # We install ray and boto3 to enable the ray autoscaler as # a test runner. -RUN pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl boto3 +RUN pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl boto3 RUN mkdir -p /root/.ssh/ # We port the source code in so that we run the most up-to-date stress tests. diff --git a/docker/tune_test/Dockerfile b/docker/tune_test/Dockerfile index 42deadec8799..9546b676b779 100644 --- a/docker/tune_test/Dockerfile +++ b/docker/tune_test/Dockerfile @@ -4,7 +4,7 @@ FROM ray-project/base-deps # We install ray and boto3 to enable the ray autoscaler as # a test runner. -RUN pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl boto3 +RUN pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl boto3 # We install this after the latest wheels -- this should not override the latest wheels. 
RUN apt-get install -y zlib1g-dev RUN pip install gym[atari]==0.10.11 opencv-python-headless tensorflow lz4 keras pytest-timeout smart_open diff --git a/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java b/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java index 8ae245f8357e..54f01aed1b9b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java @@ -11,10 +11,15 @@ import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.WeakHashMap; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.DirectoryFileFilter; +import org.apache.commons.io.filefilter.RegexFileFilter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.objectweb.asm.Type; @@ -95,11 +100,17 @@ public RayFunction getFunction(UniqueId driverId, JavaFunctionDescriptor functio classLoader = getClass().getClassLoader(); } else { File resourceDir = new File(driverResourcePath + "/" + driverId.toString() + "/"); - try { - classLoader = new URLClassLoader(new URL[]{resourceDir.toURI().toURL()}); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } + Collection files = FileUtils.listFiles(resourceDir, + new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY); + files.add(resourceDir); + final List urlList = files.stream().map(file -> { + try { + return file.toURI().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + classLoader = new URLClassLoader(urlList.toArray(new URL[urlList.size()])); LOGGER.debug("Resource loaded for driver {} from path {}.", driverId, resourceDir.getAbsolutePath()); } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java index e44fd1014a63..b3a77f0dc71c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java @@ -6,11 +6,11 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,7 +47,7 @@ public MockRayletClient(RayDevRuntime runtime, int numberThreads) { store.addObjectPutCallback(this::onObjectPut); // The thread pool that executes tasks in parallel. exec = Executors.newFixedThreadPool(numberThreads); - idleWorkers = new LinkedList<>(); + idleWorkers = new ConcurrentLinkedDeque<>(); actorWorkers = new HashMap<>(); currentWorker = new ThreadLocal<>(); } @@ -69,19 +69,19 @@ public Worker getCurrentWorker() { /** * Get a worker from the worker pool to run the given task. 
*/ - private Worker getWorker(TaskSpec task) { + private synchronized Worker getWorker(TaskSpec task) { Worker worker; if (task.isActorTask()) { worker = actorWorkers.get(task.actorId); } else { - if (idleWorkers.size() > 0) { + if (task.isActorCreationTask()) { + worker = new Worker(runtime); + actorWorkers.put(task.actorCreationId, worker); + } else if (idleWorkers.size() > 0) { worker = idleWorkers.pop(); } else { worker = new Worker(runtime); } - if (task.isActorCreationTask()) { - actorWorkers.put(task.actorCreationId, worker); - } } currentWorker.set(worker); return worker; diff --git a/java/test.sh b/java/test.sh index 2f8dd0164666..d6be89b7fa90 100755 --- a/java/test.sh +++ b/java/test.sh @@ -9,7 +9,9 @@ ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) pushd $ROOT_DIR/.. echo "Linting Java code with checkstyle." -bazel test //java:all --test_tag_filters="checkstyle" --action_env=PATH +# NOTE(hchen): The `test_tag_filters` option causes bazel to ignore caches. +# Thus, we add the `build_tests_only` option to avoid re-building everything. +bazel test //java:all --test_tag_filters="checkstyle" --build_tests_only echo "Running tests under cluster mode." # TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, diff --git a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java index 6289d1cd7170..d90b20a7ba6f 100644 --- a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java +++ b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java @@ -15,13 +15,17 @@ import org.ray.api.TestUtils; import org.ray.api.WaitResult; import org.ray.api.annotation.RayRemote; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; public class MultiThreadingTest extends BaseTest { - private static final int LOOP_COUNTER = 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadingTest.class); + + private static final int LOOP_COUNTER = 100; private static final int NUM_THREADS = 20; @RayRemote @@ -55,6 +59,21 @@ public static String testMultiThreading() { Assert.assertEquals(arg, (int) obj.get()); }, LOOP_COUNTER); + // Test creating multi actors + runTestCaseInMultipleThreads(() -> { + int arg = random.nextInt(); + RayActor echoActor1 = Ray.createActor(Echo::new); + try { + // Sleep a while to test the case that another actor is created before submitting + // tasks to this actor. + TimeUnit.MILLISECONDS.sleep(10); + } catch (InterruptedException e) { + LOGGER.warn("Got exception while sleeping.", e); + } + RayObject obj = Ray.call(Echo::echo, echoActor1, arg); + Assert.assertEquals(arg, (int) obj.get()); + }, 1); + // Test put and get. runTestCaseInMultipleThreads(() -> { int arg = random.nextInt(); @@ -74,8 +93,6 @@ public static String testMultiThreading() { @Test public void testInDriver() { - // TODO(hchen): Fix this test under single-process mode. - TestUtils.skipTestUnderSingleProcess(); testMultiThreading(); } diff --git a/python/ray/__init__.py b/python/ray/__init__.py index fddcde111646..d7db12077b22 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -95,7 +95,7 @@ from ray.runtime_context import _get_runtime_context # noqa: E402 # Ray version string. 
-__version__ = "0.7.0.dev1" +__version__ = "0.7.0.dev2" __all__ = [ "LOCAL_MODE", diff --git a/python/ray/actor.py b/python/ray/actor.py index f5c2a665898e..9d2d25505f08 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -21,8 +21,6 @@ from ray import (ObjectID, ActorID, ActorHandleID, ActorClassID, TaskID, DriverID) -DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1 - logger = logging.getLogger(__name__) @@ -166,7 +164,7 @@ class ActorClass(object): """ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus, - num_gpus, resources, actor_method_cpus): + num_gpus, resources): self._modified_class = modified_class self._class_id = class_id self._class_name = modified_class.__name__ @@ -174,7 +172,6 @@ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus, self._num_cpus = num_cpus self._num_gpus = num_gpus self._resources = resources - self._actor_method_cpus = actor_method_cpus self._exported = False self._actor_methods = inspect.getmembers( @@ -215,7 +212,7 @@ def __init__(self): method.__ray_num_return_vals__) else: self._actor_method_num_return_vals[method_name] = ( - DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) + ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) def __call__(self, *args, **kwargs): raise Exception("Actors methods cannot be instantiated directly. " @@ -276,6 +273,25 @@ def _remote(self, # updated to reflect the new invocation. actor_cursor = None + # Set the actor's default resources if not already set. First three + # conditions are to check that no resources were specified in the + # decorator. Last three conditions are to check that no resources were + # specified when _remote() was called. + if (self._num_cpus is None and self._num_gpus is None + and self._resources is None and num_cpus is None + and num_gpus is None and resources is None): + # In the default case, actors acquire no resources for + # their lifetime, and actor methods will require 1 CPU. + cpus_to_use = ray_constants.DEFAULT_ACTOR_CREATION_CPU_SIMPLE + actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SIMPLE + else: + # If any resources are specified (here or in decorator), then + # all resources are acquired for the actor's lifetime and no + # resources are associated with methods. + cpus_to_use = (ray_constants.DEFAULT_ACTOR_CREATION_CPU_SPECIFIED + if self._num_cpus is None else self._num_cpus) + actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED + # Do not export the actor class or the actor if run in LOCAL_MODE # Instead, instantiate the actor locally and add it to the worker's # dictionary @@ -290,15 +306,15 @@ def _remote(self, self._exported = True resources = ray.utils.resources_from_resource_arguments( - self._num_cpus, self._num_gpus, self._resources, num_cpus, + cpus_to_use, self._num_gpus, self._resources, num_cpus, num_gpus, resources) # If the actor methods require CPU resources, then set the required # placement resources. If actor_placement_resources is empty, then # the required placement resources will be the same as resources. 
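The comment block above describes two regimes, which the new ray_constants defaults encode: with no resources declared anywhere, the actor itself holds no CPUs and each method call needs one; as soon as any resource is declared, the actor holds its resources for its lifetime and method calls need none. A runnable illustration against this version of Ray (class names are invented):

    import ray

    ray.init(num_cpus=2)

    @ray.remote
    class Simple(object):
        def ping(self):
            return "ok"

    @ray.remote(num_cpus=1)
    class Pinned(object):
        def ping(self):
            return "ok"

    # Simple: creation task needs 0 CPUs; every ping() call requires 1 CPU.
    # Pinned: the actor holds 1 CPU for its lifetime; ping() calls need 0 CPUs.
    a, b = Simple.remote(), Pinned.remote()
    print(ray.get([a.ping.remote(), b.ping.remote()]))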
actor_placement_resources = {} - assert self._actor_method_cpus in [0, 1] - if self._actor_method_cpus == 1: + assert actor_method_cpu in [0, 1] + if actor_method_cpu == 1: actor_placement_resources = resources.copy() actor_placement_resources["CPU"] += 1 @@ -322,8 +338,8 @@ def _remote(self, actor_handle = ActorHandle( actor_id, self._modified_class.__module__, self._class_name, actor_cursor, self._actor_method_names, self._method_signatures, - self._actor_method_num_return_vals, actor_cursor, - self._actor_method_cpus, worker.task_driver_id) + self._actor_method_num_return_vals, actor_cursor, actor_method_cpu, + worker.task_driver_id) # We increment the actor counter by 1 to account for the actor creation # task. actor_handle._ray_actor_counter += 1 @@ -664,8 +680,7 @@ def __setstate__(self, state): return self._deserialization_helper(state, False) -def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, - max_reconstructions): +def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions): # Give an error if cls is an old-style class. if not issubclass(cls, object): raise TypeError( @@ -720,7 +735,7 @@ def __ray_checkpoint__(self): class_id = ActorClassID(_random_string()) return ActorClass(Class, class_id, max_reconstructions, num_cpus, num_gpus, - resources, actor_method_cpus) + resources) ray.worker.global_worker.make_actor = make_actor diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index 539a6dbc2245..d24a47c45b9b 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -100,9 +100,9 @@ setup_commands: # has your Ray repo pre-cloned. Then, you can replace the pip installs # below with a git checkout (and possibly a recompile). - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-manylinux1_x86_64.whl - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl # Consider uncommenting these if you also want to run apt-get commands during setup # - sudo pkill -9 apt-get || true # - sudo pkill -9 dpkg || true diff --git a/python/ray/autoscaler/aws/example-gpu-docker.yaml b/python/ray/autoscaler/aws/example-gpu-docker.yaml index fa889fd49190..6a10fff5dea6 100644 --- a/python/ray/autoscaler/aws/example-gpu-docker.yaml +++ b/python/ray/autoscaler/aws/example-gpu-docker.yaml @@ -92,9 +92,9 @@ file_mounts: { # List of shell commands to run to set up nodes. 
setup_commands: - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-manylinux1_x86_64.whl - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl # Custom commands that will be run on the head node after common setup. head_setup_commands: diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml index 5339690cc7ab..9b36fa398817 100644 --- a/python/ray/autoscaler/gcp/example-full.yaml +++ b/python/ray/autoscaler/gcp/example-full.yaml @@ -122,9 +122,9 @@ setup_commands: && echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.profile # Install ray - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-manylinux1_x86_64.whl - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl # Custom commands that will be run on the head node after common setup. diff --git a/python/ray/autoscaler/gcp/example-gpu-docker.yaml b/python/ray/autoscaler/gcp/example-gpu-docker.yaml index d7080b94cf8f..fa1face51f81 100644 --- a/python/ray/autoscaler/gcp/example-gpu-docker.yaml +++ b/python/ray/autoscaler/gcp/example-gpu-docker.yaml @@ -127,9 +127,9 @@ setup_commands: # - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc # Install ray - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp27-cp27mu-manylinux1_x86_64.whl - - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp35-cp35m-manylinux1_x86_64.whl - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev1-cp36-cp36m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp27-cp27mu-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp35-cp35m-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev2-cp36-cp36m-manylinux1_x86_64.whl # Custom commands that will be run on the head node after common setup. 
head_setup_commands: diff --git a/python/ray/node.py b/python/ray/node.py index 875d15799b9e..e5deec2324fe 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -316,8 +316,8 @@ def start_dashboard(self): self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [ process_info ] - redis_client = self.create_redis_client() - redis_client.hmset("webui", {"url": self._webui_url}) + redis_client = self.create_redis_client() + redis_client.hmset("webui", {"url": self._webui_url}) def start_plasma_store(self): """Start the plasma store.""" diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a54a3679c74a..da9a26e98ab9 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -25,6 +25,17 @@ def env_integer(key, default): # The smallest cap on the memory used by Redis that we allow. REDIS_MINIMUM_MEMORY_BYTES = 10**7 +# Default resource requirements for actors when no resource requirements are +# specified. +DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1 +DEFAULT_ACTOR_CREATION_CPU_SIMPLE = 0 +# Default resource requirements for actors when some resource requirements are +# specified in . +DEFAULT_ACTOR_METHOD_CPU_SPECIFIED = 0 +DEFAULT_ACTOR_CREATION_CPU_SPECIFIED = 1 +# Default number of return values for each actor method. +DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1 + # If a remote function or actor (or some other export) has serialized size # greater than this quantity, print an warning. PICKLE_OBJECT_WARNING_SIZE = 10**7 diff --git a/python/ray/rllib/agents/ars/ars.py b/python/ray/rllib/agents/ars/ars.py index aafcee7f4e36..16416e46a7dc 100644 --- a/python/ray/rllib/agents/ars/ars.py +++ b/python/ray/rllib/agents/ars/ars.py @@ -17,6 +17,7 @@ from ray.rllib.agents.ars import optimizers from ray.rllib.agents.ars import policies from ray.rllib.agents.ars import utils +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.annotations import override from ray.rllib.utils import FilterManager @@ -87,7 +88,7 @@ def __init__(self, config, env_creator, noise, min_task_runtime=0.2): @property def filters(self): - return {"default": self.policy.get_filter()} + return {DEFAULT_POLICY_ID: self.policy.get_filter()} def sync_filters(self, new_filters): for k in self.filters: @@ -271,7 +272,7 @@ def _train(self): # Now sync the filters FilterManager.synchronize({ - "default": self.policy.get_filter() + DEFAULT_POLICY_ID: self.policy.get_filter() }, self.workers) info = { @@ -335,5 +336,5 @@ def __setstate__(self, state): self.policy.set_weights(state["weights"]) self.policy.set_filter(state["filter"]) FilterManager.synchronize({ - "default": self.policy.get_filter() + DEFAULT_POLICY_ID: self.policy.get_filter() }, self.workers) diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index 56c064b2d421..5b1bb3597b33 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -10,6 +10,7 @@ from ray.rllib.agents.agent import Agent, with_common_config from ray.rllib.agents.dqn.dqn_policy_graph import DQNPolicyGraph from ray.rllib.evaluation.metrics import collect_metrics +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.annotations import override from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule @@ -195,7 +196,7 @@ def on_episode_end(info): policies = info["policy"] episode = info["episode"] episode.custom_metrics["policy_distance"] = policies[ - "default"].pi_distance + DEFAULT_POLICY_ID].pi_distance if end_callback: 
end_callback(info) diff --git a/python/ray/rllib/agents/es/es.py b/python/ray/rllib/agents/es/es.py index 4aa4a86aac88..8ca9689e6778 100644 --- a/python/ray/rllib/agents/es/es.py +++ b/python/ray/rllib/agents/es/es.py @@ -16,6 +16,7 @@ from ray.rllib.agents.es import optimizers from ray.rllib.agents.es import policies from ray.rllib.agents.es import utils +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.annotations import override from ray.rllib.utils import FilterManager @@ -91,7 +92,7 @@ def __init__(self, @property def filters(self): - return {"default": self.policy.get_filter()} + return {DEFAULT_POLICY_ID: self.policy.get_filter()} def sync_filters(self, new_filters): for k in self.filters: @@ -268,7 +269,7 @@ def _train(self): # Now sync the filters FilterManager.synchronize({ - "default": self.policy.get_filter() + DEFAULT_POLICY_ID: self.policy.get_filter() }, self.workers) info = { @@ -332,5 +333,5 @@ def __setstate__(self, state): self.policy.set_weights(state["weights"]) self.policy.set_filter(state["filter"]) FilterManager.synchronize({ - "default": self.policy.get_filter() + DEFAULT_POLICY_ID: self.policy.get_filter() }, self.workers) diff --git a/python/ray/rllib/env/base_env.py b/python/ray/rllib/env/base_env.py index 7dd1921f131d..05196a342d49 100644 --- a/python/ray/rllib/env/base_env.py +++ b/python/ray/rllib/env/base_env.py @@ -186,7 +186,7 @@ def get_unwrapped(self): # Fixed agent identifier when there is only the single agent in the env -_DUMMY_AGENT_ID = "single_agent" +_DUMMY_AGENT_ID = "singleton_agent" def _with_dummy_agent_id(env_id_to_values, dummy_id=_DUMMY_AGENT_ID): diff --git a/python/ray/rllib/evaluation/metrics.py b/python/ray/rllib/evaluation/metrics.py index 511e969373c6..ad3c1afa9169 100644 --- a/python/ray/rllib/evaluation/metrics.py +++ b/python/ray/rllib/evaluation/metrics.py @@ -38,6 +38,10 @@ def collect_episodes(local_evaluator, collected, _ = ray.wait( pending, num_returns=len(pending), timeout=timeout_seconds * 1.0) num_metric_batches_dropped = len(pending) - len(collected) + if pending and len(collected) == 0: + raise ValueError( + "Timed out waiting for metrics from workers. 
You can configure " + "this timeout with `collect_metrics_timeout`.") metric_lists = ray.get(collected) metric_lists.append(local_evaluator.get_metrics()) diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index e83b6a209a46..60e75ed46b99 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -28,6 +28,8 @@ from ray.rllib.utils import merge_dicts from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.compression import pack +from ray.rllib.utils.debug import disable_log_once_globally, log_once, \ + summarize, enable_periodic_logging from ray.rllib.utils.filter import get_filter from ray.rllib.utils.tf_run_builder import TFRunBuilder @@ -209,10 +211,16 @@ def __init__(self, if log_level: logging.getLogger("ray.rllib").setLevel(log_level) + if worker_index > 1: + disable_log_once_globally() # only need 1 evaluator to log + elif log_level == "DEBUG": + enable_periodic_logging() + env_context = EnvContext(env_config or {}, worker_index) policy_config = policy_config or {} self.policy_config = policy_config self.callbacks = callbacks or {} + self.worker_index = worker_index model_config = model_config or {} policy_mapping_fn = (policy_mapping_fn or (lambda agent_id: DEFAULT_POLICY_ID)) @@ -304,6 +312,8 @@ def make_env(vector_index): policy.observation_space.shape) for (policy_id, policy) in self.policy_map.items() } + if self.worker_index == 0: + logger.info("Built filter map: {}".format(self.filters)) # Always use vector env for consistency even if num_envs = 1 self.async_env = BaseEnv.to_base_env( @@ -390,6 +400,10 @@ def sample(self): SampleBatch|MultiAgentBatch from evaluating the current policies. """ + if log_once("sample_start"): + logger.info("Generating sample batch of size {}".format( + self.sample_batch_size)) + batches = [self.input_reader.next()] steps_so_far = batches[0].count @@ -423,6 +437,10 @@ def sample(self): for estimator in self.reward_estimators: estimator.process(sub_batch) + if log_once("sample_end"): + logger.info("Completed sample batch:\n\n{}\n".format( + summarize(batch))) + if self.compress_observations: if isinstance(batch, MultiAgentBatch): for data in batch.policy_batches.values(): @@ -457,6 +475,9 @@ def set_weights(self, weights): @override(EvaluatorInterface) def compute_gradients(self, samples): + if log_once("compute_gradients"): + logger.info("Compute gradients on:\n\n{}\n".format( + summarize(samples))) if isinstance(samples, MultiAgentBatch): grad_out, info_out = {}, {} if self.tf_sess is not None: @@ -479,10 +500,15 @@ def compute_gradients(self, samples): grad_out, info_out = ( self.policy_map[DEFAULT_POLICY_ID].compute_gradients(samples)) info_out["batch_count"] = samples.count + if log_once("grad_out"): + logger.info("Compute grad info:\n\n{}\n".format( + summarize(info_out))) return grad_out, info_out @override(EvaluatorInterface) def apply_gradients(self, grads): + if log_once("apply_gradients"): + logger.info("Apply gradients:\n\n{}\n".format(summarize(grads))) if isinstance(grads, dict): if self.tf_sess is not None: builder = TFRunBuilder(self.tf_sess, "apply_gradients") @@ -502,6 +528,10 @@ def apply_gradients(self, grads): @override(EvaluatorInterface) def learn_on_batch(self, samples): + if log_once("learn_on_batch"): + logger.info( + "Training on concatenated sample batches:\n\n{}\n".format( + summarize(samples))) if isinstance(samples, MultiAgentBatch): info_out = {} if self.tf_sess is not 
None: @@ -519,11 +549,12 @@ def learn_on_batch(self, samples): continue info_out[pid], _ = ( self.policy_map[pid].learn_on_batch(batch)) - return info_out else: - grad_fetch, apply_fetch = ( + info_out, _ = ( self.policy_map[DEFAULT_POLICY_ID].learn_on_batch(samples)) - return grad_fetch + if log_once("learn_out"): + logger.info("Training output:\n\n{}\n".format(summarize(info_out))) + return info_out @DeveloperAPI def get_metrics(self): @@ -659,6 +690,9 @@ def _build_policy_map(self, policy_dict, policy_config): "Tuple|DictFlatteningPreprocessor.") with tf.variable_scope(name): policy_map[name] = cls(obs_space, act_space, merged_conf) + if self.worker_index == 0: + logger.info("Built policy map: {}".format(policy_map)) + logger.info("Built preprocessor map: {}".format(preprocessors)) return policy_map, preprocessors def __del__(self): @@ -668,30 +702,7 @@ def __del__(self): def _validate_and_canonicalize(policy_graph, env): if isinstance(policy_graph, dict): - for k, v in policy_graph.items(): - if not isinstance(k, str): - raise ValueError( - "policy_graph keys must be strs, got {}".format(type(k))) - if not isinstance(v, tuple) or len(v) != 4: - raise ValueError( - "policy_graph values must be tuples of " - "(cls, obs_space, action_space, config), got {}".format(v)) - if not issubclass(v[0], PolicyGraph): - raise ValueError( - "policy_graph tuple value 0 must be a rllib.PolicyGraph " - "class, got {}".format(v[0])) - if not isinstance(v[1], gym.Space): - raise ValueError( - "policy_graph tuple value 1 (observation_space) must be a " - "gym.Space, got {}".format(type(v[1]))) - if not isinstance(v[2], gym.Space): - raise ValueError( - "policy_graph tuple value 2 (action_space) must be a " - "gym.Space, got {}".format(type(v[2]))) - if not isinstance(v[3], dict): - raise ValueError( - "policy_graph tuple value 3 (config) must be a dict, " - "got {}".format(type(v[3]))) + _validate_multiagent_config(policy_graph) return policy_graph elif not issubclass(policy_graph, PolicyGraph): raise ValueError("policy_graph must be a rllib.PolicyGraph class") @@ -707,6 +718,35 @@ def _validate_and_canonicalize(policy_graph, env): } +def _validate_multiagent_config(policy_graph, allow_none_graph=False): + for k, v in policy_graph.items(): + if not isinstance(k, str): + raise ValueError("policy_graph keys must be strs, got {}".format( + type(k))) + if not isinstance(v, tuple) or len(v) != 4: + raise ValueError( + "policy_graph values must be tuples of " + "(cls, obs_space, action_space, config), got {}".format(v)) + if allow_none_graph and v[0] is None: + pass + elif not issubclass(v[0], PolicyGraph): + raise ValueError( + "policy_graph tuple value 0 must be a rllib.PolicyGraph " + "class or None, got {}".format(v[0])) + if not isinstance(v[1], gym.Space): + raise ValueError( + "policy_graph tuple value 1 (observation_space) must be a " + "gym.Space, got {}".format(type(v[1]))) + if not isinstance(v[2], gym.Space): + raise ValueError( + "policy_graph tuple value 2 (action_space) must be a " + "gym.Space, got {}".format(type(v[2]))) + if not isinstance(v[3], dict): + raise ValueError( + "policy_graph tuple value 3 (config) must be a dict, " + "got {}".format(type(v[3]))) + + def _validate_env(env): # allow this as a special case (assumed gym.Env) if hasattr(env, "observation_space") and hasattr(env, "action_space"): diff --git a/python/ray/rllib/evaluation/sample_batch.py b/python/ray/rllib/evaluation/sample_batch.py index f9fb4b5a2800..4142f720a0fc 100644 --- a/python/ray/rllib/evaluation/sample_batch.py 
+++ b/python/ray/rllib/evaluation/sample_batch.py @@ -7,9 +7,10 @@ import numpy as np from ray.rllib.utils.annotations import PublicAPI +from ray.rllib.utils.memory import concat_aligned # Defaults policy id for single agent environments -DEFAULT_POLICY_ID = "default" +DEFAULT_POLICY_ID = "default_policy" @PublicAPI @@ -104,7 +105,7 @@ def concat_samples(samples): out = {} samples = [s for s in samples if s.count > 0] for k in samples[0].keys(): - out[k] = np.concatenate([s[k] for s in samples]) + out[k] = concat_aligned([s[k] for s in samples]) return SampleBatch(out) @PublicAPI @@ -121,7 +122,7 @@ def concat(self, other): assert self.keys() == other.keys(), "must have same columns" out = {} for k in self.keys(): - out[k] = np.concatenate([self[k], other[k]]) + out[k] = concat_aligned([self[k], other[k]]) return SampleBatch(out) @PublicAPI diff --git a/python/ray/rllib/evaluation/sample_batch_builder.py b/python/ray/rllib/evaluation/sample_batch_builder.py index 211e7075b3a5..2387f5cd0c87 100644 --- a/python/ray/rllib/evaluation/sample_batch_builder.py +++ b/python/ray/rllib/evaluation/sample_batch_builder.py @@ -3,10 +3,14 @@ from __future__ import print_function import collections +import logging import numpy as np from ray.rllib.evaluation.sample_batch import SampleBatch, MultiAgentBatch from ray.rllib.utils.annotations import PublicAPI, DeveloperAPI +from ray.rllib.utils.debug import log_once, summarize + +logger = logging.getLogger(__name__) def to_float_array(v): @@ -145,10 +149,16 @@ def postprocess_batch_so_far(self, episode): post_batches[agent_id] = policy.postprocess_trajectory( pre_batch, other_batches, episode) + if log_once("after_post"): + logger.info( + "Trajectory fragment after postprocess_trajectory():\n\n{}\n". + format(summarize(post_batches))) + # Append into policy batches and reset for agent_id, post_batch in sorted(post_batches.items()): self.policy_builders[self.agent_to_policy[agent_id]].add_batch( post_batch) + self.agent_builders.clear() self.agent_to_policy.clear() diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index 3e578c328e52..f33f0ff8f65d 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -17,11 +17,11 @@ from ray.rllib.models.action_dist import TupleActions from ray.rllib.offline import InputReader from ray.rllib.utils.annotations import override +from ray.rllib.utils.debug import log_once, summarize from ray.rllib.utils.tf_run_builder import TFRunBuilder from ray.rllib.evaluation.policy_graph import clip_action logger = logging.getLogger(__name__) -_large_batch_warned = False RolloutMetrics = namedtuple( "RolloutMetrics", @@ -278,6 +278,11 @@ def new_episode(): unfiltered_obs, rewards, dones, infos, off_policy_actions = \ base_env.poll() + if log_once("env_returns"): + logger.info("Raw obs from env: {}".format( + summarize(unfiltered_obs))) + logger.info("Info return from env: {}".format(summarize(infos))) + # Process observations and prepare for policy evaluation active_envs, to_eval, outputs = _process_observations( base_env, policies, batch_builder_pool, active_episodes, @@ -325,10 +330,8 @@ def _process_observations(base_env, policies, batch_builder_pool, episode.batch_builder.count += 1 episode._add_agent_rewards(rewards[env_id]) - global _large_batch_warned - if (not _large_batch_warned and - episode.batch_builder.total() > max(1000, unroll_length * 10)): - _large_batch_warned = True + if (episode.batch_builder.total() > max(1000, unroll_length * 10) + 
and log_once("large_batch_warning")): logger.warning( "More than {} observations for {} env steps ".format( episode.batch_builder.total(), @@ -362,7 +365,13 @@ def _process_observations(base_env, policies, batch_builder_pool, policy_id = episode.policy_for(agent_id) prep_obs = _get_or_raise(preprocessors, policy_id).transform(raw_obs) + if log_once("prep_obs"): + logger.info("Preprocessed obs: {}".format(summarize(prep_obs))) + filtered_obs = _get_or_raise(obs_filters, policy_id)(prep_obs) + if log_once("filtered_obs"): + logger.info("Filtered obs: {}".format(summarize(filtered_obs))) + agent_done = bool(all_done or dones[env_id].get(agent_id)) if not agent_done: to_eval[policy_id].append( @@ -466,6 +475,11 @@ def _do_policy_eval(tf_sess, to_eval, policies, active_episodes): pending_fetches = {} else: builder = None + + if log_once("compute_actions_input"): + logger.info("Example compute_actions() input:\n\n{}\n".format( + summarize(to_eval))) + for policy_id, eval_data in to_eval.items(): rnn_in_cols = _to_column_format([t.rnn_state for t in eval_data]) policy = _get_or_raise(policies, policy_id) @@ -489,6 +503,10 @@ def _do_policy_eval(tf_sess, to_eval, policies, active_episodes): for k, v in pending_fetches.items(): eval_results[k] = builder.get(v) + if log_once("compute_actions_result"): + logger.info("Example compute_actions() result:\n\n{}\n".format( + summarize(eval_results))) + return eval_results diff --git a/python/ray/rllib/evaluation/tf_policy_graph.py b/python/ray/rllib/evaluation/tf_policy_graph.py index 609c9f57b22c..8febf77381d5 100644 --- a/python/ray/rllib/evaluation/tf_policy_graph.py +++ b/python/ray/rllib/evaluation/tf_policy_graph.py @@ -13,6 +13,7 @@ from ray.rllib.evaluation.policy_graph import PolicyGraph from ray.rllib.models.lstm import chop_into_sequences from ray.rllib.utils.annotations import override, DeveloperAPI +from ray.rllib.utils.debug import log_once, summarize from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule from ray.rllib.utils.tf_run_builder import TFRunBuilder @@ -466,6 +467,15 @@ def _get_loss_inputs_dict(self, batch): for k, v in zip(state_keys, initial_states): feed_dict[self._loss_input_dict[k]] = v feed_dict[self._seq_lens] = seq_lens + + if log_once("rnn_feed_dict"): + logger.info("Padded input for RNN:\n\n{}\n".format( + summarize({ + "features": feature_sequences, + "initial_states": initial_states, + "seq_lens": seq_lens, + "max_seq_len": max_seq_len, + }))) return feed_dict diff --git a/python/ray/rllib/examples/hierarchical_training.py b/python/ray/rllib/examples/hierarchical_training.py index b55ee78df7e0..2cb25cbbf9e6 100644 --- a/python/ray/rllib/examples/hierarchical_training.py +++ b/python/ray/rllib/examples/hierarchical_training.py @@ -35,7 +35,6 @@ import ray from ray.tune import run_experiments, function from ray.rllib.env import MultiAgentEnv -from ray.rllib.agents.ppo import PPOAgent parser = argparse.ArgumentParser() parser.add_argument("--flat", action="store_true") @@ -213,12 +212,11 @@ def policy_mapping_fn(agent_id): "entropy_coeff": 0.01, "multiagent": { "policy_graphs": { - "high_level_policy": (PPOAgent._policy_graph, - maze.observation_space, + "high_level_policy": (None, maze.observation_space, Discrete(4), { "gamma": 0.9 }), - "low_level_policy": (PPOAgent._policy_graph, + "low_level_policy": (None, Tuple([ maze.observation_space, Discrete(4) diff --git a/python/ray/rllib/examples/multiagent_cartpole.py b/python/ray/rllib/examples/multiagent_cartpole.py index a0014053217f..bab549a41655 
100644 --- a/python/ray/rllib/examples/multiagent_cartpole.py +++ b/python/ray/rllib/examples/multiagent_cartpole.py @@ -21,7 +21,6 @@ import ray from ray import tune -from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph from ray.rllib.models import Model, ModelCatalog from ray.rllib.tests.test_multi_agent_env import MultiCartpole from ray.tune import run_experiments @@ -90,7 +89,7 @@ def gen_policy(i): }, "gamma": random.choice([0.95, 0.99]), } - return (PPOPolicyGraph, obs_space, act_space, config) + return (None, obs_space, act_space, config) # Setup PPO with an ensemble of `num_policies` different policy graphs policy_graphs = { diff --git a/python/ray/rllib/models/preprocessors.py b/python/ray/rllib/models/preprocessors.py index e68ffbefc8d3..dd133f2aa8d4 100644 --- a/python/ray/rllib/models/preprocessors.py +++ b/python/ray/rllib/models/preprocessors.py @@ -30,6 +30,7 @@ def __init__(self, obs_space, options=None): self._obs_space = obs_space self._options = options or {} self.shape = self._init_shape(obs_space, options) + self._size = int(np.product(self.shape)) @PublicAPI def _init_shape(self, obs_space, options): @@ -41,10 +42,14 @@ def transform(self, observation): """Returns the preprocessed observation.""" raise NotImplementedError + def write(self, observation, array, offset): + """Alternative to transform for more efficient flattening.""" + array[offset:offset + self._size] = self.transform(observation) + @property @PublicAPI def size(self): - return int(np.product(self.shape)) + return self._size @property @PublicAPI @@ -123,6 +128,10 @@ def transform(self, observation): arr[observation] = 1 return arr + @override(Preprocessor) + def write(self, observation, array, offset): + array[offset + observation] = 1 + class NoPreprocessor(Preprocessor): @override(Preprocessor) @@ -133,6 +142,11 @@ def _init_shape(self, obs_space, options): def transform(self, observation): return observation + @override(Preprocessor) + def write(self, observation, array, offset): + array[offset:offset + self._size] = np.array( + observation, copy=False).ravel() + class TupleFlatteningPreprocessor(Preprocessor): """Preprocesses each tuple element, then flattens it all into a vector. 
@@ -155,11 +169,16 @@ def _init_shape(self, obs_space, options): @override(Preprocessor) def transform(self, observation): + array = np.zeros(self.shape) + self.write(observation, array, 0) + return array + + @override(Preprocessor) + def write(self, observation, array, offset): assert len(observation) == len(self.preprocessors), observation - return np.concatenate([ - np.reshape(p.transform(o), [p.size]) - for (o, p) in zip(observation, self.preprocessors) - ]) + for o, p in zip(observation, self.preprocessors): + p.write(o, array, offset) + offset += p.size class DictFlatteningPreprocessor(Preprocessor): @@ -182,14 +201,19 @@ def _init_shape(self, obs_space, options): @override(Preprocessor) def transform(self, observation): + array = np.zeros(self.shape) + self.write(observation, array, 0) + return array + + @override(Preprocessor) + def write(self, observation, array, offset): if not isinstance(observation, OrderedDict): observation = OrderedDict(sorted(list(observation.items()))) assert len(observation) == len(self.preprocessors), \ (len(observation), len(self.preprocessors)) - return np.concatenate([ - np.reshape(p.transform(o), [p.size]) - for (o, p) in zip(observation.values(), self.preprocessors) - ]) + for o, p in zip(observation.values(), self.preprocessors): + p.write(o, array, offset) + offset += p.size @PublicAPI diff --git a/python/ray/rllib/optimizers/async_samples_optimizer.py b/python/ray/rllib/optimizers/async_samples_optimizer.py index 22b7ea18b04e..22f33545bdab 100644 --- a/python/ray/rllib/optimizers/async_samples_optimizer.py +++ b/python/ray/rllib/optimizers/async_samples_optimizer.py @@ -15,6 +15,7 @@ from six.moves import queue import ray +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.utils.actors import TaskPool @@ -321,9 +322,9 @@ def __init__(self, assert self.train_batch_size % len(self.devices) == 0 assert self.train_batch_size >= len(self.devices), "batch too small" - if set(self.local_evaluator.policy_map.keys()) != {"default"}: + if set(self.local_evaluator.policy_map.keys()) != {DEFAULT_POLICY_ID}: raise NotImplementedError("Multi-gpu mode for multi-agent") - self.policy = self.local_evaluator.policy_map["default"] + self.policy = self.local_evaluator.policy_map[DEFAULT_POLICY_ID] # per-GPU graph copies created below must share vars with the policy # reuse is set to AUTO_REUSE because Adam nodes are created after @@ -331,7 +332,7 @@ def __init__(self, self.par_opt = [] with self.local_evaluator.tf_sess.graph.as_default(): with self.local_evaluator.tf_sess.as_default(): - with tf.variable_scope("default", reuse=tf.AUTO_REUSE): + with tf.variable_scope(DEFAULT_POLICY_ID, reuse=tf.AUTO_REUSE): if self.policy._state_inputs: rnn_inputs = self.policy._state_inputs + [ self.policy._seq_lens @@ -368,15 +369,16 @@ def step(self): assert self.loader_thread.is_alive() with self.load_wait_timer: opt, released = self.minibatch_buffer.get() - if released: - self.idle_optimizers.put(opt) with self.grad_timer: fetches = opt.optimize(self.sess, 0) self.weights_updated = True self.stats = fetches.get("stats", {}) - self.outqueue.put(self.train_batch_size) + if released: + self.idle_optimizers.put(opt) + + self.outqueue.put(opt.num_tuples_loaded) self.learner_queue_size.push(self.inqueue.qsize()) diff --git a/python/ray/rllib/optimizers/multi_gpu_impl.py b/python/ray/rllib/optimizers/multi_gpu_impl.py 
index 7c00fda99a91..5a418f79d8af 100644 --- a/python/ray/rllib/optimizers/multi_gpu_impl.py +++ b/python/ray/rllib/optimizers/multi_gpu_impl.py @@ -4,9 +4,10 @@ from collections import namedtuple import logging - import tensorflow as tf +from ray.rllib.utils.debug import log_once, summarize + # Variable scope in which created variables will be placed under TOWER_SCOPE_NAME = "tower" @@ -134,6 +135,15 @@ def load_data(self, sess, inputs, state_inputs): The number of tuples loaded per device. """ + if log_once("load_data"): + logger.info( + "Training on concatenated sample batches:\n\n{}\n".format( + summarize({ + "placeholders": self.loss_inputs, + "inputs": inputs, + "state_inputs": state_inputs + }))) + feed_dict = {} assert len(self.loss_inputs) == len(inputs + state_inputs), \ (self.loss_inputs, inputs, state_inputs) @@ -188,6 +198,7 @@ def load_data(self, sess, inputs, state_inputs): sess.run([t.init_op for t in self._towers], feed_dict=feed_dict) + self.num_tuples_loaded = truncated_len tuples_per_device = truncated_len // len(self.devices) assert tuples_per_device > 0, "No data loaded?" assert tuples_per_device % self._loaded_per_device_batch_size == 0 diff --git a/python/ray/rllib/tests/test_catalog.py b/python/ray/rllib/tests/test_catalog.py index 9346e1064c67..fc9b71d2c3e2 100644 --- a/python/ray/rllib/tests/test_catalog.py +++ b/python/ray/rllib/tests/test_catalog.py @@ -16,12 +16,12 @@ class CustomPreprocessor(Preprocessor): def _init_shape(self, obs_space, options): - return None + return [1] class CustomPreprocessor2(Preprocessor): def _init_shape(self, obs_space, options): - return None + return [1] class CustomModel(Model): diff --git a/python/ray/rllib/tests/test_multi_agent_env.py b/python/ray/rllib/tests/test_multi_agent_env.py index 6eeca3ef22a0..1c9d32e8f015 100644 --- a/python/ray/rllib/tests/test_multi_agent_env.py +++ b/python/ray/rllib/tests/test_multi_agent_env.py @@ -531,7 +531,7 @@ def gen_policy(): } obs_space = single_env.observation_space act_space = single_env.action_space - return (PGPolicyGraph, obs_space, act_space, config) + return (None, obs_space, act_space, config) pg = PGAgent( env="multi_cartpole", diff --git a/python/ray/rllib/tests/test_policy_evaluator.py b/python/ray/rllib/tests/test_policy_evaluator.py index d5466865bad0..827b737ef6e1 100644 --- a/python/ray/rllib/tests/test_policy_evaluator.py +++ b/python/ray/rllib/tests/test_policy_evaluator.py @@ -16,6 +16,7 @@ from ray.rllib.evaluation.metrics import collect_metrics from ray.rllib.evaluation.policy_graph import PolicyGraph from ray.rllib.evaluation.postprocessing import compute_advantages +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID from ray.rllib.env.vector_env import VectorEnv from ray.tune.registry import register_env @@ -367,7 +368,7 @@ def testFilterSync(self): time.sleep(2) ev.sample() filters = ev.get_filters(flush_after=True) - obs_f = filters["default"] + obs_f = filters[DEFAULT_POLICY_ID] self.assertNotEqual(obs_f.rs.n, 0) self.assertNotEqual(obs_f.buffer.n, 0) @@ -381,8 +382,8 @@ def testGetFilters(self): filters = ev.get_filters(flush_after=False) time.sleep(2) filters2 = ev.get_filters(flush_after=False) - obs_f = filters["default"] - obs_f2 = filters2["default"] + obs_f = filters[DEFAULT_POLICY_ID] + obs_f2 = filters2[DEFAULT_POLICY_ID] self.assertGreaterEqual(obs_f2.rs.n, obs_f.rs.n) self.assertGreaterEqual(obs_f2.buffer.n, obs_f.buffer.n) @@ -396,15 +397,15 @@ def testSyncFilter(self): # Current State filters = ev.get_filters(flush_after=False) - obs_f = 
filters["default"] + obs_f = filters[DEFAULT_POLICY_ID] self.assertLessEqual(obs_f.buffer.n, 20) new_obsf = obs_f.copy() new_obsf.rs._n = 100 - ev.sync_filters({"default": new_obsf}) + ev.sync_filters({DEFAULT_POLICY_ID: new_obsf}) filters = ev.get_filters(flush_after=False) - obs_f = filters["default"] + obs_f = filters[DEFAULT_POLICY_ID] self.assertGreaterEqual(obs_f.rs.n, 100) self.assertLessEqual(obs_f.buffer.n, 20) @@ -412,7 +413,7 @@ def sample_and_flush(self, ev): time.sleep(2) ev.sample() filters = ev.get_filters(flush_after=True) - obs_f = filters["default"] + obs_f = filters[DEFAULT_POLICY_ID] self.assertNotEqual(obs_f.rs.n, 0) self.assertNotEqual(obs_f.buffer.n, 0) return obs_f diff --git a/python/ray/rllib/utils/debug.py b/python/ray/rllib/utils/debug.py new file mode 100644 index 000000000000..63638d292371 --- /dev/null +++ b/python/ray/rllib/utils/debug.py @@ -0,0 +1,111 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import pprint +import time + +from ray.rllib.evaluation.sample_batch import SampleBatch, MultiAgentBatch + +_logged = set() +_disabled = False +_periodic_log = True +_last_logged = 0.0 +_printer = pprint.PrettyPrinter(indent=2, width=60) + + +def log_once(key): + """Returns True if this is the "first" call for a given key. + + Various logging settings can adjust the definition of "first". + + Example: + >>> if log_once("some_key"): + ... logger.info("Some verbose logging statement") + """ + + global _last_logged + + if _disabled: + return False + elif key not in _logged: + _logged.add(key) + _last_logged = time.time() + return True + elif _periodic_log and time.time() - _last_logged > 60.0: + _logged.clear() + _last_logged = time.time() + return False + else: + return False + + +def disable_log_once_globally(): + """Make log_once() return False in this process.""" + + global _disabled + _disabled = True + + +def enable_periodic_logging(): + """Make log_once() periodically return True in this process.""" + + global _periodic_log + _periodic_log = True + + +def summarize(obj): + """Return a pretty-formatted string for an object. + + This has special handling for pretty-formatting of commonly used data types + in RLlib, such as SampleBatch, numpy arrays, etc. 
+ """ + + return _printer.pformat(_summarize(obj)) + + +def _summarize(obj): + if isinstance(obj, dict): + return {k: _summarize(v) for k, v in obj.items()} + elif hasattr(obj, "_asdict"): + return { + "type": obj.__class__.__name__, + "data": _summarize(obj._asdict()), + } + elif isinstance(obj, list): + return [_summarize(x) for x in obj] + elif isinstance(obj, tuple): + return tuple(_summarize(x) for x in obj) + elif isinstance(obj, np.ndarray): + if obj.dtype == np.object: + return _StringValue("np.ndarray({}, dtype={}, head={})".format( + obj.shape, obj.dtype, _summarize(obj[0]))) + else: + return _StringValue( + "np.ndarray({}, dtype={}, min={}, max={}, mean={})".format( + obj.shape, obj.dtype, round(float(np.min(obj)), 3), + round(float(np.max(obj)), 3), round( + float(np.mean(obj)), 3))) + elif isinstance(obj, MultiAgentBatch): + return { + "type": "MultiAgentBatch", + "policy_batches": _summarize(obj.policy_batches), + "count": obj.count, + } + elif isinstance(obj, SampleBatch): + return { + "type": "SampleBatch", + "data": {k: _summarize(v) + for k, v in obj.items()}, + } + else: + return obj + + +class _StringValue(object): + def __init__(self, value): + self.value = value + + def __repr__(self): + return self.value diff --git a/python/ray/rllib/utils/memory.py b/python/ray/rllib/utils/memory.py new file mode 100644 index 000000000000..b5e3f12474c9 --- /dev/null +++ b/python/ray/rllib/utils/memory.py @@ -0,0 +1,51 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + + +def aligned_array(size, dtype, align=64): + """Returns an array of a given size that is 64-byte aligned. + + The returned array can be efficiently copied into GPU memory by TensorFlow. + """ + + n = size * dtype.itemsize + empty = np.empty(n + (align - 1), dtype=np.uint8) + data_align = empty.ctypes.data % align + offset = 0 if data_align == 0 else (align - data_align) + output = empty[offset:offset + n].view(dtype) + + assert len(output) == size, len(output) + assert output.ctypes.data % align == 0, output.ctypes.data + return output + + +def concat_aligned(items): + """Concatenate arrays, ensuring the output is 64-byte aligned. + + We only align float arrays; other arrays are concatenated as normal. + + This should be used instead of np.concatenate() to improve performance + when the output array is likely to be fed into TensorFlow. + """ + + if len(items) == 0: + return [] + elif len(items) == 1: + # we assume the input is aligned. In any case, it doesn't help + # performance to force align it since that incurs a needless copy. + return items[0] + elif (isinstance(items[0], np.ndarray) + and items[0].dtype in [np.float32, np.float64, np.uint8]): + dtype = items[0].dtype + flat = aligned_array(sum(s.size for s in items), dtype) + batch_dim = sum(s.shape[0] for s in items) + new_shape = (batch_dim, ) + items[0].shape[1:] + output = flat.reshape(new_shape) + assert output.ctypes.data % 64 == 0, output.ctypes.data + np.concatenate(items, out=output) + return output + else: + return np.concatenate(items) diff --git a/python/ray/services.py b/python/ray/services.py index 4bfa4719c6be..61e4fb9a6ba6 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -924,10 +924,9 @@ def start_dashboard(redis_address, import aiohttp # noqa: F401 import psutil # noqa: F401 except ImportError: - logger.warning( + raise ImportError( "Failed to start the dashboard. 
The dashboard requires Python 3 " "as well as 'pip install aiohttp psutil'.") - return None, None process_info = start_ray_process( command, diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 13d14f3d1bc2..b689629fe88e 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -502,6 +502,89 @@ def echo(self, value): assert ray.get(a.g.remote(2)) == 4 +def test_resource_assignment(shutdown_only): + """Test to make sure that we assign resource to actors at instantiation.""" + # This test will create 16 actors. Declaring this many CPUs initially will + # speed up the test because the workers will be started ahead of time. + ray.init(num_cpus=16, num_gpus=1, resources={"Custom": 1}) + + class Actor(object): + def __init__(self): + self.resources = ray.get_resource_ids() + + def get_actor_resources(self): + return self.resources + + def get_actor_method_resources(self): + return ray.get_resource_ids() + + decorator_resource_args = [{}, { + "num_cpus": 0.1 + }, { + "num_gpus": 0.1 + }, { + "resources": { + "Custom": 0.1 + } + }] + instantiation_resource_args = [{}, { + "num_cpus": 0.2 + }, { + "num_gpus": 0.2 + }, { + "resources": { + "Custom": 0.2 + } + }] + for decorator_args in decorator_resource_args: + for instantiation_args in instantiation_resource_args: + if len(decorator_args) == 0: + actor_class = ray.remote(Actor) + else: + actor_class = ray.remote(**decorator_args)(Actor) + actor = actor_class._remote(**instantiation_args) + actor_resources = ray.get(actor.get_actor_resources.remote()) + actor_method_resources = ray.get( + actor.get_actor_method_resources.remote()) + if len(decorator_args) == 0 and len(instantiation_args) == 0: + assert len(actor_resources) == 0, ( + "Actor should not be assigned resources.") + assert list(actor_method_resources.keys()) == [ + "CPU" + ], ("Actor method should only have CPUs") + assert actor_method_resources["CPU"][0][1] == 1, ( + "Actor method should default to one cpu.") + else: + if ("num_cpus" not in decorator_args + and "num_cpus" not in instantiation_args): + assert actor_resources["CPU"][0][1] == 1, ( + "Actor should default to one cpu.") + correct_resources = {} + defined_resources = decorator_args.copy() + defined_resources.update(instantiation_args) + for resource, value in defined_resources.items(): + if resource == "num_cpus": + correct_resources["CPU"] = value + elif resource == "num_gpus": + correct_resources["GPU"] = value + elif resource == "resources": + for custom_resource, amount in value.items(): + correct_resources[custom_resource] = amount + for resource, amount in correct_resources.items(): + assert (actor_resources[resource][0][0] == + actor_method_resources[resource][0][0]), ( + "Should have assigned same {} for both actor ", + "and actor method.".format(resource)) + assert (actor_resources[resource][0][ + 1] == actor_method_resources[resource][0][1]), ( + "Should have assigned same amount of {} for both ", + "actor and actor method.".format(resource)) + assert actor_resources[resource][0][1] == amount, ( + "Actor should have {amount} {resource} but has ", + "{amount} {resource}".format( + amount=amount, resource=resource)) + + def test_multiple_actors(ray_start_regular): @ray.remote class Counter(object): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index fad8e12413b8..cf2a3712525f 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -807,7 +807,7 @@ def m(x): def test_submit_api(shutdown_only): - 
ray.init(num_cpus=1, num_gpus=1, resources={"Custom": 1}) + ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote def f(n): diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index b096d2a78567..71016c62a417 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -139,8 +139,11 @@ def set_weights(self, x): pass actors = [ - Actor._remote(args=[], kwargs={}, resources={str(i % num_nodes): 1}) - for i in range(100) + Actor._remote( + args=[], + kwargs={}, + num_cpus=0.01, + resources={str(i % num_nodes): 1}) for i in range(100) ] # Wait for the actors to start up. diff --git a/python/ray/tune/examples/mnist_pytorch.py b/python/ray/tune/examples/mnist_pytorch.py index 0bfdaaf9efc0..8e374c8c2066 100644 --- a/python/ray/tune/examples/mnist_pytorch.py +++ b/python/ray/tune/examples/mnist_pytorch.py @@ -171,7 +171,6 @@ def test(): tune.run( "TRAIN_FN", name="exp", - verbose=0, scheduler=sched, **{ "stop": { diff --git a/python/ray/tune/examples/mnist_pytorch_trainable.py b/python/ray/tune/examples/mnist_pytorch_trainable.py index cf8cd612e29b..f644dfd46e99 100644 --- a/python/ray/tune/examples/mnist_pytorch_trainable.py +++ b/python/ray/tune/examples/mnist_pytorch_trainable.py @@ -179,7 +179,6 @@ def _restore(self, checkpoint_path): time_attr="training_iteration", reward_attr="neg_mean_loss") tune.run( TrainMNIST, - verbose=0, scheduler=sched, **{ "stop": { diff --git a/python/ray/tune/examples/tune_mnist_keras.py b/python/ray/tune/examples/tune_mnist_keras.py index abe0452d4ae4..44a9b7d63df9 100644 --- a/python/ray/tune/examples/tune_mnist_keras.py +++ b/python/ray/tune/examples/tune_mnist_keras.py @@ -183,7 +183,6 @@ def create_parser(): tune.run( "TRAIN_FN", name="exp", - verbose=0, scheduler=sched, **{ "stop": { diff --git a/python/ray/tune/tests/test_commands.py b/python/ray/tune/tests/test_commands.py index 174f356d5b08..9b58d3090597 100644 --- a/python/ray/tune/tests/test_commands.py +++ b/python/ray/tune/tests/test_commands.py @@ -4,6 +4,11 @@ import os import pytest +import sys +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO import ray from ray import tune @@ -11,6 +16,19 @@ from ray.tune import commands +class Capturing(): + def __enter__(self): + self._stdout = sys.stdout + sys.stdout = self._stringio = StringIO() + self.captured = [] + return self + + def __exit__(self, *args): + self.captured.extend(self._stringio.getvalue().splitlines()) + del self._stringio # free up some memory + sys.stdout = self._stdout + + @pytest.fixture def start_ray(): ray.init() @@ -19,48 +37,46 @@ def start_ray(): ray.shutdown() -def test_ls(start_ray, capsys, tmpdir): +def test_ls(start_ray, tmpdir): """This test captures output of list_trials.""" experiment_name = "test_ls" experiment_path = os.path.join(str(tmpdir), experiment_name) num_samples = 2 - with capsys.disabled(): - tune.run_experiments({ - experiment_name: { - "run": "__fake", - "stop": { - "training_iteration": 1 - }, - "num_samples": num_samples, - "local_dir": str(tmpdir) - } - }) + tune.run_experiments({ + experiment_name: { + "run": "__fake", + "stop": { + "training_iteration": 1 + }, + "num_samples": num_samples, + "local_dir": str(tmpdir) + } + }) - commands.list_trials(experiment_path, info_keys=("status", )) - captured = capsys.readouterr().out.strip() - lines = captured.split("\n") + with Capturing() as output: + commands.list_trials(experiment_path, info_keys=("status", )) + lines = 
output.captured assert sum("TERMINATED" in line for line in lines) == num_samples -def test_lsx(start_ray, capsys, tmpdir): +def test_lsx(start_ray, tmpdir): """This test captures output of list_experiments.""" project_path = str(tmpdir) num_experiments = 3 for i in range(num_experiments): experiment_name = "test_lsx{}".format(i) - with capsys.disabled(): - tune.run_experiments({ - experiment_name: { - "run": "__fake", - "stop": { - "training_iteration": 1 - }, - "num_samples": 1, - "local_dir": project_path - } - }) + tune.run_experiments({ + experiment_name: { + "run": "__fake", + "stop": { + "training_iteration": 1 + }, + "num_samples": 1, + "local_dir": project_path + } + }) - commands.list_experiments(project_path, info_keys=("total_trials", )) - captured = capsys.readouterr().out.strip() - lines = captured.split("\n") + with Capturing() as output: + commands.list_experiments(project_path, info_keys=("total_trials", )) + lines = output.captured assert sum("1" in line for line in lines) >= 3 diff --git a/python/ray/worker.py b/python/ray/worker.py index aa65def06727..809e2e43456a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -76,15 +76,6 @@ ERROR_KEY_PREFIX = b"Error:" -# Default resource requirements for actors when no resource requirements are -# specified. -DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1 -DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0 -# Default resource requirements for actors when some resource requirements are -# specified. -DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0 -DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1 - # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at # entry/init points. @@ -2480,23 +2471,8 @@ def decorator(function_or_class): raise Exception("The keyword 'max_calls' is not allowed for " "actors.") - # Set the actor default resources. - if num_cpus is None and num_gpus is None and resources is None: - # In the default case, actors acquire no resources for - # their lifetime, and actor methods will require 1 CPU. - cpus_to_use = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE - actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE - else: - # If any resources are specified, then all resources are - # acquired for the actor's lifetime and no resources are - # associated with methods. - cpus_to_use = (DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE - if num_cpus is None else num_cpus) - actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE - - return worker.make_actor(function_or_class, cpus_to_use, num_gpus, - resources, actor_method_cpus, - max_reconstructions) + return worker.make_actor(function_or_class, num_cpus, num_gpus, + resources, max_reconstructions) raise Exception("The @ray.remote decorator must be applied to " "either a function or to a class.")
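For reference, the 64-byte alignment helpers this patch adds in python/ray/rllib/utils/memory.py can be exercised roughly as below. This is a usage sketch only: it assumes the patch is applied, and the array shapes are invented for illustration.

    import numpy as np

    from ray.rllib.utils.memory import aligned_array, concat_aligned

    # A freshly allocated aligned buffer starts on a 64-byte boundary.
    buf = aligned_array(256, np.dtype("float32"))
    assert buf.ctypes.data % 64 == 0

    # concat_aligned() behaves like np.concatenate() along axis 0, but for
    # float (and uint8) inputs the result is also 64-byte aligned, which is
    # what makes the later copy into TensorFlow cheaper.
    a = np.ones((128, 4), dtype=np.float32)
    b = np.zeros((64, 4), dtype=np.float32)
    out = concat_aligned([a, b])
    assert out.shape == (192, 4)
    assert out.ctypes.data % 64 == 0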
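The log_once()/summarize() helpers added in python/ray/rllib/utils/debug.py support the guarded, pretty-printed logging that the sampler and optimizer diffs above wire in. A minimal sketch of the intended usage, again assuming the patch is applied (the batch dict here is made up):

    import logging

    import numpy as np

    from ray.rllib.utils.debug import log_once, summarize

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    batch = {"obs": np.zeros((32, 4), dtype=np.float32), "dones": [False] * 32}

    for _ in range(3):
        # log_once() returns True the first time it sees a given key; later
        # calls are rate limited, so this loop logs the summary only once.
        if log_once("batch_layout"):
            logger.info("Batch layout:\n%s", summarize(batch))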
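Finally, the write() method added to the preprocessors replaces the per-child np.concatenate() in TupleFlatteningPreprocessor and DictFlatteningPreprocessor with writes into one preallocated buffer. Reduced to a standalone sketch with made-up classes (not RLlib's actual preprocessors), the pattern looks like this:

    import numpy as np


    class FlatPart(object):
        """Stand-in for a child preprocessor that flattens into `size` floats."""

        def __init__(self, size):
            self.size = size

        def write(self, observation, array, offset):
            # Fill our slice of the shared buffer instead of allocating a new
            # array that would later have to be concatenated.
            array[offset:offset + self.size] = np.asarray(
                observation, dtype=np.float64).ravel()


    parts = [FlatPart(3), FlatPart(2)]
    observation = ([1, 2, 3], [4, 5])

    array = np.zeros(sum(p.size for p in parts))
    offset = 0
    for o, p in zip(observation, parts):
        p.write(o, array, offset)
        offset += p.size

    # array is now [1. 2. 3. 4. 5.], built with a single allocation up front.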