diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9447cb0f82a..36b4c6498f3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,13 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima This release also includes changes from <>. +* Changed transport implementation to use AIOHTTP instead of Tornado for gremlin-python. +* Added max_content_length and unit test for it in gremlin-python. +* Removed compression_option support for transport in gremlin-python. +* Fixed event loop issues and added unit test for it in gremlin-python. +* Fixed DriverRemoteConnection multithreading issues and added unit test for it in gremlin-python. +* Fixed heartbeat timeout issues and tested with local server manually for gremlin-python. +* Fixed build errors emitted for gremlin-python (asyncio task destroyed but is pending error). * Added `gremlin-language` module. * Allowed the possibility for the propagation of `null` as a `Traverser` in Gremlin. * Added a fully shaded version of `gremlin-driver`. diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 347da4e945c..2e4b22b0d54 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -844,14 +844,15 @@ can be passed to the `Client` or `DriverRemoteConnection` instance as keyword ar |password |The password to submit on requests that require authentication. |"" |pool_size |The number of connections used by the pool. |4 |protocol_factory |A callable that returns an instance of `AbstractBaseProtocol`. |`gremlin_python.driver.protocol.GremlinServerWSProtocol` -|transport_factory |A callable that returns an instance of `AbstractBaseTransport`. |`gremlin_python.driver.tornado.transport.TornadoTransport` +|transport_factory |A callable that returns an instance of `AbstractBaseTransport`. |`gremlin_python.driver.aiohttp.transport.AiohttpTransport` |username |The username to submit on requests that require authentication. |"" |kerberized_service |the first part of the principal name configured for the gremlin service|""" |session | A unique string-based identifier (typically a UUID) to enable a <>. This is not a valid configuration for `DriverRemoteConnection`. |None |========================================================= -Note that the `transport_factory` can allow for additional configuration of the `TornadoTransport`, which exposes -options to manage `ioloop` timeouts and compression settings: +Note that the `transport_factory` can allow for additional configuration of the `AiohttpTransport`, which exposes +options to manage `asyncio` timeouts, heartbeat, max content length, ssl options, and the ability to call the api from +an event loop: [source,python] ---- @@ -859,9 +860,11 @@ import ssl ... g = traversal().withRemote( DriverRemoteConnection('ws://localhost:8182/gremlin','g', - transport_factory=lambda: TornadoTransport(read_timeout=10, + transport_factory=lambda: AiohttpTransport(read_timeout=10, write_timeout=10, - compression_options={'compression_level':5,'mem_level':5}, + heartbeat=1.0, + call_from_event_loop=True + max_content_length=100*1024*1024, ssl_options=ssl.create_default_context(Purpose.CLIENT_AUTH)))) ---- @@ -1146,7 +1149,7 @@ and is designed to work best with a running <> co [source,shell] ---- pip install gremlinpython -pip install tornado +pip install aiohttp python example.py ---- diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc index c898fb95add..5a1f475f475 100644 --- a/docs/src/upgrade/release-3.5.x.asciidoc +++ b/docs/src/upgrade/release-3.5.x.asciidoc @@ -494,7 +494,7 @@ See: link:https://issues.apache.org/jira/browse/TINKERPOP-2517[TINKERPOP-2517] ==== Python 2.x Support -The gremlinpython module no longer supports Python 2.x. Users must use Python 3 going forward. For the most part, from +The gremlin-python module no longer supports Python 2.x. Users must use Python 3 going forward. For the most part, from a user's perspective, there are no specific API changes to consider as a result of this change. It is also worth noting that Jython support has been removed and that `gremlin-python` no longer produces a JVM-based artifact. This change means that the `GremlinJythonScriptEngine` no longer exists and there is no way to write native Python lambdas. @@ -502,6 +502,18 @@ All lambdas should be written using `gremlin-groovy` if they are needed. See: link:https://issues.apache.org/jira/browse/TINKERPOP-2317[TINKERPOP-2317] +==== Python Transportation Layer Rewrite + +With the removal of Python 2.x support the transport layer of gremlin-python has been rewritten to use a library that +utilizes the asyncio event loop of Python 3. link:https://github.com/aio-libs/aiohttp[AIOHTTP] utilizes Python 3's +event loop with a minimal HTTP abstraction and is now used for the transport layer. From a user's perspective there is +not much of a change except there is now a working heartbeat that can be set and there is no longer a +compression_options config when connecting with a custom transport factory. This change fixed a number of issues that +were related to the IOLoop of the old link:https://github.com/tornadoweb/tornado[Tornado] transport layer, which has +been completely removed from the library. + +See: link:https://issues.apache.org/jira/browse/TINKERPOP-2546[TINKERPOP-2546] + ==== Python Kerberos Support The Python Driver now supports Kerberos based authentication: diff --git a/gremlin-python/src/main/python/gremlin_python/driver/tornado/__init__.py b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/__init__.py similarity index 99% rename from gremlin-python/src/main/python/gremlin_python/driver/tornado/__init__.py rename to gremlin-python/src/main/python/gremlin_python/driver/aiohttp/__init__.py index 44d3f2da281..ef62ce50102 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/tornado/__init__.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/__init__.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py new file mode 100644 index 00000000000..25e57b88cdd --- /dev/null +++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +import aiohttp +import asyncio +import async_timeout + +from gremlin_python.driver.transport import AbstractBaseTransport + +__author__ = 'Lyndon Bauto (lyndonb@bitquilltech.com)' + + +class AiohttpTransport(AbstractBaseTransport): + nest_asyncio_applied = False + + # Default heartbeat of 5.0 seconds. + def __init__(self, read_timeout=None, write_timeout=None, ssl_options=None, + heartbeat=5.0, max_content_length=None, call_from_event_loop=None): + if call_from_event_loop is not None and call_from_event_loop and not AiohttpTransport.nest_asyncio_applied: + """ + The AiohttpTransport implementation uses the asyncio event loop. Because of this, it cannot be called within an + event loop without nest_asyncio. If the code is ever refactored so that it can be called within an event loop + this import and call can be removed. Without this, applications which use the event loop to call gremlin-python + (such as Jupyter) will not work. + """ + import nest_asyncio + nest_asyncio.apply() + AiohttpTransport.nest_asyncio_applied = True + + # Start event loop and initialize websocket and client to None + self._loop = asyncio.new_event_loop() + self._websocket = None + self._client_session = None + + # Set all inner variables to parameters passed in. + self._read_timeout = read_timeout + self._write_timeout = write_timeout + self._ssl_options = ssl_options + self._heartbeat = heartbeat + self._max_content_length = max_content_length + + def connect(self, url, headers=None): + # Inner function to perform async connect. + async def async_connect(): + # Start client session and use it to create a websocket with all the connection options provided. + self._client_session = aiohttp.ClientSession(loop=self._loop) + self._websocket = await self._client_session.ws_connect(url, + ssl=self._ssl_options, + headers=headers, + heartbeat=self._heartbeat, + max_msg_size=self._max_content_length) + + # Execute the async connect synchronously. + self._loop.run_until_complete(async_connect()) + + def write(self, message): + # Inner function to perform async write. + async def async_write(): + async with async_timeout.timeout(self._write_timeout): + await self._websocket.send_bytes(message) + + # Execute the async write synchronously. + self._loop.run_until_complete(async_write()) + + def read(self): + # Inner function to perform async read. + async def async_read(): + async with async_timeout.timeout(self._read_timeout): + return await self._websocket.receive() + + # Execute the async read synchronously. + msg = self._loop.run_until_complete(async_read()) + + # Need to handle multiple potential message types. + if msg.type == aiohttp.WSMsgType.close: + # Server is closing connection, shutdown and throw exception. + self.close() + raise RuntimeError("Connection was closed by server.") + elif msg.type == aiohttp.WSMsgType.closed: + # Should not be possible since our loop and socket would be closed. + raise RuntimeError("Connection was already closed.") + elif msg.type == aiohttp.WSMsgType.error: + # Error on connection, try to convert message to a string in error. + raise RuntimeError("Received error on read: '" + str(msg.data) + "'") + elif msg.type == aiohttp.WSMsgType.text: + # Convert message to bytes. + data = msg.data.strip().encode('utf-8') + else: + # General handle, return byte data. + data = msg.data + return data + + def close(self): + # Inner function to perform async close. + async def async_close(): + if not self._websocket.closed: + await self._websocket.close() + if not self._client_session.closed: + await self._client_session.close() + + # If the loop is not closed (connection hasn't already been closed) + if not self._loop.is_closed(): + # Execute the async close synchronously. + self._loop.run_until_complete(async_close()) + + # Close the event loop. + self._loop.close() + + @property + def closed(self): + # Connection is closed if either the websocket or the client session is closed. + return self._websocket.closed or self._client_session.closed diff --git a/gremlin-python/src/main/python/gremlin_python/driver/client.py b/gremlin-python/src/main/python/gremlin_python/driver/client.py index d5952c7d96f..aa35dcf2f96 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/client.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/client.py @@ -39,7 +39,9 @@ class Client: def __init__(self, url, traversal_source, protocol_factory=None, transport_factory=None, pool_size=None, max_workers=None, message_serializer=None, username="", password="", - kerberized_service="", headers=None, session="", max_content_length=None): + kerberized_service="", headers=None, session="", + max_content_length=None, heartbeat=None, + call_from_event_loop=None): self._url = url self._headers = headers self._traversal_source = traversal_source @@ -54,17 +56,19 @@ def __init__(self, url, traversal_source, protocol_factory=None, self._sessionEnabled = (session != "") if transport_factory is None: try: - from gremlin_python.driver.tornado.transport import ( - TornadoTransport) + from gremlin_python.driver.aiohttp.transport import ( + AiohttpTransport) except ImportError: - raise Exception("Please install Tornado or pass" + raise Exception("Please install AIOHTTP or pass " "custom transport factory") else: - transport_factory = lambda: TornadoTransport( - max_content_length=max_content_length) + def transport_factory(): + return AiohttpTransport(heartbeat=heartbeat, + max_content_length=max_content_length, + call_from_event_loop=call_from_event_loop) self._transport_factory = transport_factory if protocol_factory is None: - protocol_factory = lambda: protocol.GremlinServerWSProtocol( + def protocol_factory(): return protocol.GremlinServerWSProtocol( self._message_serializer, username=self._username, password=self._password, diff --git a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py index 8f002c976cf..d88c8abd488 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py @@ -32,7 +32,9 @@ def __init__(self, url, traversal_source, protocol_factory=None, transport_factory=None, pool_size=None, max_workers=None, username="", password="", kerberized_service='', message_serializer=None, graphson_reader=None, - graphson_writer=None, headers=None, max_content_length=None): + graphson_writer=None, headers=None, + max_content_length=None, heartbeat=None, + call_from_event_loop=None): if message_serializer is None: message_serializer = serializer.GraphSONMessageSerializer( reader=graphson_reader, @@ -47,7 +49,9 @@ def __init__(self, url, traversal_source, protocol_factory=None, password=password, kerberized_service=kerberized_service, headers=headers, - max_content_length=max_content_length) + heartbeat=heartbeat, + max_content_length=max_content_length, + call_from_event_loop=call_from_event_loop) self._url = self._client._url self._traversal_source = self._client._traversal_source diff --git a/gremlin-python/src/main/python/gremlin_python/driver/tornado/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/tornado/transport.py deleted file mode 100644 index cd0b4bf5d87..00000000000 --- a/gremlin-python/src/main/python/gremlin_python/driver/tornado/transport.py +++ /dev/null @@ -1,61 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from tornado import ioloop, websocket -from tornado import httpclient - -from gremlin_python.driver.transport import AbstractBaseTransport - -__author__ = 'David M. Brown (davebshow@gmail.com)' - - -class TornadoTransport(AbstractBaseTransport): - - _default_max_content_length = 10 * 1024 * 1024 - - def __init__(self, read_timeout=None, write_timeout=None, - compression_options={'compression_level': 5, 'mem_level': 5}, - ssl_options=None, max_content_length=_default_max_content_length): - self._loop = ioloop.IOLoop(make_current=False) - self._ws = None - self._read_timeout = read_timeout - self._write_timeout = write_timeout - self._compression_options = compression_options - self._ssl_options = ssl_options - self._max_content_length = max_content_length - - def connect(self, url, headers=None): - if headers or self._ssl_options: - url = httpclient.HTTPRequest(url, headers=headers, ssl_options=self._ssl_options) - self._ws = self._loop.run_sync( - lambda: websocket.websocket_connect(url, compression_options=self._compression_options, - max_message_size=self._max_content_length)) - - def write(self, message): - self._loop.run_sync( - lambda: self._ws.write_message(message, binary=True), timeout=self._write_timeout) - - def read(self): - return self._loop.run_sync(lambda: self._ws.read_message(), timeout=self._read_timeout) - - def close(self): - self._ws.close() - self._loop.close() - - def closed(self): - return not self._ws.protocol diff --git a/gremlin-python/src/main/python/gremlin_python/driver/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/transport.py index ebdc325544f..e7535e0ed04 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/transport.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/transport.py @@ -41,6 +41,7 @@ def read(self): def close(self): pass - @abc.abstractproperty + @property + @abc.abstractmethod def closed(self): pass diff --git a/gremlin-python/src/main/python/setup.py b/gremlin-python/src/main/python/setup.py index 248c342a690..af4e970db5b 100644 --- a/gremlin-python/src/main/python/setup.py +++ b/gremlin-python/src/main/python/setup.py @@ -45,8 +45,9 @@ version = __version__.version install_requires = [ + 'nest_asyncio', + 'aiohttp>=3.7.0,<=3.7.4', 'aenum>=1.4.5,<3.0.0', - 'tornado>=6.0', 'six>=1.10.0,<2.0.0', 'isodate>=0.6.0,<1.0.0', 'pyparsing>=2.4.7,<3.0.0' @@ -59,7 +60,7 @@ name='gremlinpython', version=version, packages=['gremlin_python', 'gremlin_python.driver', - 'gremlin_python.driver.tornado', 'gremlin_python.process', + 'gremlin_python.driver.aiohttp', 'gremlin_python.process', 'gremlin_python.structure', 'gremlin_python.structure.io'], license='Apache 2', url='http://tinkerpop.apache.org', diff --git a/gremlin-python/src/main/python/tests/conftest.py b/gremlin-python/src/main/python/tests/conftest.py index b277b1c7b44..4f20a44b828 100644 --- a/gremlin-python/src/main/python/tests/conftest.py +++ b/gremlin-python/src/main/python/tests/conftest.py @@ -34,7 +34,7 @@ from gremlin_python.driver.serializer import ( GraphSONMessageSerializer, GraphSONSerializersV2d0, GraphSONSerializersV3d0, GraphBinarySerializersV1) -from gremlin_python.driver.tornado.transport import TornadoTransport +from gremlin_python.driver.aiohttp.transport import AiohttpTransport gremlin_server_url = 'ws://localhost:{}/gremlin' anonymous_url = gremlin_server_url.format(45940) @@ -52,7 +52,7 @@ def connection(request): pool = queue.Queue() try: conn = Connection(anonymous_url, 'gmodern', protocol, - lambda: TornadoTransport(), executor, pool) + lambda: AiohttpTransport(), executor, pool) except OSError: executor.shutdown() pytest.skip('Gremlin Server is not running') @@ -85,7 +85,7 @@ def authenticated_client(request): ssl_opts = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_opts.verify_mode = ssl.CERT_NONE client = Client(basic_url, 'gmodern', username='stephen', password='password', - transport_factory=lambda: TornadoTransport(ssl_options=ssl_opts)) + transport_factory=lambda: AiohttpTransport(ssl_options=ssl_opts)) elif request.param == 'kerberos': client = Client(kerberos_url, 'gmodern', kerberized_service=kerberized_service) else: @@ -132,7 +132,7 @@ def remote_connection_authenticated(request): remote_conn = DriverRemoteConnection(basic_url, 'gmodern', username='stephen', password='password', message_serializer=serializer.GraphSONSerializersV2d0(), - transport_factory=lambda: TornadoTransport(ssl_options=ssl_opts)) + transport_factory=lambda: AiohttpTransport(ssl_options=ssl_opts)) elif request.param == 'kerberos': remote_conn = DriverRemoteConnection(kerberos_url, 'gmodern', kerberized_service=kerberized_service, message_serializer=serializer.GraphSONSerializersV2d0()) diff --git a/gremlin-python/src/main/python/tests/driver/test_client.py b/gremlin-python/src/main/python/tests/driver/test_client.py index 56e2d8c92e8..ac9664255e7 100644 --- a/gremlin-python/src/main/python/tests/driver/test_client.py +++ b/gremlin-python/src/main/python/tests/driver/test_client.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # +import asyncio import threading import uuid @@ -25,8 +26,8 @@ from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import OptionsStrategy from gremlin_python.structure.graph import Graph -from gremlin_python.driver.tornado.transport import TornadoTransport -from tornado.util import TimeoutError +from gremlin_python.driver.aiohttp.transport import AiohttpTransport +from asyncio import TimeoutError __author__ = 'David M. Brown (davebshow@gmail.com)' @@ -91,16 +92,32 @@ def test_client_connection_pool_after_error(client): assert client.available_pool_size == 1 -def test_client_side_timeout_set_for_tornado(client): +def test_client_side_timeout_set_for_aiohttp(client): client = Client('ws://localhost:45940/gremlin', 'gmodern', - transport_factory=lambda: TornadoTransport(read_timeout=1, write_timeout=1)) + transport_factory=lambda: AiohttpTransport(read_timeout=1, write_timeout=1)) try: # should fire an exception client.submit('Thread.sleep(2000);1').all().result() assert False - except TimeoutError as toerr: - assert str(toerr) == "Operation timed out after 1 seconds" + except TimeoutError as err: + # asyncio TimeoutError has no message. + assert str(err) == "" + + +async def async_connect(enable): + try: + transport = AiohttpTransport(call_from_event_loop=enable) + transport.connect('ws://localhost:45940/gremlin') + transport.close() + return True + except RuntimeError: + return False + + +def test_from_event_loop(): + assert not asyncio.get_event_loop().run_until_complete(async_connect(False)) + assert asyncio.get_event_loop().run_until_complete(async_connect(True)) def test_client_bytecode(client): @@ -126,6 +143,17 @@ def test_client_bytecode_options(client): assert len(result_set.all().result()) == 6 +def test_client_message_too_big(client): + try: + client = Client("http://localhost", 'g', max_content_length=1024) + client.submit("1+1").all().result() + assert False + except Exception: + assert True + finally: + client.close() + + def test_iterate_result_set(client): g = Graph().traversal() t = g.V() @@ -348,3 +376,14 @@ def test_big_result_set_secure(authenticated_client): for result in result_set: results += result assert len(results) == 10000 + + +async def asyncio_func(): + return 1 + + +def test_asyncio(client): + try: + asyncio.get_event_loop().run_until_complete(asyncio_func()) + except RuntimeError: + assert False diff --git a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection_threaded.py b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection_threaded.py index 65b121d7cbc..c3899a4d59a 100644 --- a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection_threaded.py +++ b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection_threaded.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # +import concurrent.futures import sys from threading import Thread from six.moves import queue @@ -72,3 +73,24 @@ def _executor(q, conn): # Close conn if close: conn.close() + + +def handle_request(): + try: + remote_connection = DriverRemoteConnection("ws://localhost:45940/gremlin", "g") + g = traversal().withRemote(remote_connection) + g.V().limit(1).toList() + remote_connection.close() + return True + except RuntimeError: + return False + + +def test_multithread(client): + try: + for i in range(10): + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(handle_request) + assert future.result() + except RuntimeError: + assert False