From 2d22a1a03b2e6f01137277fab5098b9b0ce1f407 Mon Sep 17 00:00:00 2001 From: Yunhao Ling <47871814+yunhaoling@users.noreply.github.com> Date: Wed, 10 Jul 2019 23:25:02 -0700 Subject: [PATCH 1/9] create_batch feature implementation (#6256) * Init create batch event * create_batch implementation * Revert _set_partition_key method and update comment * Refacor EventDataBatch class * Revert logic when setting partition_key of event_data --- .../azure/eventhub/__init__.py | 3 +- .../azure/eventhub/aio/consumer_async.py | 12 ++-- .../azure/eventhub/aio/producer_async.py | 46 +++++++++++---- .../azure-eventhubs/azure/eventhub/common.py | 56 +++++++++++++++++-- .../azure/eventhub/consumer.py | 12 ++-- .../azure/eventhub/producer.py | 47 ++++++++++++---- 6 files changed, 137 insertions(+), 39 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 616107e86125..c4faad253c16 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -5,7 +5,7 @@ __version__ = "5.0.0b1" -from azure.eventhub.common import EventData, EventPosition +from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ AuthenticationError, EventDataSendError, ConnectionLostError from azure.eventhub.client import EventHubClient @@ -18,6 +18,7 @@ __all__ = [ "EventData", + "EventDataBatch", "EventHubError", "ConnectError", "ConnectionLostError", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 6cf020176d96..38f4586ce5a6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -19,15 +19,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -38,7 +38,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index aef8dc50ff02..5a893f308724 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -10,7 +10,7 @@ from uamqp import constants, errors, compat from uamqp import SendClientAsync -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -20,9 +20,9 @@ class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ @@ -31,7 +31,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` - method in EventHubClient. + method in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -52,6 +52,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. If not specified the default event loop will be used. """ self.loop = loop or asyncio.get_event_loop() + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -110,6 +111,10 @@ async def _open(self): await self._connect() self.running = True + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES + async def _connect(self): connected = await self._build_connection() if not connected: @@ -301,6 +306,23 @@ def _set_partition_key(event_datas, partition_key): ed._set_partition_key(partition_key) yield ed + async def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + await self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + async def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -329,13 +351,15 @@ async def send(self, event_data, partition_key=None): self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): + wrapper_event_data = event_data + else: + if partition_key: + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] await self._send_event_data() @@ -373,4 +397,4 @@ async def close(self, exception=None): self.error = EventHubError(str(exception)) else: self.error = EventHubError("This send handler is now closed.") - await self._handler.close_async() \ No newline at end of file + await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5a6702a60324..24138bb2b6cb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -9,9 +9,13 @@ import json import six -from uamqp import BatchMessage, Message, types +from azure.eventhub.error import EventDataError +from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each +_BATCH_MESSAGE_OVERHEAD_COST = [5, 8] + def parse_sas_token(sas_token): """Parse a SAS token into its components. @@ -244,10 +248,25 @@ def encode_message(self): return self.message.encode_message() -class _BatchSendEventData(EventData): - def __init__(self, batch_event_data, partition_key=None): - self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None) +class EventDataBatch(object): + """ + The EventDataBatch class is a holder of a batch of event date within max message size bytes. + Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Do not instantiate an EventDataBatch object directly. + """ + def __init__(self, max_message_size=None, partition_key=None): + self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES + self._partition_key = partition_key + self.message = BatchMessage(data=[], multi_messages=False, properties=None) + self._set_partition_key(partition_key) + self._size = self.message.gather()[0].get_message_encoded_size() + + @staticmethod + def _from_batch(batch_data, partition_key=None): + batch_data_instance = EventDataBatch(partition_key=partition_key) + batch_data_instance.message._body_gen = batch_data + return batch_data_instance def _set_partition_key(self, value): if value: @@ -260,6 +279,35 @@ def _set_partition_key(self, value): self.message.annotations = annotations self.message.header = header + def try_add(self, event_data): + """ + The message size is a sum up of body, properties, header, etc. + :param event_data: + :return: + """ + if not isinstance(event_data, EventData): + raise EventDataError('event_data should be type of EventData') + + if self._partition_key: + if event_data.partition_key and not (event_data.partition_key == self._partition_key): + raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch') + if not event_data.partition_key: + event_data._set_partition_key(self._partition_key) + + event_data_size = event_data.message.get_message_encoded_size() + + # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that + # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. + size_after_add = self._size + event_data_size\ + + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] + + if size_after_add > self.max_message_size: + return False + + self.message._body_gen.append(event_data) # pylint: disable=protected-access + self._size = size_after_add + return True + class EventPosition(object): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 856c77d6fb65..08db5118e925 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -23,15 +23,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -41,7 +41,7 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve keep_alive=None, auto_reconnect=True): """ Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 3f95b7be08c3..4317b993ceb1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,7 +13,7 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -23,16 +23,16 @@ class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. @@ -51,6 +51,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N Default value is `True`. :type auto_reconnect: bool """ + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -109,6 +110,10 @@ def _open(self): self._connect() self.running = True + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES + def _connect(self): connected = self._build_connection() if not connected: @@ -298,6 +303,23 @@ def _error(outcome, condition): if outcome != constants.MessageSendResult.Ok: raise condition + def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -307,7 +329,8 @@ def send(self, event_data, partition_key=None): :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + will be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError @@ -327,13 +350,15 @@ def send(self, event_data, partition_key=None): self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. + wrapper_event_data = event_data + else: + if partition_key: + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] self._send_event_data() From 11b33332452438287797c7fc7a8f7c8bb9fc02e7 Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 17 Jul 2019 15:28:55 -0700 Subject: [PATCH 2/9] Misc fixes from code review --- .../azure/eventhub/aio/producer_async.py | 6 +++--- .../azure-eventhubs/azure/eventhub/common.py | 13 +++++++++++++ .../azure-eventhubs/azure/eventhub/producer.py | 13 ++++++------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 5a893f308724..8fd8076140e1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -5,7 +5,7 @@ import uuid import asyncio import logging -from typing import Iterator, Generator, List, Union +from typing import Iterable from uamqp import constants, errors, compat from uamqp import SendClientAsync @@ -318,13 +318,13 @@ async def create_batch(self, max_message_size=None, partition_key=None): await self._open() if max_message_size and max_message_size > self._max_message_size_on_link: - raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_message_size, self._max_message_size_on_link)) return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) async def send(self, event_data, partition_key=None): - # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None + # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 24138bb2b6cb..3497b4d8000a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -261,6 +261,18 @@ def __init__(self, max_message_size=None, partition_key=None): self._set_partition_key(partition_key) self._size = self.message.gather()[0].get_message_encoded_size() + self._count = 0 + + def __len__(self): + return self._count + + @property + def size(self): + """The size in bytes + + :return: int + """ + return self._size @staticmethod def _from_batch(batch_data, partition_key=None): @@ -306,6 +318,7 @@ def try_add(self, event_data): self.message._body_gen.append(event_data) # pylint: disable=protected-access self._size = size_after_add + self._count += 1 return True diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 4317b993ceb1..24b36c0f0f8e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -7,7 +7,7 @@ import uuid import logging import time -from typing import Iterator, Generator, List, Union +from typing import Iterable, Union from uamqp import constants, errors from uamqp import compat @@ -110,9 +110,8 @@ def _open(self): self._connect() self.running = True - self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ - self._handler.message_handler._link.peer_max_message_size\ - else constants.MAX_MESSAGE_LENGTH_BYTES + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access def _connect(self): connected = self._build_connection() @@ -315,13 +314,13 @@ def create_batch(self, max_message_size=None, partition_key=None): self._open() if max_message_size and max_message_size > self._max_message_size_on_link: - raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_message_size, self._max_message_size_on_link)) - return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + return EventDataBatch(max_message_size or self._max_message_size_on_link, partition_key) def send(self, event_data, partition_key=None): - # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None + # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. From 4f310eab17aab531f4120e6e4ad69ce9d247d0ec Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 17 Jul 2019 17:07:51 -0700 Subject: [PATCH 3/9] Rename max_message_size to max_size Other small fixes --- .../azure/eventhub/aio/producer_async.py | 8 ++++---- sdk/eventhub/azure-eventhubs/azure/eventhub/common.py | 11 +++++------ .../azure-eventhubs/azure/eventhub/producer.py | 8 ++++---- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 8fd8076140e1..a14142672f27 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -306,7 +306,7 @@ def _set_partition_key(event_datas, partition_key): ed._set_partition_key(partition_key) yield ed - async def create_batch(self, max_message_size=None, partition_key=None): + async def create_batch(self, max_size=None, partition_key=None): """ Create an EventDataBatch object with max message size being max_message_size. The max_message_size should be no greater than the max allowed message size defined by the service side. @@ -317,11 +317,11 @@ async def create_batch(self, max_message_size=None, partition_key=None): if not self._max_message_size_on_link: await self._open() - if max_message_size and max_message_size > self._max_message_size_on_link: + if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_message_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) async def send(self, event_data, partition_key=None): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 3497b4d8000a..97168d23dbdb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -254,8 +254,8 @@ class EventDataBatch(object): Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. Do not instantiate an EventDataBatch object directly. """ - def __init__(self, max_message_size=None, partition_key=None): - self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES + def __init__(self, max_size=None, partition_key=None): + self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES self._partition_key = partition_key self.message = BatchMessage(data=[], multi_messages=False, properties=None) @@ -298,7 +298,7 @@ def try_add(self, event_data): :return: """ if not isinstance(event_data, EventData): - raise EventDataError('event_data should be type of EventData') + raise TypeError('event_data should be type of EventData') if self._partition_key: if event_data.partition_key and not (event_data.partition_key == self._partition_key): @@ -313,13 +313,12 @@ def try_add(self, event_data): size_after_add = self._size + event_data_size\ + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] - if size_after_add > self.max_message_size: - return False + if size_after_add > self.max_size: + raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size)) self.message._body_gen.append(event_data) # pylint: disable=protected-access self._size = size_after_add self._count += 1 - return True class EventPosition(object): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 24b36c0f0f8e..44bd24b16553 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -302,7 +302,7 @@ def _error(outcome, condition): if outcome != constants.MessageSendResult.Ok: raise condition - def create_batch(self, max_message_size=None, partition_key=None): + def create_batch(self, max_size=None, partition_key=None): """ Create an EventDataBatch object with max message size being max_message_size. The max_message_size should be no greater than the max allowed message size defined by the service side. @@ -313,11 +313,11 @@ def create_batch(self, max_message_size=None, partition_key=None): if not self._max_message_size_on_link: self._open() - if max_message_size and max_message_size > self._max_message_size_on_link: + if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_message_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_message_size or self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) def send(self, event_data, partition_key=None): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None From 47df21bb96f275b66146ae105701d6d536ddafdd Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 18 Jul 2019 11:53:57 -0700 Subject: [PATCH 4/9] Warn if event_data is None when call try_add --- sdk/eventhub/azure-eventhubs/azure/eventhub/common.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 97168d23dbdb..6c7d68092842 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -8,6 +8,7 @@ import calendar import json import six +import logging from azure.eventhub.error import EventDataError from uamqp import BatchMessage, Message, types, constants, errors @@ -254,6 +255,9 @@ class EventDataBatch(object): Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. Do not instantiate an EventDataBatch object directly. """ + + log = logging.getLogger(__name__) + def __init__(self, max_size=None, partition_key=None): self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES self._partition_key = partition_key @@ -297,6 +301,9 @@ def try_add(self, event_data): :param event_data: :return: """ + if event_data is None: + self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") + return if not isinstance(event_data, EventData): raise TypeError('event_data should be type of EventData') From dfd4e39cf7603408fa8a23c9b4f2924542a52712 Mon Sep 17 00:00:00 2001 From: Yunhao Ling <47871814+yunhaoling@users.noreply.github.com> Date: Fri, 26 Jul 2019 14:50:08 -0700 Subject: [PATCH 5/9] Create batch event update (#6509) * Update according to the review * Update comment --- .../azure/eventhub/aio/producer_async.py | 26 ++++++++++++------- .../azure-eventhubs/azure/eventhub/common.py | 10 +++---- .../azure/eventhub/producer.py | 21 +++++++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index a14142672f27..93ea12460eca 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -111,9 +111,8 @@ async def _open(self): await self._connect() self.running = True - self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ - self._handler.message_handler._link.peer_max_message_size\ - else constants.MAX_MESSAGE_LENGTH_BYTES + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access async def _connect(self): connected = await self._build_connection() @@ -308,18 +307,22 @@ def _set_partition_key(event_datas, partition_key): async def create_batch(self, max_size=None, partition_key=None): """ - Create an EventDataBatch object with max message size being max_message_size. - The max_message_size should be no greater than the max allowed message size defined by the service side. - :param max_message_size: - :param partition_key: - :return: + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :type partition_key: str + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch """ if not self._max_message_size_on_link: await self._open() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) @@ -332,7 +335,8 @@ async def send(self, event_data, partition_key=None): :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError @@ -355,6 +359,8 @@ async def send(self, event_data, partition_key=None): wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') wrapper_event_data = event_data else: if partition_key: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 6c7d68092842..00015c5fd5dc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -14,6 +14,8 @@ from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +log = logging.getLogger(__name__) + # event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each _BATCH_MESSAGE_OVERHEAD_COST = [5, 8] @@ -251,15 +253,13 @@ def encode_message(self): class EventDataBatch(object): """ - The EventDataBatch class is a holder of a batch of event date within max message size bytes. + The EventDataBatch class is a holder of a batch of event data within max size bytes. Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. Do not instantiate an EventDataBatch object directly. """ - log = logging.getLogger(__name__) - def __init__(self, max_size=None, partition_key=None): - self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES + self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES self._partition_key = partition_key self.message = BatchMessage(data=[], multi_messages=False, properties=None) @@ -302,7 +302,7 @@ def try_add(self, event_data): :return: """ if event_data is None: - self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") + log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") return if not isinstance(event_data, EventData): raise TypeError('event_data should be type of EventData') diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 44bd24b16553..4a2a5f0cc81d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -304,18 +304,22 @@ def _error(outcome, condition): def create_batch(self, max_size=None, partition_key=None): """ - Create an EventDataBatch object with max message size being max_message_size. - The max_message_size should be no greater than the max allowed message size defined by the service side. - :param max_message_size: - :param partition_key: - :return: + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :type partition_key: str + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch """ if not self._max_message_size_on_link: self._open() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) @@ -329,11 +333,10 @@ def send(self, event_data, partition_key=None): :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key - will be omitted if event_data is of type ~azure.eventhub.EventDataBatch. + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError - :return: None :rtype: None @@ -353,6 +356,8 @@ def send(self, event_data, partition_key=None): wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') wrapper_event_data = event_data else: if partition_key: From fc69d42e9a43268ea9f448d3a16a6366aaaffad0 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Sat, 27 Jul 2019 00:05:41 -0700 Subject: [PATCH 6/9] Revert some kwargs backto optional parameters as it may cause breaking changes --- .../azure/eventhub/aio/_connection_manager_async.py | 2 +- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 3 +-- .../azure-eventhubs/azure/eventhub/aio/producer_async.py | 3 +-- .../azure-eventhubs/azure/eventhub/client_abstract.py | 1 + sdk/eventhub/azure-eventhubs/azure/eventhub/common.py | 6 ++---- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 3 +-- sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 3 +-- 7 files changed, 8 insertions(+), 13 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py index 3178e1fb72a7..989f3416d582 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py @@ -75,4 +75,4 @@ def reset_connection_if_broken(self): def get_connection_manager(**kwargs): - return _SharedConnectionManager(**kwargs) + return _SeparateConnectionManager(**kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 70f076120cd7..286b62f0a05c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -226,7 +226,7 @@ async def receive(self, **kwargs): last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) retry_count += 1 - async def close(self, **kwargs): + async def close(self, exception=None): # type: (Exception) -> None """ Close down the handler. If the handler has already closed, @@ -246,7 +246,6 @@ async def close(self, **kwargs): :caption: Close down the handler. """ - exception = kwargs.get("exception", None) self.running = False if self.error: return diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index a335ccce411e..bb16b8ee45bb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -229,7 +229,7 @@ async def send(self, event_data, **kwargs): self.unsent_events = [wrapper_event_data.message] await self._send_event_data(timeout) - async def close(self, **kwargs): + async def close(self, exception=None): # type: (Exception) -> None """ Close down the handler. If the handler has already closed, @@ -249,5 +249,4 @@ async def close(self, **kwargs): :caption: Close down the handler. """ - exception = kwargs.get("exception", None) await super(EventHubProducer, self).close(exception) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 7f6afb51c7fc..8c97797c01ff 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -276,6 +276,7 @@ def from_connection_string(cls, conn_str, **kwargs): host = address[left_slash_pos + 2:] else: host = address + kwargs.pop("event_hub_path", None) return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) else: return cls._from_iothub_connection_string(conn_str, **kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 8f862e9e34bc..ea00d0aef5ff 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -57,7 +57,7 @@ class EventData(object): PROP_TIMESTAMP = b"x-opt-enqueued-time" PROP_DEVICE_ID = b"iothub-connection-device-id" - def __init__(self, **kwargs): + def __init__(self, body=None, **kwargs): """ Initialize EventData. @@ -70,7 +70,6 @@ def __init__(self, **kwargs): :param message: The received message. :type message: ~uamqp.message.Message """ - body = kwargs.get("body", None) to_device = kwargs.get("to_device", None) message = kwargs.get("message", None) @@ -354,7 +353,7 @@ class EventPosition(object): >>> event_pos = EventPosition(1506968696002) """ - def __init__(self, value, **kwargs): + def __init__(self, value, inclusive=False): """ Initialize EventPosition. @@ -363,7 +362,6 @@ def __init__(self, value, **kwargs): :param inclusive: Whether to include the supplied value as the start point. :type inclusive: bool """ - inclusive = kwargs.get("inclusive", False) self.value = value if value is not None else "-1" self.inclusive = inclusive diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index de1f1facd708..dd83b3848748 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -217,7 +217,7 @@ def receive(self, **kwargs): last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) retry_count += 1 - def close(self, **kwargs): + def close(self, exception=None): # type:(Exception) -> None """ Close down the handler. If the handler has already closed, @@ -237,7 +237,6 @@ def close(self, **kwargs): :caption: Close down the handler. """ - exception = kwargs.get("exception", None) if self.messages_iter: self.messages_iter.close() self.messages_iter = None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 9e07891c6690..16803dace84e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -237,7 +237,7 @@ def send(self, event_data, **kwargs): self.unsent_events = [wrapper_event_data.message] self._send_event_data(timeout=timeout) - def close(self, **kwargs): + def close(self, exception=None): # type:(Exception) -> None """ Close down the handler. If the handler has already closed, @@ -257,5 +257,4 @@ def close(self, **kwargs): :caption: Close down the handler. """ - exception = kwargs.get("exception", None) super(EventHubProducer, self).close(exception) From e4be05ec4fb46a1f63592c9231670825cf5f1fb3 Mon Sep 17 00:00:00 2001 From: Yunhao Ling <47871814+yunhaoling@users.noreply.github.com> Date: Sun, 28 Jul 2019 20:35:02 -0700 Subject: [PATCH 7/9] Small fixes (#6520) * Change back to normal number writings as not supported by python under 3.6 * small fix --- .../eventhub/aio/_connection_manager_async.py | 2 +- .../azure/eventhub/aio/consumer_async.py | 2 +- .../azure/eventhub/aio/producer_async.py | 2 +- .../azure/eventhub/consumer.py | 2 +- .../azure-eventhubs/azure/eventhub/error.py | 3 +- .../azure/eventhub/producer.py | 2 +- .../tests/asynctests/test_auth_async.py | 4 +- .../tests/asynctests/test_negative_async.py | 38 +++++++++---------- .../azure-eventhubs/tests/test_auth.py | 4 +- .../azure-eventhubs/tests/test_negative.py | 22 +++++++---- 10 files changed, 42 insertions(+), 39 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py index 989f3416d582..618359192ffe 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py @@ -70,7 +70,7 @@ async def get_connection(self, host, auth): async def close_connection(self): pass - def reset_connection_if_broken(self): + async def reset_connection_if_broken(self): pass diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 286b62f0a05c..dac2d0c0fa61 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -193,7 +193,7 @@ async def receive(self, **kwargs): max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size timeout = self.client.config.receive_timeout if timeout is None else timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout data_batch = [] start_time = time.time() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index bb16b8ee45bb..e326aef0a115 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -113,7 +113,7 @@ async def _open(self, timeout_time=None): async def _send_event_data(self, timeout=None): timeout = timeout or self.client.config.send_timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout start_time = time.time() timeout_time = start_time + timeout max_retries = self.client.config.max_retries diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index dd83b3848748..e59c440c7c88 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -185,7 +185,7 @@ def receive(self, **kwargs): max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size timeout = self.client.config.receive_timeout if timeout is None else timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout data_batch = [] # type: List[EventData] start_time = time.time() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 0fb6933e3015..cbe6a8a04946 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -57,8 +57,7 @@ class EventHubError(Exception): :vartype details: dict[str, str] """ - def __init__(self, message, **kwargs): - details = kwargs.get("details", None) + def __init__(self, message, details=None): self.error = None self.message = message self.details = details diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 16803dace84e..da2a9ee95368 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -121,7 +121,7 @@ def _open(self, timeout_time=None): def _send_event_data(self, timeout=None): timeout = timeout or self.client.config.send_timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout start_time = time.time() timeout_time = start_time + timeout max_retries = self.client.config.max_retries diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py index 0a1ef23e1dea..d0a0d0912868 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py @@ -31,7 +31,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub): async with receiver: - received = await receiver.receive(timeout=1) + received = await receiver.receive(timeout=3) assert len(received) == 0 async with sender: @@ -40,7 +40,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub): await asyncio.sleep(1) - received = await receiver.receive(timeout=1) + received = await receiver.receive(timeout=3) assert len(received) == 1 assert list(received[0].body)[0] == 'A single message'.encode('utf-8') diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py index 27deb42c3aeb..4504d8558298 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py @@ -29,7 +29,7 @@ async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receiv client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -38,7 +38,7 @@ async def test_receive_with_invalid_hostname_async(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -48,7 +48,7 @@ async def test_send_with_invalid_key_async(invalid_key, connstr_receivers): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -57,7 +57,7 @@ async def test_receive_with_invalid_key_async(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -67,7 +67,7 @@ async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers) client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -76,7 +76,7 @@ async def test_receive_with_invalid_policy_async(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -88,7 +88,7 @@ async def test_send_partition_key_with_partition_async(connection_str): try: data = EventData(b"Data") with pytest.raises(ValueError): - await sender.send(data) + await sender.send(EventData("test data")) finally: await sender.close() @@ -99,7 +99,7 @@ async def test_non_existing_entity_sender_async(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) sender = client.create_producer(partition_id="1") with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -108,35 +108,31 @@ async def test_non_existing_entity_receiver_async(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await receiver._open() + await receiver.receive(timeout=5) @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_from_invalid_partitions_async(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) - try: - with pytest.raises(ConnectError): - await receiver.receive(timeout=10) - finally: - await receiver.close() + with pytest.raises(ConnectError): + await receiver.receive(timeout=10) + await receiver.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_to_invalid_partitions_async(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) sender = client.create_producer(partition_id=p) - try: - with pytest.raises(ConnectError): - await sender._open() - finally: - await sender.close() + with pytest.raises(ConnectError): + await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest diff --git a/sdk/eventhub/azure-eventhubs/tests/test_auth.py b/sdk/eventhub/azure-eventhubs/tests/test_auth.py index d5871971a5b4..0f0c78794884 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_auth.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_auth.py @@ -26,7 +26,7 @@ def test_client_secret_credential(aad_credential, live_eventhub): receiver = client.create_consumer(consumer_group="$default", partition_id='0', event_position=EventPosition("@latest")) with receiver: - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=3) assert len(received) == 0 with sender: @@ -34,7 +34,7 @@ def test_client_secret_credential(aad_credential, live_eventhub): sender.send(event) time.sleep(1) - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=3) assert len(received) == 1 assert list(received[0].body)[0] == 'A single message'.encode('utf-8') diff --git a/sdk/eventhub/azure-eventhubs/tests/test_negative.py b/sdk/eventhub/azure-eventhubs/tests/test_negative.py index ac19a01f76c9..4749df940d9c 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_negative.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_negative.py @@ -34,7 +34,8 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() @pytest.mark.liveTest @@ -44,14 +45,16 @@ def test_send_with_invalid_key(invalid_key, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): sender.send(EventData("test data")) - + sender.close() @pytest.mark.liveTest def test_receive_with_invalid_key_sync(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=10) + receiver.close() @pytest.mark.liveTest @@ -61,6 +64,7 @@ def test_send_with_invalid_policy(invalid_policy, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): sender.send(EventData("test data")) + sender.close() @pytest.mark.liveTest @@ -68,7 +72,8 @@ def test_receive_with_invalid_policy_sync(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() @pytest.mark.liveTest @@ -97,13 +102,16 @@ def test_non_existing_entity_sender(connection_str): def test_non_existing_entity_receiver(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() + @pytest.mark.liveTest def test_receive_from_invalid_partitions_sync(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) @@ -116,7 +124,7 @@ def test_receive_from_invalid_partitions_sync(connection_str): @pytest.mark.liveTest def test_send_to_invalid_partitions(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) sender = client.create_producer(partition_id=p) From e702c76b6c01d96fe840671a7901ff0f3166f5b8 Mon Sep 17 00:00:00 2001 From: Yunhao Ling <47871814+yunhaoling@users.noreply.github.com> Date: Sun, 28 Jul 2019 22:10:28 -0700 Subject: [PATCH 8/9] Add missing return (#6522) --- .../azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py | 2 +- .../azure/eventhub/aio/_consumer_producer_mixin_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 9ac6fb468945..95f6e908c404 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -72,7 +72,7 @@ def _close_connection(self): self.client._conn_manager.reset_connection_if_broken() def _handle_exception(self, exception, retry_count, max_retries, timeout_time): - _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return _handle_exception(exception, retry_count, max_retries, self, timeout_time) def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 5a0f0d9eaa4d..e6b35ad41ae4 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -73,7 +73,7 @@ async def _close_connection(self): await self.client._conn_manager.reset_connection_if_broken() async def _handle_exception(self, exception, retry_count, max_retries, timeout_time): - await _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) async def close(self, exception=None): # type: (Exception) -> None From aa56698dc0e6cc121a0529388bab2f49b2ccb5d9 Mon Sep 17 00:00:00 2001 From: Yunhao Ling <47871814+yunhaoling@users.noreply.github.com> Date: Mon, 29 Jul 2019 00:11:34 -0700 Subject: [PATCH 9/9] Fix livetest (#6523) --- .../asynctests/test_longrunning_receive_async.py | 2 +- .../tests/asynctests/test_negative_async.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py index f0666094618c..74c05d174e47 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py @@ -60,7 +60,7 @@ async def pump(_pid, receiver, _args, _dl): try: async with receiver: while time.time() < deadline: - batch = await receiver.receive(timeout=1) + batch = await receiver.receive(timeout=3) size = len(batch) total += size iteration += 1 diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py index 4504d8558298..3d43942fe6c8 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py @@ -36,9 +36,9 @@ async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receiv @pytest.mark.asyncio async def test_receive_with_invalid_hostname_async(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender.send(EventData("test data")) + await receiver.receive(timeout=3) @pytest.mark.liveTest @@ -55,9 +55,9 @@ async def test_send_with_invalid_key_async(invalid_key, connstr_receivers): @pytest.mark.asyncio async def test_receive_with_invalid_key_async(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender.send(EventData("test data")) + await receiver.receive(timeout=3) @pytest.mark.liveTest @@ -74,9 +74,9 @@ async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers) @pytest.mark.asyncio async def test_receive_with_invalid_policy_async(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender.send(EventData("test data")) + await receiver.receive(timeout=3) @pytest.mark.liveTest