Skip to content
77 changes: 77 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_connection_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from threading import RLock
from uamqp import Connection, TransportType, c_uamqp


class _SharedConnectionManager(object):
def __init__(self, **kwargs):
self._lock = RLock()
self._conn = None # type: Connection

self._container_id = kwargs.get("container_id")
self._debug = kwargs.get("debug")
self._error_policy = kwargs.get("error_policy")
self._properties = kwargs.get("properties")
self._encoding = kwargs.get("encoding") or "UTF-8"
self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
self._http_proxy = kwargs.get('http_proxy')
self._max_frame_size = kwargs.get("max_frame_size")
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

def get_connection(self, host, auth):
# type: (...) -> Connection
with self._lock:
if self._conn is None:
self._conn = Connection(
host,
auth,
container_id=self._container_id,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug,
encoding=self._encoding)
return self._conn

def close_connection(self):
with self._lock:
if self._conn:
self._conn.destroy()
self._conn = None

def reset_connection_if_broken(self):
with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how - but we may need to confirm whether all of these states indicate that it is safe to GC the connection. If the connection is GC'ed before it's finished cleaning up the C objects it can lead to a crash. This is something that may be revealed when we do the stress and perf tests - so worth keeping in mind for now.

):
self._conn = None


class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

def get_connection(self, host, auth):
return None

def close_connection(self):
pass

def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SeparateConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import time

from uamqp import errors
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._handler = None
self.name = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._close_connection()

def _open(self, timeout_time=None):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
while not self._handler.client_ready():
if timeout_time and time.time() >= timeout_time:
return
time.sleep(0.05)
self.running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
_handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
# type:(Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
:start-after: [START eventhub_client_receiver_close]
:end-before: [END eventhub_client_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from asyncio import Lock
from uamqp import TransportType, c_uamqp
from uamqp.async_ops import ConnectionAsync


class _SharedConnectionManager(object):
def __init__(self, **kwargs):
self._lock = Lock()
self._conn = None

self._container_id = kwargs.get("container_id")
self._debug = kwargs.get("debug")
self._error_policy = kwargs.get("error_policy")
self._properties = kwargs.get("properties")
self._encoding = kwargs.get("encoding") or "UTF-8"
self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
self._http_proxy = kwargs.get('http_proxy')
self._max_frame_size = kwargs.get("max_frame_size")
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

async def get_connection(self, host, auth):
# type: (...) -> ConnectionAsync
async with self._lock:
if self._conn is None:
self._conn = ConnectionAsync(
host,
auth,
container_id=self._container_id,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug,
encoding=self._encoding)
return self._conn

async def close_connection(self):
async with self._lock:
if self._conn:
await self._conn.destroy_async()
self._conn = None

async def reset_connection_if_broken(self):
async with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
):
self._conn = None


class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

async def get_connection(self, host, auth):
pass # return None

async def close_connection(self):
pass

def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SharedConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import logging
import time

from uamqp import errors
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception

log = logging.getLogger(__name__)


class ConsumerProducerMixin(object):

def __init__(self):
self.client = None
self._handler = None
self.name = None

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self.redirected = redirect
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
if timeout_time and time.time() >= timeout_time:
return
await asyncio.sleep(0.05)
self.running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self.running = False

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken()

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_receiver_close]
:end-before: [END eventhub_client_async_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
Loading