diff --git a/.travis.yml b/.travis.yml index 7402e6fb6e78..a3575f60e461 100644 --- a/.travis.yml +++ b/.travis.yml @@ -135,7 +135,7 @@ matrix: install: - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py` - - if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi + - if [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_SERVE_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi - ./ci/suppress_output ./ci/travis/install-bazel.sh - ./ci/suppress_output ./ci/travis/install-dependencies.sh @@ -173,6 +173,9 @@ script: - if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_optimizers.py; fi - if [ $RAY_CI_RLLIB_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/rllib/tests/test_evaluators.py; fi + # ray serve tests + - if [ $RAY_CI_SERVE_AFFECTED == "1" ] && [ $RAY_CI_PY3 == "1" ]; then ./ci/suppress_output python -m pytest -v --durations=5 python/ray/experimental/serve/tests; fi + # ray tests # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. 
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi diff --git a/ci/travis/determine_tests_to_run.py b/ci/travis/determine_tests_to_run.py index 7518a0148d72..721447fe6e3d 100644 --- a/ci/travis/determine_tests_to_run.py +++ b/ci/travis/determine_tests_to_run.py @@ -5,6 +5,8 @@ import os import subprocess +import sys +from pprint import pformat def list_changed_files(commit_range): @@ -26,24 +28,37 @@ def list_changed_files(commit_range): return [s.strip() for s in out.decode().splitlines() if s is not None] +def print_and_log(s): + print(s) + sys.stderr.write(s) + sys.stderr.write("\n") + + if __name__ == "__main__": RAY_CI_TUNE_AFFECTED = 0 RAY_CI_RLLIB_AFFECTED = 0 + RAY_CI_SERVE_AFFECTED = 0 RAY_CI_JAVA_AFFECTED = 0 RAY_CI_PYTHON_AFFECTED = 0 RAY_CI_LINUX_WHEELS_AFFECTED = 0 RAY_CI_MACOS_WHEELS_AFFECTED = 0 + RAY_CI_PY3 = 1 if sys.version_info >= (3, 5) else 0 + if os.environ["TRAVIS_EVENT_TYPE"] == "pull_request": files = list_changed_files(os.environ["TRAVIS_COMMIT_RANGE"].replace( "...", "..")) skip_prefix_list = [ - "doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/" + "doc/", "examples/", "dev/", "docker/", "kubernetes/", "site/", + "ci/", ".travis.yml" ] + sys.stderr.write("Files Changed\n") + sys.stderr.write(pformat(files)) + for changed_file in files: if changed_file.startswith("python/ray/tune/"): RAY_CI_TUNE_AFFECTED = 1 @@ -54,6 +69,10 @@ def list_changed_files(commit_range): RAY_CI_RLLIB_AFFECTED = 1 RAY_CI_LINUX_WHEELS_AFFECTED = 1 RAY_CI_MACOS_WHEELS_AFFECTED = 1 + elif changed_file.startswith("python/ray/experimental/serve"): + RAY_CI_SERVE_AFFECTED = 1 + RAY_CI_LINUX_WHEELS_AFFECTED = 1 + RAY_CI_MACOS_WHEELS_AFFECTED = 1 elif changed_file.startswith("python/"): RAY_CI_TUNE_AFFECTED = 1 RAY_CI_RLLIB_AFFECTED = 1 @@ -89,11 +108,18 @@ def list_changed_files(commit_range): 
RAY_CI_LINUX_WHEELS_AFFECTED = 1 RAY_CI_MACOS_WHEELS_AFFECTED = 1 - print("export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED)) - print("export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED)) - print("export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED)) - print("export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED)) - print("export RAY_CI_LINUX_WHEELS_AFFECTED={}" - .format(RAY_CI_LINUX_WHEELS_AFFECTED)) - print("export RAY_CI_MACOS_WHEELS_AFFECTED={}" - .format(RAY_CI_MACOS_WHEELS_AFFECTED)) + print_and_log("export RAY_CI_PY3={}".format(RAY_CI_PY3)) + print_and_log( + "export RAY_CI_TUNE_AFFECTED={}".format(RAY_CI_TUNE_AFFECTED)) + print_and_log( + "export RAY_CI_RLLIB_AFFECTED={}".format(RAY_CI_RLLIB_AFFECTED)) + print_and_log( + "export RAY_CI_SERVE_AFFECTED={}".format(RAY_CI_SERVE_AFFECTED)) + print_and_log( + "export RAY_CI_JAVA_AFFECTED={}".format(RAY_CI_JAVA_AFFECTED)) + print_and_log( + "export RAY_CI_PYTHON_AFFECTED={}".format(RAY_CI_PYTHON_AFFECTED)) + print_and_log("export RAY_CI_LINUX_WHEELS_AFFECTED={}".format( + RAY_CI_LINUX_WHEELS_AFFECTED)) + print_and_log("export RAY_CI_MACOS_WHEELS_AFFECTED={}".format( + RAY_CI_MACOS_WHEELS_AFFECTED)) diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index 20ff2e64d210..8b014ea9f635 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -34,7 +34,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ + flask elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # Install 
miniconda. wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv @@ -48,7 +49,8 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp + feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ + flask elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y build-essential curl unzip diff --git a/python/ray/experimental/serve/README.rst b/python/ray/experimental/serve/README.rst index b59672c46912..4857b4309d24 100644 --- a/python/ray/experimental/serve/README.rst +++ b/python/ray/experimental/serve/README.rst @@ -4,18 +4,6 @@ Ray Serve Module ``ray.experimental.serve`` is a module for publishing your actors to interact with outside world. 
-Use Case --------- - -Serve machine learning model -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Scalable anayltics query -~~~~~~~~~~~~~~~~~~~~~~~~ - -Composible pipelines -~~~~~~~~~~~~~~~~~~~~ - Architecture ------------ diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py index 15757a9d5f54..cf60daaddbdb 100644 --- a/python/ray/experimental/serve/__init__.py +++ b/python/ray/experimental/serve/__init__.py @@ -10,8 +10,8 @@ import sys -assert sys.version_info >= (3, ), ( - "ray.experimental.serve is a python3 only library") +assert sys.version_info >= ( + 3, 5), "ray.experimental.serve is a python3 only library" from ray.experimental.serve.router import (DeadlineAwareRouter, SingleQuery) # noqa: E402 diff --git a/python/ray/experimental/serve/example_actors.py b/python/ray/experimental/serve/example_actors.py new file mode 100644 index 000000000000..2dd12e2ea064 --- /dev/null +++ b/python/ray/experimental/serve/example_actors.py @@ -0,0 +1,108 @@ +""" +This file contains example ray servable actors. These actors +are used for testing as well as demoing purpose. +""" + +from __future__ import absolute_import, division, print_function + +import time + +import numpy as np + +import ray +from ray.experimental.serve import RayServeMixin, batched_input + + +@ray.remote +class VectorizedAdder(RayServeMixin): + """Actor that adds scaler_increment to input batch. 
+ + result = np.array(input_batch) + scaler_increment + """ + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + @batched_input + def __call__(self, input_batch): + arr = np.array(input_batch) + arr += self.inc + return arr.tolist() + + +@ray.remote +class ScalerAdder(RayServeMixin): + """Actor that adds a scaler_increment to a single input.""" + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + def __call__(self, input_scaler): + return input_scaler + self.inc + + +@ray.remote +class VectorDouble(RayServeMixin): + """Actor that doubles the batched input.""" + + @batched_input + def __call__(self, batched_vectors): + matrix = np.array(batched_vectors) + matrix *= 2 + return [v.tolist() for v in matrix] + + +@ray.remote +class Counter(RayServeMixin): + """Return the query id. Used for testing router.""" + + def __init__(self): + self.counter = 0 + + def __call__(self, batched_input): + self.counter += 1 + return self.counter + + +@ray.remote +class CustomCounter(RayServeMixin): + """Return the query id. Used for testing `serve_method` signature.""" + + serve_method = "count" + + @batched_input + def count(self, input_batch): + return [1 for _ in range(len(input_batch))] + + +@ray.remote +class SleepOnFirst(RayServeMixin): + """Sleep on the first request, return batch size. + + Used for testing the DeadlineAwareRouter. + """ + + def __init__(self, sleep_time): + self.nap_time = sleep_time + + @batched_input + def __call__(self, input_batch): + time.sleep(self.nap_time) + return [len(input_batch) for _ in range(len(input_batch))] + + +@ray.remote +class SleepCounter(RayServeMixin): + """Sleep on input argument seconds, return the query id. + + Used to test the DeadlineAwareRouter. 
+ """ + + def __init__(self): + self.counter = 0 + + def __call__(self, inp): + time.sleep(inp) + + self.counter += 1 + return self.counter diff --git a/python/ray/experimental/serve/examples/adder.py b/python/ray/experimental/serve/examples/adder.py deleted file mode 100644 index 862e61c7150b..000000000000 --- a/python/ray/experimental/serve/examples/adder.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class VectorizedAdder(RayServeMixin): - """Actor that adds scaler_increment to input batch. - - result = np.array(input_batch) + scaler_increment - """ - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - @batched_input - def __call__(self, input_batch): - arr = np.array(input_batch) - arr += self.inc - return arr.tolist() - - -@ray.remote -class ScalerAdder(RayServeMixin): - """Actor that adds a scaler_increment to a single input.""" - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - def __call__(self, input_scaler): - return input_scaler + self.inc - - -@ray.remote -class VectorDouble(RayServeMixin): - """Actor that doubles the batched input.""" - - @batched_input - def __call__(self, batched_vectors): - matrix = np.array(batched_vectors) - matrix *= 2 - return [v.tolist() for v in matrix] diff --git a/python/ray/experimental/serve/examples/counter.py b/python/ray/experimental/serve/examples/counter.py deleted file mode 100644 index 369d53fb5a8e..000000000000 --- a/python/ray/experimental/serve/examples/counter.py +++ /dev/null @@ -1,29 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class Counter(RayServeMixin): - """Return the query 
id. Used for testing router.""" - - def __init__(self): - self.counter = 0 - - def __call__(self, batched_input): - self.counter += 1 - return self.counter - - -@ray.remote -class CustomCounter(RayServeMixin): - """Return the query id. Used for testing `serve_method` signature.""" - - serve_method = "count" - - @batched_input - def count(self, input_batch): - return [1 for _ in range(len(input_batch))] diff --git a/python/ray/experimental/serve/examples/halt.py b/python/ray/experimental/serve/examples/halt.py deleted file mode 100644 index eceb94d8653e..000000000000 --- a/python/ray/experimental/serve/examples/halt.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class SleepOnFirst(RayServeMixin): - """Sleep on the first request, return batch size. - - Used for testing the DeadlineAwareRouter. - """ - - def __init__(self, sleep_time): - self.nap_time = sleep_time - - @batched_input - def __call__(self, input_batch): - time.sleep(self.nap_time) - return [len(input_batch) for _ in range(len(input_batch))] - - -@ray.remote -class SleepCounter(RayServeMixin): - """Sleep on input argument seconds, return the query id. - - Used to test the DeadlineAwareRouter. 
- """ - - def __init__(self): - self.counter = 0 - - def __call__(self, inp): - time.sleep(inp) - - self.counter += 1 - return self.counter diff --git a/python/ray/experimental/serve/frontend/http_frontend.py b/python/ray/experimental/serve/frontend/http_frontend.py index 66973caca838..5d6c8a3a1513 100644 --- a/python/ray/experimental/serve/frontend/http_frontend.py +++ b/python/ray/experimental/serve/frontend/http_frontend.py @@ -4,10 +4,6 @@ import time -import uvicorn -from starlette.applications import Starlette -from starlette.responses import JSONResponse - import ray @@ -45,12 +41,15 @@ def __init__(self, ip="0.0.0.0", port=8080, router="DefaultRouter"): self.router = ray.experimental.named_actors.get_actor(router) def start(self): - default_app = Starlette() + # We have to import flask here to avoid Flask's + # "Working outside of request context." error + from flask import Flask, request, jsonify # noqa: E402 + + default_app = Flask(__name__) - @default_app.route("/{actor}", methods=["GET", "POST"]) - async def dispatch_remote_function(request): - data = await request.json() - actor_name = request.path_params["actor"] + @default_app.route("/", methods=["GET", "POST"]) + def dispatch_remote_function(actor_name): + data = request.get_json() slo_seconds = data.pop("slo_ms") / 1000 deadline = time.perf_counter() + slo_seconds @@ -59,14 +58,12 @@ async def dispatch_remote_function(request): result_future = unwrap( self.router.call.remote(actor_name, inp, deadline)) - - # TODO(simon): change to asyncio ray.get result = ray.get(result_future) - return JSONResponse({ + return jsonify({ "success": True, "actor": actor_name, "result": result }) - uvicorn.run(default_app, host=self.ip, port=self.port) + default_app.run(host=self.ip, port=self.port) diff --git a/python/ray/experimental/serve/mixin.py b/python/ray/experimental/serve/mixin.py index 858572634a04..9061d93ba13a 100644 --- a/python/ray/experimental/serve/mixin.py +++ b/python/ray/experimental/serve/mixin.py 
@@ -1,3 +1,9 @@ +""" +Mixins are classes that are designed to be included in other classes. +This module tries to provide a RayServeMixin class that makes +an actor servable by ray.serve +""" + from __future__ import absolute_import from __future__ import division from __future__ import print_function diff --git a/python/ray/experimental/serve/router/routers.py b/python/ray/experimental/serve/router/routers.py index b11849039ce9..fa103dc0f3e1 100644 --- a/python/ray/experimental/serve/router/routers.py +++ b/python/ray/experimental/serve/router/routers.py @@ -92,8 +92,8 @@ def register_actor( def set_replica(self, actor_name, new_replica_count): """Scale a managed actor according to new_replica_count.""" - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.managed_actors, ACTOR_NOT_REGISTERED_MSG( + actor_name) current_replicas = len(self.actor_handles[actor_name]) @@ -120,8 +120,8 @@ def call(self, actor_name, data, deadline_s): List[ray.ObjectID] with length 1, the object ID wrapped inside is the result object ID when the query is executed.
""" - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.managed_actors, ACTOR_NOT_REGISTERED_MSG( + actor_name) result_object_id = get_new_oid() @@ -179,8 +179,8 @@ def loop(self): def _get_next_batch(self, actor_name: str) -> List[SingleQuery]: """Get next batch of request for the actor whose name is provided.""" - assert actor_name in self.query_queues, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) + assert actor_name in self.query_queues, ACTOR_NOT_REGISTERED_MSG( + actor_name) inputs = [] batch_size = self.max_batch_size[actor_name] diff --git a/python/ray/experimental/serve/tests/test_actors.py b/python/ray/experimental/serve/tests/test_actors.py index 3b2748b73bf3..0f6e600302db 100644 --- a/python/ray/experimental/serve/tests/test_actors.py +++ b/python/ray/experimental/serve/tests/test_actors.py @@ -7,8 +7,12 @@ import ray from ray.experimental.serve import SingleQuery -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.counter import Counter, CustomCounter +from ray.experimental.serve.example_actors import ( + ScalerAdder, + VectorizedAdder, + Counter, + CustomCounter, +) from ray.experimental.serve.object_id import get_new_oid INCREMENT = 3 diff --git a/python/ray/experimental/serve/tests/test_deadline_router.py b/python/ray/experimental/serve/tests/test_deadline_router.py index d1d4d6769794..3c0e5e5187b5 100644 --- a/python/ray/experimental/serve/tests/test_deadline_router.py +++ b/python/ray/experimental/serve/tests/test_deadline_router.py @@ -8,8 +8,12 @@ import pytest import ray -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.halt import SleepCounter, SleepOnFirst +from ray.experimental.serve.example_actors import ( + ScalerAdder, + VectorizedAdder, + SleepCounter, + SleepOnFirst, +) from ray.experimental.serve.object_id import unwrap from 
ray.experimental.serve.router import DeadlineAwareRouter, start_router diff --git a/python/ray/experimental/serve/tests/test_default_app.py b/python/ray/experimental/serve/tests/test_default_app.py index 5eb758c5f7ed..88ed845f230a 100644 --- a/python/ray/experimental/serve/tests/test_default_app.py +++ b/python/ray/experimental/serve/tests/test_default_app.py @@ -9,7 +9,7 @@ import ray from ray.experimental.serve import DeadlineAwareRouter -from ray.experimental.serve.examples.adder import VectorizedAdder +from ray.experimental.serve.example_actors import VectorizedAdder from ray.experimental.serve.frontend import HTTPFrontendActor from ray.experimental.serve.router import start_router @@ -17,7 +17,7 @@ NUMBER_OF_TRIES = 5 -@pytest.fixture +@pytest.fixture(scope="module") def get_router(): # We need this many workers so resource are not oversubscribed ray.init(num_cpus=4) diff --git a/python/ray/experimental/serve/utils/__init__.py b/python/ray/experimental/serve/utils/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/experimental/serve/utils/priority_queue.py b/python/ray/experimental/serve/utils/priority_queue.py index 05b7045b43a5..479b917e8ead 100644 --- a/python/ray/experimental/serve/utils/priority_queue.py +++ b/python/ray/experimental/serve/utils/priority_queue.py @@ -5,7 +5,23 @@ import heapq -class PriorityQueue: +class BaseQueue: +    """Abstract base class for queue abstraction""" + +    def push(self, item): +        raise NotImplementedError() + +    def pop(self): +        raise NotImplementedError() + +    def try_pop(self): +        raise NotImplementedError() + +    def __len__(self): +        raise NotImplementedError() + + +class PriorityQueue(BaseQueue): """A min-heap class wrapping heapq module.""" def __init__(self): @@ -25,3 +41,25 @@ def try_pop(self): def __len__(self): return len(self.q) + + +class FIFOQueue(BaseQueue): +    """A first-in-first-out queue backed by a plain list.""" + +    def __init__(self): +        self.q = [] + +    def push(self, item): + 
self.q.append(item) + + def pop(self): + return self.q.pop(0) + + def try_pop(self): + if len(self.q) == 0: + return None + else: + return self.pop() + + def __len__(self): + return len(self.q)