diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 616107e86125..c4faad253c16 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -5,7 +5,7 @@ __version__ = "5.0.0b1" -from azure.eventhub.common import EventData, EventPosition +from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ AuthenticationError, EventDataSendError, ConnectionLostError from azure.eventhub.client import EventHubClient @@ -18,6 +18,7 @@ __all__ = [ "EventData", + "EventDataBatch", "EventHubError", "ConnectError", "ConnectionLostError", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 6cf020176d96..38f4586ce5a6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -19,15 +19,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. 
These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -38,7 +38,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index aef8dc50ff02..5a893f308724 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -10,7 +10,7 @@ from uamqp import constants, errors, compat from uamqp import SendClientAsync -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -20,9 +20,9 @@ class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. 
""" @@ -31,7 +31,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` - method in EventHubClient. + method in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -52,6 +52,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. If not specified the default event loop will be used. """ self.loop = loop or asyncio.get_event_loop() + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -110,6 +111,10 @@ async def _open(self): await self._connect() self.running = True + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES + async def _connect(self): connected = await self._build_connection() if not connected: @@ -301,6 +306,23 @@ def _set_partition_key(event_datas, partition_key): ed._set_partition_key(partition_key) yield ed + async def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + await self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' 
+ .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + async def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -329,13 +351,15 @@ async def send(self, event_data, partition_key=None): self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): + wrapper_event_data = event_data + else: + if partition_key: + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] await self._send_event_data() @@ -373,4 +397,4 @@ async def close(self, exception=None): self.error = EventHubError(str(exception)) else: self.error = EventHubError("This send handler is now closed.") - await self._handler.close_async() \ No newline at end of file + await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5a6702a60324..24138bb2b6cb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -9,9 +9,13 @@ import json import six -from uamqp import BatchMessage, Message, types +from azure.eventhub.error 
import EventDataError +from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +# event_data.encoded_size < 256, batch encode overhead is 5, >=256, overhead is 8 each +_BATCH_MESSAGE_OVERHEAD_COST = [5, 8] + def parse_sas_token(sas_token): """Parse a SAS token into its components. @@ -244,10 +248,25 @@ def encode_message(self): return self.message.encode_message() -class _BatchSendEventData(EventData): - def __init__(self, batch_event_data, partition_key=None): - self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None) +class EventDataBatch(object): + """ + The EventDataBatch class is a holder of a batch of event data within max message size bytes. + Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Do not instantiate an EventDataBatch object directly. + """ + def __init__(self, max_message_size=None, partition_key=None): + self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES + self._partition_key = partition_key + self.message = BatchMessage(data=[], multi_messages=False, properties=None) + self._set_partition_key(partition_key) + self._size = self.message.gather()[0].get_message_encoded_size() + + @staticmethod + def _from_batch(batch_data, partition_key=None): + batch_data_instance = EventDataBatch(partition_key=partition_key) + batch_data_instance.message._body_gen = batch_data + return batch_data_instance def _set_partition_key(self, value): if value: @@ -260,6 +279,35 @@ def _set_partition_key(self, value): self.message.annotations = annotations self.message.header = header + def try_add(self, event_data): + """ + The message size is the sum of body, properties, header, etc. 
+ :param event_data: + :return: + """ + if not isinstance(event_data, EventData): + raise EventDataError('event_data should be type of EventData') + + if self._partition_key: + if event_data.partition_key and not (event_data.partition_key == self._partition_key): + raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch') + if not event_data.partition_key: + event_data._set_partition_key(self._partition_key) + + event_data_size = event_data.message.get_message_encoded_size() + + # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that + # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. + size_after_add = self._size + event_data_size\ + + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] + + if size_after_add > self.max_message_size: + return False + + self.message._body_gen.append(event_data) # pylint: disable=protected-access + self._size = size_after_add + return True + class EventPosition(object): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 856c77d6fb65..08db5118e925 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -23,15 +23,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading from the partition. 
+ These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -41,7 +41,7 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve keep_alive=None, auto_reconnect=True): """ Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 3f95b7be08c3..4317b993ceb1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,7 +13,7 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -23,16 +23,16 @@ class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. 
Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. @@ -51,6 +51,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N Default value is `True`. :type auto_reconnect: bool """ + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -109,6 +110,10 @@ def _open(self): self._connect() self.running = True + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES + def _connect(self): connected = self._build_connection() if not connected: @@ -298,6 +303,23 @@ def _error(outcome, condition): if outcome != constants.MessageSendResult.Ok: raise condition + def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' 
+ .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -307,7 +329,8 @@ def send(self, event_data, partition_key=None): :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + will be ignored if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError @@ -327,13 +350,15 @@ def send(self, event_data, partition_key=None): self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): # The partition_key in the param will be ignored. 
+ wrapper_event_data = event_data + else: + if partition_key: + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] self._send_event_data()