Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima

This release also includes changes from <<release-3-4-11, 3.4.11>>.

* Changed transport implementation to use AIOHTTP instead of Tornado for gremlin-python.
* Added max_content_length and unit test for it in gremlin-python.
* Removed compression_option support for transport in gremlin-python.
* Fixed event loop issues and added unit test for it in gremlin-python.
* Fixed DriverRemoteConnection multithreading issues and added unit test for it in gremlin-python.
* Fixed heartbeat timeout issues and tested with local server manually for gremlin-python.
* Fixed build errors emitted for gremlin-python (asyncio task destroyed but is pending error).
* Added `gremlin-language` module.
* Allowed the possibility for the propagation of `null` as a `Traverser` in Gremlin.
* Added a fully shaded version of `gremlin-driver`.
Expand Down
16 changes: 10 additions & 6 deletions docs/src/reference/gremlin-variants.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -831,24 +831,28 @@ can be passed to the `Client` or `DriverRemoteConnection` instance as keyword ar
|password |The password to submit on requests that require authentication. |""
|pool_size |The number of connections used by the pool. |4
|protocol_factory |A callable that returns an instance of `AbstractBaseProtocol`. |`gremlin_python.driver.protocol.GremlinServerWSProtocol`
|transport_factory |A callable that returns an instance of `AbstractBaseTransport`. |`gremlin_python.driver.tornado.transport.TornadoTransport`
|transport_factory |A callable that returns an instance of `AbstractBaseTransport`. |`gremlin_python.driver.aiohttp.transport.AiohttpTransport`
|username |The username to submit on requests that require authentication. |""
|kerberized_service |the first part of the principal name configured for the gremlin service|"""
|session | A unique string-based identifier (typically a UUID) to enable a <<sessions,session-based connection>>. This is not a valid configuration for `DriverRemoteConnection`. |None
|=========================================================

Note that the `transport_factory` can allow for additional configuration of the `TornadoTransport`, which exposes
options to manage `ioloop` timeouts and compression settings:
Note that the `transport_factory` can allow for additional configuration of the `AiohttpTransport`, which allows
pass through of the named parameters available in
link:https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession.ws_connect[AIOHTTP's ws_connect],
and the ability to call the api from an event loop:

[source,python]
----
import ssl
...
g = traversal().withRemote(
DriverRemoteConnection('ws://localhost:8182/gremlin','g',
transport_factory=lambda: TornadoTransport(read_timeout=10,
transport_factory=lambda: AiohttpTransport(read_timeout=10,
write_timeout=10,
compression_options={'compression_level':5,'mem_level':5},
heartbeat=1.0,
call_from_event_loop=True
max_content_length=100*1024*1024,
ssl_options=ssl.create_default_context(Purpose.CLIENT_AUTH))))
----

Expand Down Expand Up @@ -1133,7 +1137,7 @@ and is designed to work best with a running <<gremlin-server,Gremlin Server>> co
[source,shell]
----
pip install gremlinpython
pip install tornado
pip install aiohttp
python example.py
----

Expand Down
14 changes: 13 additions & 1 deletion docs/src/upgrade/release-3.5.x.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,18 @@ is up to the application to determine if retry is desired and how best to do so.

See: link:https://issues.apache.org/jira/browse/TINKERPOP-2517[TINKERPOP-2517]

==== Python Transportation Layer Rewrite

With the removal of Python 2.x support the transport layer of gremlin-python has been rewritten to use a library that
utilizes the asyncio event loop of Python 3. link:https://github.com/aio-libs/aiohttp[AIOHTTP] utilizes Python 3's
event loop with a minimal HTTP abstraction and is now used for the transport layer. From a user's perspective there is
not much of a change except there is now new configuration options available through named parameters, see
link:https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession.ws_connect[AIOHTTP ws_connect] for
more details. This change fixed a number of issues that were related to the IOLoop of the old
link:https://github.com/tornadoweb/tornado[Tornado] transport layer, which has been completely removed from the
library. An additional config which enables the driver to be used from within an event loop has been added and can be
used by setting `call_from_event_loop=True` when connecting.

==== Python Kerberos Support

The Python Driver now supports Kerberos based authentication:
Expand Down Expand Up @@ -1123,4 +1135,4 @@ drivers for example) will simply result in a no-op on the server and the expecte
From 3.5.0 forward, drivers need not send this message to close the session and simply rely on the close of the
connection to kill the session.

See: link:https://issues.apache.org/jira/browse/TINKERPOP-2336[TINKERPOP-2336]
See: link:https://issues.apache.org/jira/browse/TINKERPOP-2336[TINKERPOP-2336]
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


import aiohttp
import asyncio
import async_timeout
from aiohttp import ClientResponseError

from gremlin_python.driver.transport import AbstractBaseTransport

__author__ = 'Lyndon Bauto ([email protected])'


class AiohttpTransport(AbstractBaseTransport):
nest_asyncio_applied = False

# Default heartbeat of 5.0 seconds.
def __init__(self, call_from_event_loop=None, read_timeout=None, write_timeout=None, **kwargs):
if call_from_event_loop is not None and call_from_event_loop and not AiohttpTransport.nest_asyncio_applied:
"""
The AiohttpTransport implementation uses the asyncio event loop. Because of this, it cannot be called within an
event loop without nest_asyncio. If the code is ever refactored so that it can be called within an event loop
this import and call can be removed. Without this, applications which use the event loop to call gremlin-python
(such as Jupyter) will not work.
"""
import nest_asyncio
nest_asyncio.apply()
AiohttpTransport.nest_asyncio_applied = True

# Start event loop and initialize websocket and client to None
self._loop = asyncio.new_event_loop()
self._websocket = None
self._client_session = None

# Set all inner variables to parameters passed in.
self._aiohttp_kwargs = kwargs
self._write_timeout = write_timeout
self._read_timeout = read_timeout
if "max_content_length" in self._aiohttp_kwargs:
self._aiohttp_kwargs["max_msg_size"] = self._aiohttp_kwargs.pop("max_content_length")
if "ssl_options" in self._aiohttp_kwargs:
self._aiohttp_kwargs["ssl"] = self._aiohttp_kwargs.pop("ssl_options")

def connect(self, url, headers=None):
# Inner function to perform async connect.
async def async_connect():
# Start client session and use it to create a websocket with all the connection options provided.
self._client_session = aiohttp.ClientSession(loop=self._loop)
try:
self._websocket = await self._client_session.ws_connect(url, **self._aiohttp_kwargs, headers=headers)
except ClientResponseError as err:
# If 403, just send forbidden because in some cases this prints out a huge verbose message
# that includes credentials.
if err.status == 403:
raise Exception('Failed to connect to server: HTTP Error code 403 - Forbidden.')
else:
raise

# Execute the async connect synchronously.
self._loop.run_until_complete(async_connect())

def write(self, message):
# Inner function to perform async write.
async def async_write():
async with async_timeout.timeout(self._write_timeout):
await self._websocket.send_bytes(message)

# Execute the async write synchronously.
self._loop.run_until_complete(async_write())

def read(self):
# Inner function to perform async read.
async def async_read():
async with async_timeout.timeout(self._read_timeout):
return await self._websocket.receive()

# Execute the async read synchronously.
msg = self._loop.run_until_complete(async_read())

# Need to handle multiple potential message types.
if msg.type == aiohttp.WSMsgType.close:
# Server is closing connection, shutdown and throw exception.
self.close()
raise RuntimeError("Connection was closed by server.")
elif msg.type == aiohttp.WSMsgType.closed:
# Should not be possible since our loop and socket would be closed.
raise RuntimeError("Connection was already closed.")
elif msg.type == aiohttp.WSMsgType.error:
# Error on connection, try to convert message to a string in error.
raise RuntimeError("Received error on read: '" + str(msg.data) + "'")
elif msg.type == aiohttp.WSMsgType.text:
# Convert message to bytes.
data = msg.data.strip().encode('utf-8')
else:
# General handle, return byte data.
data = msg.data
return data

def close(self):
# Inner function to perform async close.
async def async_close():
if not self._websocket.closed:
await self._websocket.close()
if not self._client_session.closed:
await self._client_session.close()

# If the loop is not closed (connection hasn't already been closed)
if not self._loop.is_closed():
# Execute the async close synchronously.
self._loop.run_until_complete(async_close())

# Close the event loop.
self._loop.close()

@property
def closed(self):
# Connection is closed if either the websocket or the client session is closed.
return self._websocket.closed or self._client_session.closed
19 changes: 10 additions & 9 deletions gremlin-python/src/main/python/gremlin_python/driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ class Client:
def __init__(self, url, traversal_source, protocol_factory=None,
transport_factory=None, pool_size=None, max_workers=None,
message_serializer=None, username="", password="",
kerberized_service="", headers=None, session="", max_content_length=None):
kerberized_service="", headers=None, session="",
**transport_kwargs):
self._url = url
self._headers = headers
self._traversal_source = traversal_source
if max_content_length is None:
max_content_length = 10 * 1024 * 1024
if "max_content_length" not in transport_kwargs:
transport_kwargs["max_content_length"] = 10 * 1024 * 1024
if message_serializer is None:
message_serializer = serializer.GraphSONSerializersV3d0()
self._message_serializer = message_serializer
Expand All @@ -54,17 +55,17 @@ def __init__(self, url, traversal_source, protocol_factory=None,
self._sessionEnabled = (session != "")
if transport_factory is None:
try:
from gremlin_python.driver.tornado.transport import (
TornadoTransport)
from gremlin_python.driver.aiohttp.transport import (
AiohttpTransport)
except ImportError:
raise Exception("Please install Tornado or pass"
raise Exception("Please install AIOHTTP or pass "
"custom transport factory")
else:
transport_factory = lambda: TornadoTransport(
max_content_length=max_content_length)
def transport_factory():
return AiohttpTransport(**transport_kwargs)
self._transport_factory = transport_factory
if protocol_factory is None:
protocol_factory = lambda: protocol.GremlinServerWSProtocol(
def protocol_factory(): return protocol.GremlinServerWSProtocol(
self._message_serializer,
username=self._username,
password=self._password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self, url, traversal_source, protocol_factory=None,
transport_factory=None, pool_size=None, max_workers=None,
username="", password="", kerberized_service='',
message_serializer=None, graphson_reader=None,
graphson_writer=None, headers=None, max_content_length=None):
graphson_writer=None, headers=None,
**transport_kwargs):
if message_serializer is None:
message_serializer = serializer.GraphSONMessageSerializer(
reader=graphson_reader,
Expand All @@ -47,7 +48,7 @@ def __init__(self, url, traversal_source, protocol_factory=None,
password=password,
kerberized_service=kerberized_service,
headers=headers,
max_content_length=max_content_length)
**transport_kwargs)
self._url = self._client._url
self._traversal_source = self._client._traversal_source

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def read(self):
def close(self):
pass

@abc.abstractproperty
@property
@abc.abstractmethod
def closed(self):
pass
5 changes: 3 additions & 2 deletions gremlin-python/src/main/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
version = __version__.version

install_requires = [
'nest_asyncio',
'aiohttp>=3.7.0,<=3.7.4',
'aenum>=1.4.5,<3.0.0',
'tornado>=6.0',
'six>=1.10.0,<2.0.0',
'isodate>=0.6.0,<1.0.0',
'pyparsing>=2.4.7,<3.0.0'
Expand All @@ -59,7 +60,7 @@
name='gremlinpython',
version=version,
packages=['gremlin_python', 'gremlin_python.driver',
'gremlin_python.driver.tornado', 'gremlin_python.process',
'gremlin_python.driver.aiohttp', 'gremlin_python.process',
'gremlin_python.structure', 'gremlin_python.structure.io'],
license='Apache 2',
url='http://tinkerpop.apache.org',
Expand Down
Loading