-
Notifications
You must be signed in to change notification settings - Fork 844
TINKERPOP-2546 Gremlin-Python Client Transport Layer to use AIOHTTP #1415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
lyndonbauto
wants to merge
11
commits into
apache:master
from
lyndonbauto:lyndon/gremlin-python-asyncio-transport
Closed
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
3037292
[1] Pushing current state which has working AIOHTTP transport layer.
lyndonbauto cd6afbf
[1] Updated transport library for test that was accidentally reverted.
lyndonbauto e4bd01b
Merge branch 'master' of https://github.com/apache/tinkerpop into lyn…
lyndonbauto 6e1055f
[1] Missed this file from last commit.
lyndonbauto 79715ab
[1] Removed tornado
lyndonbauto 6b598ac
[1] Finished removing tornado
lyndonbauto 7f5dc1a
[1] Documentation changes for gremlin-variants Tornado->AIOHTTP
lyndonbauto 7f37340
[1] Added a couple extra comments to the asciidoc info
lyndonbauto ffe5a98
[1] Added nest_asyncio
lyndonbauto a172c31
[1] Made nest_asyncio usage optional based on init settings
lyndonbauto 3643316
[1] Pulled master and resolved conflicts
lyndonbauto File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
129 changes: 129 additions & 0 deletions
129
gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| # | ||
|
|
||
|
|
||
| import aiohttp | ||
| import asyncio | ||
| import async_timeout | ||
|
|
||
| from gremlin_python.driver.transport import AbstractBaseTransport | ||
|
|
||
| __author__ = 'Lyndon Bauto ([email protected])' | ||
|
|
||
|
|
||
| class AiohttpTransport(AbstractBaseTransport): | ||
| nest_asyncio_applied = False | ||
|
|
||
| # Default heartbeat of 5.0 seconds. | ||
| def __init__(self, read_timeout=None, write_timeout=None, ssl_options=None, | ||
| heartbeat=5.0, max_content_length=None, call_from_event_loop=None): | ||
| if call_from_event_loop is not None and call_from_event_loop and not AiohttpTransport.nest_asyncio_applied: | ||
| """ | ||
| The AiohttpTransport implementation uses the asyncio event loop. Because of this, it cannot be called within an | ||
| event loop without nest_asyncio. If the code is ever refactored so that it can be called within an event loop | ||
| this import and call can be removed. Without this, applications which use the event loop to call gremlin-python | ||
| (such as Jupyter) will not work. | ||
| """ | ||
| import nest_asyncio | ||
| nest_asyncio.apply() | ||
| AiohttpTransport.nest_asyncio_applied = True | ||
|
|
||
| # Start event loop and initialize websocket and client to None | ||
| self._loop = asyncio.new_event_loop() | ||
| self._websocket = None | ||
| self._client_session = None | ||
|
|
||
| # Set all inner variables to parameters passed in. | ||
| self._read_timeout = read_timeout | ||
| self._write_timeout = write_timeout | ||
| self._ssl_options = ssl_options | ||
| self._heartbeat = heartbeat | ||
| self._max_content_length = max_content_length | ||
|
|
||
| def connect(self, url, headers=None): | ||
| # Inner function to perform async connect. | ||
| async def async_connect(): | ||
| # Start client session and use it to create a websocket with all the connection options provided. | ||
| self._client_session = aiohttp.ClientSession(loop=self._loop) | ||
| self._websocket = await self._client_session.ws_connect(url, | ||
| ssl=self._ssl_options, | ||
| headers=headers, | ||
| heartbeat=self._heartbeat, | ||
| max_msg_size=self._max_content_length) | ||
|
|
||
| # Execute the async connect synchronously. | ||
| self._loop.run_until_complete(async_connect()) | ||
|
|
||
| def write(self, message): | ||
| # Inner function to perform async write. | ||
| async def async_write(): | ||
| async with async_timeout.timeout(self._write_timeout): | ||
| await self._websocket.send_bytes(message) | ||
|
|
||
| # Execute the async write synchronously. | ||
| self._loop.run_until_complete(async_write()) | ||
|
|
||
| def read(self): | ||
| # Inner function to perform async read. | ||
| async def async_read(): | ||
| async with async_timeout.timeout(self._read_timeout): | ||
| return await self._websocket.receive() | ||
|
|
||
| # Execute the async read synchronously. | ||
| msg = self._loop.run_until_complete(async_read()) | ||
|
|
||
| # Need to handle multiple potential message types. | ||
| if msg.type == aiohttp.WSMsgType.close: | ||
| # Server is closing connection, shutdown and throw exception. | ||
| self.close() | ||
| raise RuntimeError("Connection was closed by server.") | ||
| elif msg.type == aiohttp.WSMsgType.closed: | ||
| # Should not be possible since our loop and socket would be closed. | ||
| raise RuntimeError("Connection was already closed.") | ||
| elif msg.type == aiohttp.WSMsgType.error: | ||
| # Error on connection, try to convert message to a string in error. | ||
| raise RuntimeError("Received error on read: '" + str(msg.data) + "'") | ||
| elif msg.type == aiohttp.WSMsgType.text: | ||
| # Convert message to bytes. | ||
| data = msg.data.strip().encode('utf-8') | ||
| else: | ||
| # General handle, return byte data. | ||
| data = msg.data | ||
| return data | ||
|
|
||
| def close(self): | ||
| # Inner function to perform async close. | ||
| async def async_close(): | ||
| if not self._websocket.closed: | ||
| await self._websocket.close() | ||
| if not self._client_session.closed: | ||
| await self._client_session.close() | ||
|
|
||
| # If the loop is not closed (connection hasn't already been closed) | ||
| if not self._loop.is_closed(): | ||
| # Execute the async close synchronously. | ||
| self._loop.run_until_complete(async_close()) | ||
|
|
||
| # Close the event loop. | ||
| self._loop.close() | ||
|
|
||
| @property | ||
| def closed(self): | ||
| # Connection is closed if either the websocket or the client session is closed. | ||
| return self._websocket.closed or self._client_session.closed |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 0 additions & 61 deletions
61
gremlin-python/src/main/python/gremlin_python/driver/tornado/transport.py
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good - two things: