diff --git a/.gitignore b/.gitignore index 8c0cad212c3d..da464a65266a 100644 --- a/.gitignore +++ b/.gitignore @@ -135,6 +135,7 @@ build # Pytest Cache **/.pytest_cache +.benchmarks # Vscode .vscode/ diff --git a/python/ray/experimental/serve/README.rst b/python/ray/experimental/serve/README.rst new file mode 100644 index 000000000000..b59672c46912 --- /dev/null +++ b/python/ray/experimental/serve/README.rst @@ -0,0 +1,76 @@ +Ray Serve Module +================ + +``ray.experimental.serve`` is a module for publishing your actors to +interact with the outside world. + +Use Case +-------- + +Serve machine learning model +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Scalable analytics query +~~~~~~~~~~~~~~~~~~~~~~~~ + +Composable pipelines +~~~~~~~~~~~~~~~~~~~~ + +Architecture +------------ + +``ray.experimental.serve`` is implemented in a three-tiered system. Each +tier can scale horizontally. + +In the following illustration, call chain goes from top to bottom. Each +box is one or more replicated ray actors. + +:: + + +-------------------+ +-----------------+ +------------+ + Frontend | HTTP Frontend | | Arrow RPC | | ... | + Tier | | | | | | + +-------------------+ +-----------------+ +------------+ + + +------------------------------------------------------------+ + + +--------------------+ +-------------------+ + Router | Default Router | | Deadline Aware | + Tier | | | Router | + +--------------------+ +-------------------+ + + +------------------------------------------------------------+ + + +----------------+ +--------------+ +-------------+ + Managed | Managed Actor | | ... | | ... | + Actor | Replica | | | | | + Tier +----------------+ +--------------+ +-------------+ + +Frontend Tier +~~~~~~~~~~~~~ + +The frontend tier is responsible for interfacing with the world. 
Currently +``ray.experimental.serve`` provides implementation for - HTTP Frontend + +And we are planning to add support for - Arrow RPC - zeromq + +Router Tier +~~~~~~~~~~~ + +The router tier receives calls from the frontend and routes them to the +managed actors. Routers both *route* and *queue* incoming queries. +``ray.experimental.serve`` has native support for (micro-)batching +queries. + +In addition, we implemented a deadline-aware router that will put high +priority queries in the front of the queue so they will be delivered +first. + +Managed Actor Tier +~~~~~~~~~~~~~~~~~~ + +Managed actors will be managed by routers. These actors can contain +arbitrary methods. Methods in the actor class are assumed to be able to +take a single input. To fully utilize the vectorized instructions, like +``np.sum``, you can use the ``@batched_input`` decorator; it will run your method +on a micro-batch. diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py new file mode 100644 index 000000000000..15757a9d5f54 --- /dev/null +++ b/python/ray/experimental/serve/__init__.py @@ -0,0 +1,28 @@ +"""A module for serving from actors. + +The ray.experimental.serve module is a module for publishing your actors to +interact with the outside world. 
+""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import sys + +assert sys.version_info >= (3, ), ( + "ray.experimental.serve is a python3 only library") + +from ray.experimental.serve.router import (DeadlineAwareRouter, + SingleQuery) # noqa: E402 +from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402 +from ray.experimental.serve.mixin import (RayServeMixin, + batched_input) # noqa: E402 + +__all__ = [ + "DeadlineAwareRouter", + "SingleQuery", + "HTTPFrontendActor", + "RayServeMixin", + "batched_input", +] diff --git a/python/ray/experimental/serve/examples/adder.py b/python/ray/experimental/serve/examples/adder.py new file mode 100644 index 000000000000..862e61c7150b --- /dev/null +++ b/python/ray/experimental/serve/examples/adder.py @@ -0,0 +1,47 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +import ray +from ray.experimental.serve import RayServeMixin, batched_input + + +@ray.remote +class VectorizedAdder(RayServeMixin): + """Actor that adds scaler_increment to input batch. 
+ + result = np.array(input_batch) + scaler_increment + """ + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + @batched_input + def __call__(self, input_batch): + arr = np.array(input_batch) + arr += self.inc + return arr.tolist() + + +@ray.remote +class ScalerAdder(RayServeMixin): + """Actor that adds a scaler_increment to a single input.""" + + def __init__(self, scaler_increment): + self.inc = scaler_increment + + def __call__(self, input_scaler): + return input_scaler + self.inc + + +@ray.remote +class VectorDouble(RayServeMixin): + """Actor that doubles the batched input.""" + + @batched_input + def __call__(self, batched_vectors): + matrix = np.array(batched_vectors) + matrix *= 2 + return [v.tolist() for v in matrix] diff --git a/python/ray/experimental/serve/examples/counter.py b/python/ray/experimental/serve/examples/counter.py new file mode 100644 index 000000000000..369d53fb5a8e --- /dev/null +++ b/python/ray/experimental/serve/examples/counter.py @@ -0,0 +1,29 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray +from ray.experimental.serve import RayServeMixin, batched_input + + +@ray.remote +class Counter(RayServeMixin): + """Return the query id. Used for testing router.""" + + def __init__(self): + self.counter = 0 + + def __call__(self, batched_input): + self.counter += 1 + return self.counter + + +@ray.remote +class CustomCounter(RayServeMixin): + """Return the query id. 
Used for testing `serve_method` signature.""" + + serve_method = "count" + + @batched_input + def count(self, input_batch): + return [1 for _ in range(len(input_batch))] diff --git a/python/ray/experimental/serve/examples/halt.py b/python/ray/experimental/serve/examples/halt.py new file mode 100644 index 000000000000..eceb94d8653e --- /dev/null +++ b/python/ray/experimental/serve/examples/halt.py @@ -0,0 +1,41 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import ray +from ray.experimental.serve import RayServeMixin, batched_input + + +@ray.remote +class SleepOnFirst(RayServeMixin): + """Sleep sleep_time seconds on each call, return the batch size per input. + + Used for testing the DeadlineAwareRouter. + """ + + def __init__(self, sleep_time): + self.nap_time = sleep_time + + @batched_input + def __call__(self, input_batch): + time.sleep(self.nap_time) + return [len(input_batch) for _ in range(len(input_batch))] + + +@ray.remote +class SleepCounter(RayServeMixin): + """Sleep for the input number of seconds, return the query id. + + Used to test the DeadlineAwareRouter. 
+ """ + + def __init__(self): + self.counter = 0 + + def __call__(self, inp): + time.sleep(inp) + + self.counter += 1 + return self.counter diff --git a/python/ray/experimental/serve/frontend/__init__.py b/python/ray/experimental/serve/frontend/__init__.py new file mode 100644 index 000000000000..b1cb44636bde --- /dev/null +++ b/python/ray/experimental/serve/frontend/__init__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.experimental.serve.frontend.http_frontend import HTTPFrontendActor + +__all__ = ["HTTPFrontendActor"] diff --git a/python/ray/experimental/serve/frontend/http_frontend.py b/python/ray/experimental/serve/frontend/http_frontend.py new file mode 100644 index 000000000000..66973caca838 --- /dev/null +++ b/python/ray/experimental/serve/frontend/http_frontend.py @@ -0,0 +1,72 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import uvicorn +from starlette.applications import Starlette +from starlette.responses import JSONResponse + +import ray + + +def unwrap(future): + """Unwrap the result from ray.experimental.server router. + Router returns a list of object ids when you call them. + """ + + return ray.get(future)[0] + + +@ray.remote +class HTTPFrontendActor: + """HTTP API for an Actor. This exposes /{actor_name} endpoint for query. 
+ + Request: + GET /{actor_name} or POST /{actor_name} + Content-type: application/json + { + "slo_ms": float, + "input": any + } + Response: + Content-type: application/json + { + "success": bool, + "actor": str, + "result": any + } + """ + + def __init__(self, ip="0.0.0.0", port=8080, router="DefaultRouter"): + self.ip = ip + self.port = port + self.router = ray.experimental.named_actors.get_actor(router) + + def start(self): + default_app = Starlette() + + @default_app.route("/{actor}", methods=["GET", "POST"]) + async def dispatch_remote_function(request): + data = await request.json() + actor_name = request.path_params["actor"] + + slo_seconds = data.pop("slo_ms") / 1000 + deadline = time.perf_counter() + slo_seconds + + inp = data.pop("input") + + result_future = unwrap( + self.router.call.remote(actor_name, inp, deadline)) + + # TODO(simon): change to asyncio ray.get + result = ray.get(result_future) + + return JSONResponse({ + "success": True, + "actor": actor_name, + "result": result + }) + + uvicorn.run(default_app, host=self.ip, port=self.port) diff --git a/python/ray/experimental/serve/mixin.py b/python/ray/experimental/serve/mixin.py new file mode 100644 index 000000000000..858572634a04 --- /dev/null +++ b/python/ray/experimental/serve/mixin.py @@ -0,0 +1,63 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import traceback +from typing import List + +import ray +from ray.experimental.serve import SingleQuery + + +def batched_input(func): + """Decorator to mark an actor method as accepting a batch of inputs. + + By default methods accept a single input. + """ + func.ray_serve_batched_input = True + return func + + +def _execute_and_seal_error(method, arg, method_name): + """Execute method with arg and return the result. + + If the method fails, return a RayTaskError so it can be sealed in the + resultOID and retried by user. 
+ """ + try: + return method(arg) + except Exception: + return ray.worker.RayTaskError(method_name, traceback.format_exc()) + + +class RayServeMixin: + """Enable a ray actor to interact with ray.serve + + Usage: + ``` + @ray.remote + class MyActor(RayServeMixin): + # This is optional, by default it is "__call__" + serve_method = 'my_method' + + def my_method(self, arg): + ... + ``` + """ + + serve_method = "__call__" + + def _dispatch(self, input_batch: List[SingleQuery]): + """Helper method to dispatch a batch of input to self.serve_method.""" + method = getattr(self, self.serve_method) + if hasattr(method, "ray_serve_batched_input"): + batch = [inp.data for inp in input_batch] + result = _execute_and_seal_error(method, batch, self.serve_method) + for res, inp in zip(result, input_batch): + ray.worker.global_worker.put_object(inp.result_object_id, res) + else: + for inp in input_batch: + result = _execute_and_seal_error(method, inp.data, + self.serve_method) + ray.worker.global_worker.put_object(inp.result_object_id, + result) diff --git a/python/ray/experimental/serve/object_id.py b/python/ray/experimental/serve/object_id.py new file mode 100644 index 000000000000..cdde52532a58 --- /dev/null +++ b/python/ray/experimental/serve/object_id.py @@ -0,0 +1,21 @@ +""" +Helper methods for dealing with ray.ObjectID +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray + + +def unwrap(future): + return ray.get(future)[0] + + +def get_new_oid(): + worker = ray.worker.global_worker + oid = ray._raylet.compute_put_id(worker.current_task_id, + worker.task_context.put_index) + worker.task_context.put_index += 1 + return oid diff --git a/python/ray/experimental/serve/router/__init__.py b/python/ray/experimental/serve/router/__init__.py new file mode 100644 index 000000000000..dae5fcb7ce01 --- /dev/null +++ b/python/ray/experimental/serve/router/__init__.py @@ -0,0 +1,26 @@ +from __future__ import 
absolute_import +from __future__ import division +from __future__ import print_function + +from ray.experimental.serve.router.routers import (DeadlineAwareRouter, + SingleQuery) +import ray + + +def start_router(router_class, router_name): + """Wrapper for starting a router and register it. + + Args: + router_class: The router class to instantiate. + router_name: The name to give to the router. + + Returns: + A handle to newly started router actor. + """ + handle = router_class.remote(router_name) + ray.experimental.register_actor(router_name, handle) + handle.start.remote() + return handle + + +__all__ = ["DeadlineAwareRouter", "SingleQuery"] diff --git a/python/ray/experimental/serve/router/routers.py b/python/ray/experimental/serve/router/routers.py new file mode 100644 index 000000000000..28fd91a8d0f8 --- /dev/null +++ b/python/ray/experimental/serve/router/routers.py @@ -0,0 +1,203 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import defaultdict +from functools import total_ordering +from typing import Callable, Dict, List, Set, Tuple + +import ray +from ray.experimental.serve.object_id import get_new_oid +from ray.experimental.serve.utils.priority_queue import PriorityQueue + +ACTOR_NOT_REGISTERED_MSG: Callable = ( + lambda name: ("Actor {} is not registered with this router. Please use " + "'router.register_actor.remote(...)' " + "to register it.").format(name)) + + +# Use @total_ordering so we can sort SingleQuery +@total_ordering +class SingleQuery: + """A data container for a query. + + Attributes: + data: The request data. + result_object_id: The result object ID. + deadline: The deadline in seconds. 
+ """ + + def __init__(self, data, result_object_id: ray.ObjectID, + deadline_s: float): + self.data = data + self.result_object_id = result_object_id + self.deadline = deadline_s + + def __lt__(self, other): + return self.deadline < other.deadline + + def __eq__(self, other): + return self.deadline == other.deadline + + +@ray.remote +class DeadlineAwareRouter: + """DeadlineAwareRouter is a router that is aware of deadlines. + + It takes into consideration the deadline attached to each query. It will + reorder incoming query based on their deadlines. + """ + + def __init__(self, router_name): + # Runtime Data + self.query_queues: Dict[str, PriorityQueue] = defaultdict( + PriorityQueue) + self.running_queries: Dict[ray.ObjectID, ray.actor.ActorHandle] = {} + self.actor_handles: Dict[str, List[ray.actor.ActorHandle]] = ( + defaultdict(list)) + + # Actor Metadata + self.managed_actors: Dict[str, ray.actor.ActorClass] = {} + self.actor_init_arguments: Dict[str, Tuple[List, Dict]] = {} + self.max_batch_size: Dict[str, int] = {} + + # Router Metadata + self.name = router_name + + def start(self): + """Kick off the router loop""" + + # Note: This is meant for hiding the complexity for a user + # facing method. + # Because the `loop` api can be hard to understand. + ray.experimental.get_actor(self.name).loop.remote() + + def register_actor( + self, + actor_name: str, + actor_class: ray.actor.ActorClass, + init_args: List = [], + init_kwargs: dict = {}, + num_replicas: int = 1, + max_batch_size: int = -1, # Unbounded batch size + ): + """Register a new managed actor. 
+ """ + self.managed_actors[actor_name] = actor_class + self.actor_init_arguments[actor_name] = (init_args, init_kwargs) + self.max_batch_size[actor_name] = max_batch_size + + ray.experimental.get_actor(self.name).set_replica.remote( + actor_name, num_replicas) + + def set_replica(self, actor_name, new_replica_count): + """Scale a managed actor according to new_replica_count.""" + assert actor_name in self.managed_actors, ( + ACTOR_NOT_REGISTERED_MSG(actor_name)) + + current_replicas = len(self.actor_handles[actor_name]) + + # Increase the number of replicas + if new_replica_count > current_replicas: + for _ in range(new_replica_count - current_replicas): + args = self.actor_init_arguments[actor_name][0] + kwargs = self.actor_init_arguments[actor_name][1] + new_actor_handle = self.managed_actors[actor_name].remote( + *args, **kwargs) + self.actor_handles[actor_name].append(new_actor_handle) + + # Decrease the number of replicas + if new_replica_count < current_replicas: + for _ in range(current_replicas - new_replica_count): + # Note actor destructor will be called after all remaining + # calls finish. Therefore it's safe to call del here. + del self.actor_handles[actor_name][-1] + + def call(self, actor_name, data, deadline_s): + """Enqueue a request to one of the actor managed by this router. + + Returns: + List[ray.ObjectID] with length 1, the object ID wrapped inside is + the result object ID when the query is executed. + """ + assert actor_name in self.managed_actors, ( + ACTOR_NOT_REGISTERED_MSG(actor_name)) + + result_object_id = get_new_oid() + self.query_queues[actor_name].push( + SingleQuery(data, result_object_id, deadline_s)) + + return [result_object_id] + + def loop(self): + """Main loop for router. It will does the following things: + + 1. Check which running actors finished. + 2. Iterate over free actors and request queues, dispatch requests batch + to free actors. + 3. Tail recursively schedule itself. + """ + + # 1. 
Check which running actors finished. + ready_oids, _ = ray.wait( + object_ids=list(self.running_queries.keys()), + num_returns=len(self.running_queries), + timeout=0, + ) + + for ready_oid in ready_oids: + self.running_queries.pop(ready_oid) + busy_actors: Set[ray.actor.ActorHandle] = set( + self.running_queries.values()) + + # 2. Iterate over free actors and request queues, dispatch requests + # batch to free actors. + for actor_name, queue in self.query_queues.items(): + # try to drain the queue + for actor_handle in self.actor_handles[actor_name]: + if len(queue) == 0: + break + + if actor_handle in busy_actors: + continue + + # A free actor found. Dispatch queries. + batch = self._get_next_batch(actor_name) + assert len(batch) + + batch_result_object_id = actor_handle._dispatch.remote(batch) + self._mark_running(batch_result_object_id, actor_handle) + + # 3. Tail recursively schedule itself. + ray.experimental.get_actor(self.name).loop.remote() + + def _get_next_batch(self, actor_name: str) -> List[SingleQuery]: + """Get next batch of requests for the actor whose name is provided.""" + assert actor_name in self.query_queues, ( + ACTOR_NOT_REGISTERED_MSG(actor_name)) + + inputs = [] + batch_size = self.max_batch_size[actor_name] + if batch_size == -1: + inp = self.query_queues[actor_name].try_pop() + while inp: + inputs.append(inp) + inp = self.query_queues[actor_name].try_pop() + else: + for _ in range(batch_size): + inp = self.query_queues[actor_name].try_pop() + if inp: + inputs.append(inp) + else: + break + + return inputs + + def _mark_running(self, batch_oid: ray.ObjectID, + actor_handle: ray.actor.ActorHandle): + """Mark actor_handle as running identified by batch_oid. + + This means that if batch_oid is fulfilled, then actor_handle must be + free. 
+ """ + self.running_queries[batch_oid] = actor_handle diff --git a/python/ray/experimental/serve/tests/test_actors.py b/python/ray/experimental/serve/tests/test_actors.py new file mode 100644 index 000000000000..3b2748b73bf3 --- /dev/null +++ b/python/ray/experimental/serve/tests/test_actors.py @@ -0,0 +1,68 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import pytest + +import ray +from ray.experimental.serve import SingleQuery +from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder +from ray.experimental.serve.examples.counter import Counter, CustomCounter +from ray.experimental.serve.object_id import get_new_oid + +INCREMENT = 3 + + +@pytest.fixture(scope="module") +def ray_start(): + ray.init(num_cpus=4) + yield + ray.shutdown() + + +@pytest.fixture +def generated_inputs(): + deadline = 11111.11 + inputs = [] + input_arr = np.arange(10) + for i in input_arr: + oid = get_new_oid() + inputs.append( + SingleQuery(data=i, result_object_id=oid, deadline_s=deadline)) + return inputs + + +def test_vadd(ray_start, generated_inputs): + adder = VectorizedAdder.remote(INCREMENT) + inputs = generated_inputs + oids = [inp.result_object_id for inp in inputs] + input_data = [inp.data for inp in inputs] + + adder._dispatch.remote(inputs) + result_arr = np.array(ray.get(oids)) + assert np.array_equal(result_arr, np.array(input_data) + INCREMENT) + + +def test_batched_input(ray_start, generated_inputs): + counter = Counter.remote() + counter._dispatch.remote(generated_inputs) + oids = [inp.result_object_id for inp in generated_inputs] + returned_query_ids = np.array(ray.get(oids)) + assert np.array_equal(returned_query_ids, np.arange(1, 11)) + + +def test_custom_method(ray_start, generated_inputs): + dummy = CustomCounter.remote() + dummy._dispatch.remote(generated_inputs) + oids = [inp.result_object_id for inp in generated_inputs] + returned_query_ids = 
np.array(ray.get(oids)) + assert np.array_equal(returned_query_ids, np.ones(10)) + + +def test_exception(ray_start): + adder = ScalerAdder.remote(INCREMENT) + query = SingleQuery("this can't be added with int", get_new_oid(), 10) + adder._dispatch.remote([query]) + with pytest.raises(ray.worker.RayTaskError): + ray.get(query.result_object_id) diff --git a/python/ray/experimental/serve/tests/test_deadline_router.py b/python/ray/experimental/serve/tests/test_deadline_router.py new file mode 100644 index 000000000000..d1d4d6769794 --- /dev/null +++ b/python/ray/experimental/serve/tests/test_deadline_router.py @@ -0,0 +1,91 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import numpy as np +import pytest + +import ray +from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder +from ray.experimental.serve.examples.halt import SleepCounter, SleepOnFirst +from ray.experimental.serve.object_id import unwrap +from ray.experimental.serve.router import DeadlineAwareRouter, start_router + + +@pytest.fixture(scope="module") +def router(): + # We need at least 5 workers so resource won't be oversubscribed + ray.init(num_cpus=5) + + # The following two blobs are equivalent + # + # handle = DeadlineAwareRouter.remote("DefaultTestRouter") + # ray.experimental.register_actor("DefaultTestRouter", handle) + # handle.start.remote() + # + # handle = start_router(DeadlineAwareRouter, "DefaultRouter") + handle = start_router(DeadlineAwareRouter, "DefaultRouter") + + handle.register_actor.remote( + "VAdder", VectorizedAdder, + init_kwargs={"scaler_increment": 1}) # init args + handle.register_actor.remote( + "SAdder", ScalerAdder, init_kwargs={"scaler_increment": 2}) + handle.register_actor.remote( + "SleepFirst", SleepOnFirst, init_kwargs={"sleep_time": 1}) + handle.register_actor.remote( + "SleepCounter", SleepCounter, max_batch_size=1) + + yield handle + + ray.shutdown() + + 
+@pytest.fixture +def now(): + return time.perf_counter() + + +def test_throw_assert(router: DeadlineAwareRouter, now: float): + try: + ray.get(router.call.remote("Action-Not-Exist", "input", now + 1)) + except ray.worker.RayTaskError as e: + assert "AssertionError" in e.traceback_str + + +def test_vector_adder(router: DeadlineAwareRouter, now: float): + result = unwrap(router.call.remote("VAdder", 42, now + 1)) + assert isinstance(result, ray.ObjectID) + assert ray.get(result) == 43 + + +def test_scaler_adder(router: DeadlineAwareRouter, now: float): + result = unwrap(router.call.remote("SAdder", 42, now + 1)) + assert isinstance(result, ray.ObjectID) + assert ray.get(result) == 44 + + +def test_batching_ability(router: DeadlineAwareRouter, now: float): + first = unwrap(router.call.remote("SleepFirst", 1, now + 1)) + rest = [ + unwrap(router.call.remote("SleepFirst", 1, now + 1)) for _ in range(10) + ] + assert ray.get(first) == 1 + assert np.alltrue(np.array(ray.get(rest)) == 10) + + +def test_deadline_priority(router: DeadlineAwareRouter, now: float): + # first sleep 2 seconds + first = unwrap(router.call.remote("SleepCounter", 2, now + 1)) + + # then send a request to with deadline farther away + second = unwrap(router.call.remote("SleepCounter", 0, now + 10)) + + # and a request with sooner deadline + third = unwrap(router.call.remote("SleepCounter", 0, now + 1)) + + id_1, id_2, id_3 = ray.get([first, second, third]) + + assert id_1 < id_3 < id_2 diff --git a/python/ray/experimental/serve/tests/test_default_app.py b/python/ray/experimental/serve/tests/test_default_app.py new file mode 100644 index 000000000000..5eb758c5f7ed --- /dev/null +++ b/python/ray/experimental/serve/tests/test_default_app.py @@ -0,0 +1,46 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import pytest +import requests + +import ray +from ray.experimental.serve import DeadlineAwareRouter +from 
ray.experimental.serve.examples.adder import VectorizedAdder +from ray.experimental.serve.frontend import HTTPFrontendActor +from ray.experimental.serve.router import start_router + +ROUTER_NAME = "DefaultRouter" +NUMBER_OF_TRIES = 5 + + +@pytest.fixture +def get_router(): + # We need this many workers so resource are not oversubscribed + ray.init(num_cpus=4) + router = start_router(DeadlineAwareRouter, ROUTER_NAME) + yield router + ray.shutdown() + + +def test_http_basic(get_router): + router = get_router + a = HTTPFrontendActor.remote(router=ROUTER_NAME) + a.start.remote() + + router.register_actor.remote( + "VAdder", VectorizedAdder, init_kwargs={"scaler_increment": 1}) + + for _ in range(NUMBER_OF_TRIES): + try: + url = "http://0.0.0.0:8080/VAdder" + payload = {"input": 10, "slo_ms": 1000} + resp = requests.request("POST", url, json=payload) + except Exception: + # it is possible that the actor is not yet instantiated + time.sleep(1) + + assert resp.json() == {"success": True, "actor": "VAdder", "result": 11} diff --git a/python/ray/experimental/serve/utils/priority_queue.py b/python/ray/experimental/serve/utils/priority_queue.py new file mode 100644 index 000000000000..05b7045b43a5 --- /dev/null +++ b/python/ray/experimental/serve/utils/priority_queue.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import heapq + + +class PriorityQueue: + """A min-heap class wrapping heapq module.""" + + def __init__(self): + self.q = [] + + def push(self, item): + heapq.heappush(self.q, item) + + def pop(self): + return heapq.heappop(self.q) + + def try_pop(self): + if len(self.q) == 0: + return None + else: + return self.pop() + + def __len__(self): + return len(self.q) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 8d91766caee0..9b82c5c389e1 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -755,7 +755,7 @@ def 
testNoArgs(self): def no_op(): pass - self.init_ray() + self.ray_start() ray.get(no_op.remote())