diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
index 341639213569..bebef7a51982 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py
@@ -7,26 +7,27 @@
 import logging
 import time
 
-from uamqp import errors, constants
+from uamqp import errors, constants, compat
 from azure.eventhub.error import EventHubError, _handle_exception
 
 log = logging.getLogger(__name__)
 
 
 def _retry_decorator(to_be_wrapped_func):
-    def wrapped_func(*args, **kwargs):
+    def wrapped_func(self, *args, **kwargs):
         timeout = kwargs.get("timeout", None)
         if not timeout:
             timeout = 100000  # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
         timeout_time = time.time() + timeout
-        max_retries = args[0].client.config.max_retries
+        max_retries = self.client.config.max_retries
         retry_count = 0
         last_exception = None
+        kwargs.pop("timeout", None)
         while True:
             try:
-                return to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
+                return to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
             except Exception as exception:
-                last_exception = args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
+                last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
                 retry_count += 1
 
     return wrapped_func
@@ -92,6 +93,10 @@ def _close_connection(self):
         self.client._conn_manager.reset_connection_if_broken()
 
     def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
+        if not self.running and isinstance(exception, compat.TimeoutException):
+            exception = errors.AuthenticationException("Authorization timeout.")
+            return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
+
         return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
 
     def close(self, exception=None):
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py
index 23b8bd6a8fa6..aa539110e50a 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py
@@ -6,7 +6,7 @@
 import logging
 import time
 
-from uamqp import errors, constants
+from uamqp import errors, constants, compat
 from azure.eventhub.error import EventHubError, ConnectError
 from ..aio.error_async import _handle_exception
 
@@ -14,19 +14,20 @@
 
 
 def _retry_decorator(to_be_wrapped_func):
-    async def wrapped_func(*args, **kwargs):
+    async def wrapped_func(self, *args, **kwargs):
         timeout = kwargs.get("timeout", None)
         if not timeout:
             timeout = 100000  # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
         timeout_time = time.time() + timeout
-        max_retries = args[0].client.config.max_retries
+        max_retries = self.client.config.max_retries
         retry_count = 0
         last_exception = None
+        kwargs.pop("timeout", None)
         while True:
             try:
-                return await to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
+                return await to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
             except Exception as exception:
-                last_exception = await args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
+                last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
                 retry_count += 1
 
     return wrapped_func
@@ -93,6 +94,10 @@ async def _close_connection(self):
         await self.client._conn_manager.reset_connection_if_broken()
 
     async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
+        if not self.running and isinstance(exception, compat.TimeoutException):
+            exception = errors.AuthenticationException("Authorization timeout.")
+            return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
+
         return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
 
     async def close(self, exception=None):
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
index 57748525fdc5..381ed9cb5dd6 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
@@ -191,7 +191,7 @@ async def get_partition_properties(self, partition):
         return output
 
     def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
-        # type: (str, str, EventPosition, int, str, int, asyncio.AbstractEventLoop) -> EventHubConsumer
+        # type: (str, str, EventPosition) -> EventHubConsumer
         """
         Create an async consumer to the client for a particular consumer group and partition.
 
@@ -236,7 +236,7 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
                                   prefetch=prefetch, loop=loop)
         return handler
 
-    def create_producer(self, partition_id=None, operation=None, send_timeout=None, loop=None):
+    def create_producer(self, *, partition_id=None, operation=None, send_timeout=None, loop=None):
         # type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
         """
         Create an async producer to send EventData object to an EventHub.
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
index 40324827b2f4..8457913abcf0 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
@@ -100,7 +100,7 @@ async def __anext__(self):
                 if not self.messages_iter:
                     self.messages_iter = self._handler.receive_messages_iter_async()
                 message = await self.messages_iter.__anext__()
-                event_data = EventData(message=message)
+                event_data = EventData._from_message(message)
                 self.offset = EventPosition(event_data.offset, inclusive=False)
                 retry_count = 0
                 return event_data
@@ -147,7 +147,6 @@ async def _open(self, timeout_time=None):
             self.source = self.redirected.address
         await super(EventHubConsumer, self)._open(timeout_time)
 
-    @_retry_decorator
     async def _receive(self, **kwargs):
         timeout_time = kwargs.get("timeout_time")
         last_exception = kwargs.get("last_exception")
@@ -167,7 +166,7 @@
                 max_batch_size=max_batch_size,
                 timeout=remaining_time_ms)
             for message in message_batch:
-                event_data = EventData(message=message)
+                event_data = EventData._from_message(message)
                 self.offset = EventPosition(event_data.offset)
                 data_batch.append(event_data)
         return data_batch
@@ -185,7 +184,7 @@ def queue_size(self):
             return self._handler._received_messages.qsize()
         return 0
 
-    async def receive(self, max_batch_size=None, timeout=None):
+    async def receive(self, *, max_batch_size=None, timeout=None):
         # type: (int, float) -> List[EventData]
         """
         Receive events asynchronously from the EventHub.
@@ -218,7 +217,8 @@
 
         max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
         data_batch = []  # type: List[EventData]
-        return await self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
+        return await _retry_decorator(self._receive)(self, timeout=timeout,
+                                                     max_batch_size=max_batch_size, data_batch=data_batch)
 
     async def close(self, exception=None):
         # type: (Exception) -> None
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
index bc132e0a46cc..e3cd1d9fcb09 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
@@ -110,11 +110,7 @@ async def _open(self, timeout_time=None, **kwargs):
             self.target = self.redirected.address
         await super(EventHubProducer, self)._open(timeout_time)
 
-    @_retry_decorator
-    async def _send_event_data(self, **kwargs):
-        timeout_time = kwargs.get("timeout_time")
-        last_exception = kwargs.get("last_exception")
-
+    async def _send_event_data(self, timeout_time=None, last_exception=None):
         if self.unsent_events:
             await self._open(timeout_time)
             remaining_time = timeout_time - time.time()
@@ -161,12 +157,8 @@ async def create_batch(self, max_size=None, partition_key=None):
         :rtype: ~azure.eventhub.EventDataBatch
         """
 
-        @_retry_decorator
-        async def _wrapped_open(*args, **kwargs):
-            await self._open(**kwargs)
-
         if not self._max_message_size_on_link:
-            await _wrapped_open(self, timeout=self.client.config.send_timeout)
+            await _retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
 
         if max_size and max_size > self._max_message_size_on_link:
             raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
@@ -174,7 +166,7 @@ async def _wrapped_open(*args, **kwargs):
         return EventDataBatch(max_size=(max_size or self._max_message_size_on_link),
                               partition_key=partition_key)
 
-    async def send(self, event_data, partition_key=None, timeout=None):
+    async def send(self, event_data, *, partition_key=None, timeout=None):
         # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None
         """
         Sends an event data and blocks until acknowledgement is
@@ -220,7 +212,7 @@ async def send(self, event_data, partition_key=None, timeout=None):
             wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key)  # pylint: disable=protected-access
         wrapper_event_data.message.on_send_complete = self._on_outcome
         self.unsent_events = [wrapper_event_data.message]
-        await self._send_event_data(timeout)
+        await _retry_decorator(self._send_event_data)(self, timeout=timeout)
 
     async def close(self, exception=None):
         # type: (Exception) -> None
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
index 52ce44cf23a9..706db5498fe3 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
@@ -242,7 +242,7 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
         return handler
 
     def create_producer(self, partition_id=None, operation=None, send_timeout=None):
-        # type: (str, str, float) -> EventHubProducer
+        # type: (str, str, float, ...) -> EventHubProducer
         """
         Create an producer to send EventData object to an EventHub.
 
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
index 8c97797c01ff..d908d4702ac5 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
@@ -282,11 +282,9 @@ def from_connection_string(cls, conn_str, **kwargs):
         return cls._from_iothub_connection_string(conn_str, **kwargs)
 
     @abstractmethod
-    def create_consumer(
-            self, consumer_group, partition_id, event_position, **kwargs
-    ):
+    def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
         pass
 
     @abstractmethod
-    def create_producer(self, **kwargs):
+    def create_producer(self, partition_id=None, operation=None, send_timeout=None):
         pass
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
index 0ac743bd91d2..701b45484d75 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
@@ -57,7 +57,7 @@ class EventData(object):
     PROP_TIMESTAMP = b"x-opt-enqueued-time"
     PROP_DEVICE_ID = b"iothub-connection-device-id"
 
-    def __init__(self, body=None, to_device=None, message=None):
+    def __init__(self, body=None, to_device=None):
         """
         Initialize EventData.
 
@@ -67,8 +67,6 @@ def __init__(self, body=None, to_device=None, message=None):
         :type batch: Generator
         :param to_device: An IoT device to route to.
         :type to_device: str
-        :param message: The received message.
-        :type message: ~uamqp.message.Message
         """
 
         self._partition_key = types.AMQPSymbol(EventData.PROP_PARTITION_KEY)
@@ -77,20 +75,14 @@
         self.msg_properties = MessageProperties()
         if to_device:
             self.msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
-        if message:
-            self.message = message
-            self.msg_properties = message.properties
-            self._annotations = message.annotations
-            self._app_properties = message.application_properties
+        if body and isinstance(body, list):
+            self.message = Message(body[0], properties=self.msg_properties)
+            for more in body[1:]:
+                self.message._body.append(more)  # pylint: disable=protected-access
+        elif body is None:
+            raise ValueError("EventData cannot be None.")
         else:
-            if body and isinstance(body, list):
-                self.message = Message(body[0], properties=self.msg_properties)
-                for more in body[1:]:
-                    self.message._body.append(more)  # pylint: disable=protected-access
-            elif body is None:
-                raise ValueError("EventData cannot be None.")
-            else:
-                self.message = Message(body, properties=self.msg_properties)
+            self.message = Message(body, properties=self.msg_properties)
 
     def __str__(self):
         dic = {
@@ -125,6 +117,15 @@ def _set_partition_key(self, value):
         self.message.header = header
         self._annotations = annotations
 
+    @staticmethod
+    def _from_message(message):
+        event_data = EventData(body='')
+        event_data.message = message
+        event_data.msg_properties = message.properties
+        event_data._annotations = message.annotations
+        event_data._app_properties = message.application_properties
+        return event_data
+
     @property
     def sequence_number(self):
         """
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
index 03827f546c0f..33a0e8ed187e 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
@@ -96,7 +96,7 @@ def __next__(self):
                 if not self.messages_iter:
                     self.messages_iter = self._handler.receive_messages_iter()
                 message = next(self.messages_iter)
-                event_data = EventData(message=message)
+                event_data = EventData._from_message(message)
                 self.offset = EventPosition(event_data.offset, inclusive=False)
                 retry_count = 0
                 return event_data
@@ -142,7 +142,6 @@ def _open(self, timeout_time=None):
             self.source = self.redirected.address
         super(EventHubConsumer, self)._open(timeout_time)
 
-    @_retry_decorator
    def _receive(self, **kwargs):
         timeout_time = kwargs.get("timeout_time")
         last_exception = kwargs.get("last_exception")
@@ -161,7 +160,7 @@
                 max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0),
                 timeout=remaining_time_ms)
             for message in message_batch:
-                event_data = EventData(message=message)
+                event_data = EventData._from_message(message)
                 self.offset = EventPosition(event_data.offset)
                 data_batch.append(event_data)
         return data_batch
@@ -211,7 +210,7 @@
 
         max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
         data_batch = []  # type: List[EventData]
-        return self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
+        return _retry_decorator(self._receive)(self, timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
 
     def close(self, exception=None):
         # type:(Exception) -> None
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
index 570bd8609964..dd4b7aa2396a 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
@@ -117,11 +117,7 @@ def _open(self, timeout_time=None, **kwargs):
             self.target = self.redirected.address
         super(EventHubProducer, self)._open(timeout_time)
 
-    @_retry_decorator
-    def _send_event_data(self, **kwargs):
-        timeout_time = kwargs.get("timeout_time")
-        last_exception = kwargs.get("last_exception")
-
+    def _send_event_data(self, timeout_time=None, last_exception=None):
         if self.unsent_events:
             self._open(timeout_time)
             remaining_time = timeout_time - time.time()
@@ -168,12 +164,8 @@ def create_batch(self, max_size=None, partition_key=None):
         :rtype: ~azure.eventhub.EventDataBatch
         """
 
-        @_retry_decorator
-        def _wrapped_open(*args, **kwargs):
-            self._open(**kwargs)
-
         if not self._max_message_size_on_link:
-            _wrapped_open(self, timeout=self.client.config.send_timeout)
+            _retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
 
         if max_size and max_size > self._max_message_size_on_link:
             raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
@@ -227,7 +219,7 @@ def send(self, event_data, partition_key=None, timeout=None):
             wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key)  # pylint: disable=protected-access
         wrapper_event_data.message.on_send_complete = self._on_outcome
         self.unsent_events = [wrapper_event_data.message]
-        self._send_event_data(timeout=timeout)
+        _retry_decorator(self._send_event_data)(self, timeout=timeout)
 
     def close(self, exception=None):
         # type:(Exception) -> None
diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py
index 56c57924edde..05be713e2d8c 100644
--- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py
+++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py
@@ -37,7 +37,7 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep):
     for r in receivers:
         if not sleep:  # if sender sleeps, the receivers will be disconnected. destroy connection to simulate
             r._handler._connection._conn.destroy()
-        received.extend(r.receive(timeout=3))
+        received.extend(r.receive(timeout=5))
 
     assert len(received) == 2
     assert list(received[0].body)[0] == b"A single event"
diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py
index c84268d15f21..aa301bad3119 100644
--- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py
+++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py
@@ -255,7 +255,7 @@ async def test_send_with_create_event_batch_async(connstr_receivers):
     client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket,
                                                    network_tracing=False)
     sender = client.create_producer()
-    event_data_batch = await sender.create_batch(max_size=100 * 1024)
+    event_data_batch = await sender.create_batch(max_size=100000)
     while True:
         try:
             event_data_batch.try_add(EventData('A single event data'))
diff --git a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py b/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py
index 223a759ea9c5..0796cee2178d 100644
--- a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py
+++ b/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py
@@ -32,7 +32,7 @@ def test_send_with_long_interval_sync(connstr_receivers, sleep):
     for r in receivers:
         if not sleep:
             r._handler._connection._conn.destroy()
-        received.extend(r.receive(timeout=3))
+        received.extend(r.receive(timeout=5))
 
     assert len(received) == 2
     assert list(received[0].body)[0] == b"A single event"
diff --git a/sdk/eventhub/azure-eventhubs/tests/test_send.py b/sdk/eventhub/azure-eventhubs/tests/test_send.py
index 3d7bc3815c22..8499ff93b36d 100644
--- a/sdk/eventhub/azure-eventhubs/tests/test_send.py
+++ b/sdk/eventhub/azure-eventhubs/tests/test_send.py
@@ -257,7 +257,7 @@ def test_send_with_create_event_batch_sync(connstr_receivers):
     client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket,
                                                    network_tracing=False)
     sender = client.create_producer()
-    event_data_batch = sender.create_batch(max_size=100 * 1024)
+    event_data_batch = sender.create_batch(max_size=100000)
    while True:
         try:
             event_data_batch.try_add(EventData('A single event data'))
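The central pattern in this diff is moving retry handling from a definition-time decorator to a call-site wrapper around an already-bound method, so the wrapper can read retry settings off the instance and compute a single deadline shared by every attempt. Below is a minimal, self-contained sketch of that call-site pattern. It is not taken from the SDK: the Operation class, _do_work, do_work, and the simple retry-cap logic are illustrative stand-ins for the EventHubConsumer/EventHubProducer internals.

# Minimal standalone sketch of the call-site retry pattern (illustrative names only).
import time


def retry_decorator(func):
    # Wrap an already-bound method; `self` is passed explicitly so the wrapper
    # can read per-instance retry settings, and the deadline is computed once
    # and shared by every attempt.
    def wrapped(self, *args, **kwargs):
        timeout = kwargs.pop("timeout", None) or 100000  # None/0 are treated as "no timeout"
        deadline = time.time() + timeout
        retry_count = 0
        last_exception = None
        while True:
            try:
                return func(timeout_time=deadline, last_exception=last_exception, **kwargs)
            except Exception as exception:  # the SDK routes this through _handle_exception; here we just cap retries
                if retry_count >= self.max_retries or time.time() > deadline:
                    raise
                last_exception = exception
                retry_count += 1
    return wrapped


class Operation:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self._attempts = 0

    def _do_work(self, timeout_time=None, last_exception=None):
        # Fails twice, then succeeds, to exercise the retry loop.
        self._attempts += 1
        if self._attempts < 3:
            raise RuntimeError("transient failure")
        return "ok after %d attempts" % self._attempts

    def do_work(self, timeout=None):
        # Decorate the bound method at the call site, as the diff does for
        # _receive and _send_event_data, instead of decorating the definition.
        return retry_decorator(self._do_work)(self, timeout=timeout)


if __name__ == "__main__":
    print(Operation().do_work(timeout=10))  # -> "ok after 3 attempts"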