Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8e21ea6
Start porting
simon-mo Feb 18, 2019
1fdcc4b
Start re-implement corvette
simon-mo Feb 19, 2019
c477686
Add more models to testing
simon-mo Feb 20, 2019
a9e2cb5
Add more tests
simon-mo Feb 20, 2019
7937e03
Move to ray.experimental
simon-mo Mar 6, 2019
8615980
Move gitignore to ray root
simon-mo Mar 6, 2019
ffc78fd
Remove makefile
simon-mo Mar 6, 2019
75f3865
init_ray -> ray_start
simon-mo Mar 6, 2019
5166cfa
Address some comments
simon-mo Mar 6, 2019
21c1c9e
Start porting
simon-mo Feb 18, 2019
d139eeb
Start re-implement corvette
simon-mo Feb 19, 2019
0d9c654
Add more models to testing
simon-mo Feb 20, 2019
92d27c5
Add more tests
simon-mo Feb 20, 2019
668b812
Move to ray.experimental
simon-mo Mar 6, 2019
92adeb8
Move gitignore to ray root
simon-mo Mar 6, 2019
32952af
Remove makefile
simon-mo Mar 6, 2019
46986c4
init_ray -> ray_start
simon-mo Mar 6, 2019
479ba4e
Address some comments
simon-mo Mar 6, 2019
28a8fc3
Linting.
robertnishihara Mar 6, 2019
9282706
Add http_frontend
simon-mo Mar 6, 2019
5ea89e8
Revert runtest
simon-mo Mar 7, 2019
5f03e75
Format code, remove README.md
simon-mo Mar 7, 2019
d08a75e
Remove trailing whitespace
simon-mo Mar 7, 2019
6898748
Merge branch 'ray-serve' of github.com:simon-mo/ray into ray-serve
simon-mo Mar 7, 2019
ca25589
Add imports back
simon-mo Mar 7, 2019
410d529
Fix readme
simon-mo Mar 7, 2019
ea62de8
Linting
robertnishihara Mar 8, 2019
ba73b76
fix test
simon-mo Mar 8, 2019
c08429e
Linting.
robertnishihara Mar 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ build

# Pytest Cache
**/.pytest_cache
.benchmarks

# Vscode
.vscode/
Expand Down
76 changes: 76 additions & 0 deletions python/ray/experimental/serve/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
Ray Serve Module
================

``ray.experimental.serve`` is a module for publishing your actors to
interact with the outside world.

Use Case
--------

Serve machine learning model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Scalable analytics query
~~~~~~~~~~~~~~~~~~~~~~~~

Composable pipelines
~~~~~~~~~~~~~~~~~~~~

Architecture
------------

``ray.experimental.serve`` is implemented in a three-tiered system. Each
tier can scale horizontally.

In the following illustration, call chain goes from top to bottom. Each
box is one or more replicated ray actors.

::

+-------------------+ +-----------------+ +------------+
Frontend | HTTP Frontend | | Arrow RPC | | ... |
Tier | | | | | |
+-------------------+ +-----------------+ +------------+

+------------------------------------------------------------+

+--------------------+ +-------------------+
Router | Default Router | | Deadline Aware |
Tier | | | Router |
+--------------------+ +-------------------+

+------------------------------------------------------------+

+----------------+ +--------------+ +-------------+
Managed | Managed Actor | | ... | | ... |
Actor | Replica | | | | |
Tier +----------------+ +--------------+ +-------------+

Frontend Tier
~~~~~~~~~~~~~

The frontend tier is responsible for interfacing with the outside world.
Currently ``ray.experimental.serve`` provides an implementation for:

- HTTP Frontend

And we are planning to add support for:

- Arrow RPC
- zeromq

Router Tier
~~~~~~~~~~~

The router tier receives calls from frontend and route them to the
managed actors. Routers both *route* and *queue* incoming queries.
``ray.experimental.serve`` has native support for (micro-)batching
queries.

In addition, we implemented a deadline-aware router that puts
high-priority queries at the front of the queue so they are delivered
first.

Managed Actor Tier
~~~~~~~~~~~~~~~~~~

Managed actors are managed by routers. These actors can contain
arbitrary methods. Methods in the actor class are assumed to take a
single input. To fully utilize vectorized instructions, like
``np.sum``, you can use the ``@batched_input`` decorator; it will run your
method on a micro-batch.
28 changes: 28 additions & 0 deletions python/ray/experimental/serve/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""A module for serving from actors.

The ray.experimental.serve module is a module for publishing your actors to
interact with the outside world.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys

assert sys.version_info >= (3, ), (
"ray.experimental.serve is a python3 only library")

from ray.experimental.serve.router import (DeadlineAwareRouter,
SingleQuery) # noqa: E402
from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402
from ray.experimental.serve.mixin import (RayServeMixin,
batched_input) # noqa: E402

__all__ = [
"DeadlineAwareRouter",
"SingleQuery",
"HTTPFrontendActor",
"RayServeMixin",
"batched_input",
]
47 changes: 47 additions & 0 deletions python/ray/experimental/serve/examples/adder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

import ray
from ray.experimental.serve import RayServeMixin, batched_input


@ray.remote
class VectorizedAdder(RayServeMixin):
    """Add a fixed scalar increment to a whole batch at once.

    result = np.array(input_batch) + scaler_increment
    """

    def __init__(self, scaler_increment):
        self.inc = scaler_increment

    @batched_input
    def __call__(self, input_batch):
        shifted = np.array(input_batch) + self.inc
        return shifted.tolist()


@ray.remote
class ScalerAdder(RayServeMixin):
    """Actor that adds a scaler_increment to a single input."""

    def __init__(self, scaler_increment):
        self.inc = scaler_increment

    def __call__(self, input_scaler):
        result = input_scaler + self.inc
        return result


@ray.remote
class VectorDouble(RayServeMixin):
    """Actor that doubles every vector in the batched input."""

    @batched_input
    def __call__(self, batched_vectors):
        doubled = np.array(batched_vectors) * 2
        return [row.tolist() for row in doubled]
29 changes: 29 additions & 0 deletions python/ray/experimental/serve/examples/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
from ray.experimental.serve import RayServeMixin, batched_input


@ray.remote
class Counter(RayServeMixin):
    """Return the query id. Used for testing router."""

    def __init__(self):
        self.counter = 0

    def __call__(self, batched_input):
        new_count = self.counter + 1
        self.counter = new_count
        return new_count


@ray.remote
class CustomCounter(RayServeMixin):
    """Return the query id. Used for testing `serve_method` signature."""

    serve_method = "count"

    @batched_input
    def count(self, input_batch):
        # One result per query in the micro-batch.
        return [1] * len(input_batch)
41 changes: 41 additions & 0 deletions python/ray/experimental/serve/examples/halt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import ray
from ray.experimental.serve import RayServeMixin, batched_input


@ray.remote
class SleepOnFirst(RayServeMixin):
    """Sleep once per batch, then report the batch size for every query.

    Used for testing the DeadlineAwareRouter.
    """

    def __init__(self, sleep_time):
        self.nap_time = sleep_time

    @batched_input
    def __call__(self, input_batch):
        time.sleep(self.nap_time)
        batch_size = len(input_batch)
        return [batch_size] * batch_size


@ray.remote
class SleepCounter(RayServeMixin):
    """Sleep for the requested number of seconds, then return the query id.

    Used to test the DeadlineAwareRouter.
    """

    def __init__(self):
        self.counter = 0

    def __call__(self, inp):
        time.sleep(inp)
        self.counter = self.counter + 1
        return self.counter
7 changes: 7 additions & 0 deletions python/ray/experimental/serve/frontend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from ray.experimental.serve.frontend.http_frontend import HTTPFrontendActor

__all__ = ["HTTPFrontendActor"]
72 changes: 72 additions & 0 deletions python/ray/experimental/serve/frontend/http_frontend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse

import ray


def unwrap(future):
    """Unwrap the result from a ray.experimental.serve router.

    Routers return a list of object ids when called; take the first one.
    """
    result_list = ray.get(future)
    return result_list[0]


@ray.remote
class HTTPFrontendActor:
    """HTTP API for an Actor. This exposes /{actor_name} endpoint for query.

    Request:
        GET /{actor_name} or POST /{actor_name}
        Content-type: application/json
        {
            "slo_ms": float,
            "input": any
        }
    Response:
        Content-type: application/json
        {
            "success": bool,
            "actor": str,
            "result": any
        }
    """

    def __init__(self, ip="0.0.0.0", port=8080, router="DefaultRouter"):
        self.ip = ip
        self.port = port
        # Look up the (already started) router actor by its registered name.
        self.router = ray.experimental.named_actors.get_actor(router)

    def start(self):
        app = Starlette()

        @app.route("/{actor}", methods=["GET", "POST"])
        async def dispatch_remote_function(request):
            payload = await request.json()
            actor_name = request.path_params["actor"]

            # Convert the client-specified SLO (milliseconds) into an
            # absolute deadline on the perf_counter clock.
            slo_seconds = payload.pop("slo_ms") / 1000
            deadline = time.perf_counter() + slo_seconds

            query_input = payload.pop("input")
            result_future = unwrap(
                self.router.call.remote(actor_name, query_input, deadline))

            # TODO(simon): change to asyncio ray.get
            result = ray.get(result_future)

            return JSONResponse({
                "success": True,
                "actor": actor_name,
                "result": result
            })

        uvicorn.run(app, host=self.ip, port=self.port)
63 changes: 63 additions & 0 deletions python/ray/experimental/serve/mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import traceback
from typing import List

import ray
from ray.experimental.serve import SingleQuery


def batched_input(func):
    """Decorator to mark an actor method as accepting a batch of inputs.

    By default a serve method is called with one input at a time; a
    decorated method instead receives the whole micro-batch as a list and
    must return a list of results of the same length.
    """
    # _dispatch checks for this attribute to pick the batched code path.
    func.ray_serve_batched_input = True
    return func


def _execute_and_seal_error(method, arg, method_name):
"""Execute method with arg and return the result.

If the method fails, return a RayTaskError so it can be sealed in the
resultOID and retried by user.
"""
try:
return method(arg)
except Exception:
return ray.worker.RayTaskError(method_name, traceback.format_exc())


class RayServeMixin:
    """Enable a ray actor to interact with ray.serve

    Usage:
    ```
    @ray.remote
    class MyActor(RayServeMixin):
        # This is optional, by default it is "__call__"
        serve_method = 'my_method'

        def my_method(self, arg):
            ...
    ```
    """

    serve_method = "__call__"

    def _dispatch(self, input_batch: List[SingleQuery]):
        """Helper method to dispatch a batch of input to self.serve_method."""
        method = getattr(self, self.serve_method)
        if hasattr(method, "ray_serve_batched_input"):
            batch = [inp.data for inp in input_batch]
            result = _execute_and_seal_error(method, batch, self.serve_method)
            if isinstance(result, ray.worker.RayTaskError):
                # The whole batched call failed, so every query in the batch
                # is affected: seal the same error into each result object id
                # rather than trying to iterate over the error object below.
                result = [result] * len(input_batch)
            for res, inp in zip(result, input_batch):
                ray.worker.global_worker.put_object(inp.result_object_id, res)
        else:
            for inp in input_batch:
                result = _execute_and_seal_error(method, inp.data,
                                                 self.serve_method)
                ray.worker.global_worker.put_object(inp.result_object_id,
                                                    result)
Loading