-
Notifications
You must be signed in to change notification settings - Fork 7k
Experimental asyncio support #2015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 101 commits
Commits
Show all changes
103 commits
Select commit
Hold shift + click to select a range
6c262c6
Init commit for async plasma client
suquark 8609a0c
Create an eventloop model for ray/plasma
suquark 2ed2d8d
Implement a poll-like selector base on `ray.wait`. Huge improvements.
suquark c8fd938
Allow choosing workers & selectors
suquark 4d17096
remove original design
suquark bbef768
initial implementation of epoll-like selector for plasma
suquark b95c891
Add a param for `worker` used in `PlasmaSelectorEventLoop`
suquark 08995f1
Allow accepting a `Future` which returns object_id
suquark 13747bf
Do not need `io.py` anymore
suquark 747e65a
Create a basic testing model
suquark 4e954a1
fix: `ray.wait` returns tuple of lists
suquark 80e8b3c
fix a few bugs
suquark f127927
improving performance & bug fixing
suquark e4849b0
add test
suquark abdbbb7
several improvements & fixing
suquark 47de57d
fix relative import
suquark 1e83886
[async] change code format, remove old files
suquark a85d640
[async] Create context wrapper for the eventloop
suquark 1753282
[async] fix: context should return a value
suquark ba99e03
[async] Implement futures grouping
suquark 04b33e0
[async] Fix bugs & replace old functions
suquark 203400c
[async] Fix bugs found in tests
suquark 64ad596
[async] Implement `PlasmaEpoll`
suquark bd1c6a9
[async] Make test faster, add tests for epoll
suquark 6f8e2e9
[async] Fix code format
suquark 9cd12a1
[async] Add comments for main code.
suquark 5bd2584
[async] Fix import path.
suquark a3e599a
[async] Fix test.
suquark e489e9f
[async] Compatibility.
suquark 2554cd9
[async] less verbose to not annoy the CI.
suquark 6b080d3
[async] Add test for new API
suquark 60b0f05
[async] Allow showing debug info in some of the test.
suquark e09f840
[async] Fix test.
suquark 6db4ce3
[async] Proper shutdown.
suquark 40b5392
[async] Lint~
suquark 734d1c7
[async] Move files to experimental and create API
suquark c107284
[async] Use async/await syntax
suquark ecc2743
[async] Fix names & styles
suquark c5648ce
[async] comments
suquark e0ae7a1
[async] bug fixing & use pytest
suquark 91bdd93
[async] bug fixing & change tests
suquark 739f43f
[async] use logger
suquark 6c0a5fd
[async] add tests
suquark ad25009
[async] lint
suquark 7db7c3d
[async] type checking
suquark 9b16b04
[async] add more tests
suquark a9b9690
[async] fix bugs on waiting a future while timeout. Add more docs.
suquark 997bcc0
[async] Formal docs.
suquark 60e57f6
[async] Add typing info since these codes are compatible with py3.5+.
suquark f6a80bd
[async] Documents.
suquark 292431f
[async] Lint.
suquark f516ad1
[async] Fix deprecated call.
suquark 0ed39ba
[async] Fix deprecated call.
suquark 64782aa
[async] Implement a more reasonable way for dealing with pending inputs.
suquark 9ecb798
[async] Fix docs
suquark 7af1814
[async] Lint
suquark 4e61498
[async] Fix bug: Type for time
suquark bbe0d2d
[async] Set our eventloop as the default eventloop so that we can get…
suquark b76810b
[async] Update test & docs.
suquark 03de3d3
[async] Lint.
suquark fad1ca4
[async] Temporarily print more debug info.
suquark f63a855
[async] Use `Poll` as a default option.
suquark b94542f
[async] Limit resources.
suquark 8412461
new async implementation for Ray
suquark 363a0b6
implement linked list
suquark 1057d57
bug fix
suquark 519fd9d
update
suquark 27453b0
support seamless async operations
suquark cce0078
update
suquark 08046f9
update API
suquark 29bceac
fix tests
suquark e5d42f5
lint
suquark 22b2a14
bug fix
suquark 3e6d1e8
refactor names
suquark d9e10ce
improve doc
suquark 50a3ee9
properly shutdown async_api
suquark b19cf71
doc
suquark 9c05c1e
Change the table on the index page.
suquark 0dbca21
Adjust table size.
suquark a2b6826
Only keeps `as_future`.
suquark 4d2bfec
change how we init connection
suquark c35d1b1
init connection in `ray.worker.connect`
suquark 7ad33a9
doc
suquark e8d21b8
fix
suquark 4f4ea6b
Move initialization code into the module.
suquark a82b9c3
Fix docs & code
suquark 37fdc47
Update pyarrow version.
suquark 46f90ca
lint
suquark 8ba9a78
Restore index.rst
suquark 38e32ba
Add known issues.
suquark d96fb62
Merge branch 'master' of github.com:suquark/ray
suquark 4563b0a
Apply suggestions from code review
ericl 5fd5d8a
rename
suquark 0a9b217
Update async_api.rst
ericl a8e24b8
Update async_api.py
ericl a238cd6
Update async_api.rst
ericl 837cbdd
Update async_api.py
ericl 4d8ab75
Update worker.py
ericl d2bf03a
Update async_api.rst
ericl 7f0b61c
fix tests
suquark 9a78bb9
lint
suquark ce0fead
lint
suquark 8af2703
replace the magic number
suquark File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| Async API (Experimental) | ||
| ======================== | ||
|
|
||
| Since Python 3.5, it is possible to write concurrent code using the ``async/await`` `syntax <https://docs.python.org/3/library/asyncio.html>`__. | ||
|
|
||
| This document describes Ray's support for asyncio, which enables integration with popular async frameworks (e.g., aiohttp, aioredis, etc.) for high performance web and prediction serving. | ||
|
|
||
| Starting Ray | ||
| ------------ | ||
|
|
||
| You must initialize Ray first. | ||
|
|
||
| Please refer to `Starting Ray`_ for instructions. | ||
|
|
||
| .. _`Starting Ray`: http://ray.readthedocs.io/en/latest/tutorial.html#starting-ray | ||
|
|
||
|
|
||
| Converting Ray objects into asyncio futures | ||
| ------------------------------------------- | ||
|
|
||
| Ray object IDs can be converted into asyncio futures with ``ray.experimental.async_api``. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| import asyncio | ||
| import time | ||
| import ray | ||
| from ray.experimental import async_api | ||
|
|
||
| @ray.remote | ||
| def f(): | ||
| time.sleep(1) | ||
| return {'key1': ['value']} | ||
|
|
||
| ray.init() | ||
| future = async_api.as_future(f.remote()) | ||
| asyncio.get_event_loop().run_until_complete(future) # {'key1': ['value']} | ||
|
|
||
|
|
||
| .. autofunction:: ray.experimental.async_api.as_future | ||
|
|
||
|
|
||
| Example Usage | ||
| ------------- | ||
|
|
||
| +----------------------------------------+-----------------------------------------------------+ | ||
| | **Basic Python** | **Distributed with Ray** | | ||
| +----------------------------------------+-----------------------------------------------------+ | ||
| | .. code-block:: python | .. code-block:: python | | ||
| | | | | ||
| | # Execute f serially. | # Execute f in parallel. | | ||
| | | | | ||
| | | | | ||
| | def f(): | @ray.remote | | ||
| | time.sleep(1) | def f(): | | ||
| | return 1 | time.sleep(1) | | ||
| | | return 1 | | ||
| | | | | ||
| | | ray.init() | | ||
| | results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) | | ||
| +----------------------------------------+-----------------------------------------------------+ | ||
| | **Async Python** | **Async Ray** | | ||
| +----------------------------------------+-----------------------------------------------------+ | ||
| | .. code-block:: python | .. code-block:: python | | ||
| | | | | ||
| | # Execute f asynchronously. | # Execute f asynchronously with Ray/asyncio. | | ||
| | | | | ||
| | | from ray.experimental import async_api | | ||
| | | | | ||
| | | @ray.remote | | ||
| | async def f(): | def f(): | | ||
| | await asyncio.sleep(1) | time.sleep(1) | | ||
| | return 1 | return 1 | | ||
| | | | | ||
| | | ray.init() | | ||
| | loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | | ||
| | tasks = [f() for i in range(4)] | tasks = [async_api.as_future(f.remote()) | | ||
| | | for i in range(4)] | | ||
| | results = loop.run_until_complete( | results = loop.run_until_complete( | | ||
| | asyncio.gather(tasks)) | asyncio.gather(tasks)) | | ||
| +----------------------------------------+-----------------------------------------------------+ | ||
|
|
||
|
|
||
| Known Issues | ||
| ------------ | ||
|
|
||
| Async API support is experimental, and we are working to improve its performance. Please `let us know <https://github.com/ray-project/ray/issues>`__ any issues you encounter. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| # Note: asyncio is only compatible with Python 3 | ||
|
|
||
| import asyncio | ||
| import ray | ||
| from ray.experimental.async_plasma import PlasmaProtocol, PlasmaEventHandler | ||
|
|
||
| handler = None | ||
| transport = None | ||
| protocol = None | ||
|
|
||
|
|
||
| async def _async_init(): | ||
| global handler, transport, protocol | ||
| if handler is None: | ||
| worker = ray.worker.global_worker | ||
| loop = asyncio.get_event_loop() | ||
| worker.plasma_client.subscribe() | ||
| rsock = worker.plasma_client.get_notification_socket() | ||
| handler = PlasmaEventHandler(loop, worker) | ||
| transport, protocol = await loop.create_connection( | ||
| lambda: PlasmaProtocol(worker.plasma_client, handler), sock=rsock) | ||
|
|
||
|
|
||
| def init(): | ||
| """ | ||
| Initialize synchronously. | ||
| """ | ||
| loop = asyncio.get_event_loop() | ||
| if loop.is_running(): | ||
| raise Exception("You must initialize the Ray async API by calling " | ||
| "async_api.init() or async_api.as_future(obj) before " | ||
| "the event loop starts.") | ||
| else: | ||
| asyncio.get_event_loop().run_until_complete(_async_init()) | ||
|
|
||
|
|
||
| def as_future(object_id): | ||
| """Turn an object_id into a Future object. | ||
|
|
||
| Args: | ||
| object_id: A Ray object_id. | ||
|
|
||
| Returns: | ||
| PlasmaObjectFuture: A future object that waits the object_id. | ||
| """ | ||
| if handler is None: | ||
| init() | ||
| return handler.as_future(object_id) | ||
|
|
||
|
|
||
| def shutdown(): | ||
|
||
| """Manually shutdown the async API. | ||
|
|
||
| Cancels all related tasks and all the socket transportation. | ||
| """ | ||
| global handler, transport, protocol | ||
| if handler is not None: | ||
| handler.close() | ||
| transport.close() | ||
| handler = None | ||
| transport = None | ||
| protocol = None | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.