-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[Event Hubs] EventHubProducerClient.send_batch accepts a list of EventData
#11079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
c92cd14
37e5cb3
fcfac3b
d0a0810
39dd6f3
07ed4ab
087b6d2
d71a94b
d8fcc68
b63c415
52d1c5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,8 @@ | |
| import threading | ||
|
|
||
| from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast | ||
|
|
||
| from azure.eventhub import EventData | ||
| from uamqp import constants | ||
|
|
||
| from .exceptions import ConnectError, EventHubError | ||
|
|
@@ -183,20 +185,36 @@ def from_connection_string(cls, conn_str, **kwargs): | |
| return cls(**constructor_args) | ||
|
|
||
| def send_batch(self, event_data_batch, **kwargs): | ||
| # type: (EventDataBatch, Any) -> None | ||
| # type: (Union[EventDataBatch, List[EventData]], Any) -> None | ||
| """Sends event data and blocks until acknowledgement is received or operation times out. | ||
|
|
||
| :param event_data_batch: The EventDataBatch object to be sent. | ||
| :type event_data_batch: ~azure.eventhub.EventDataBatch | ||
| If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub | ||
| frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` | ||
| to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, | ||
| and then call this method to send out the batch. | ||
|
|
||
| :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent | ||
| in a batch. | ||
| :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] | ||
| :keyword float timeout: The maximum wait time to send the event data. | ||
| If not specified, the default wait time specified when the producer was created will be used. | ||
| :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service | ||
| will assign to all partitions using round-robin. | ||
| A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because | ||
| `EventDataBatch` itself has partition_id. | ||
| :keyword str partition_key: With the given partition_key, event data will be sent to | ||
| a particular partition of the Event Hub decided by the service. | ||
| A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because | ||
| `EventDataBatch` itself has partition_key. | ||
| If both partition_id and partition_key is provided, the partition_id will take precedence. | ||
| :rtype: None | ||
| :raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>` | ||
| :class:`ConnectError<azure.eventhub.exceptions.ConnectError>` | ||
| :class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>` | ||
| :class:`EventDataError<azure.eventhub.exceptions.EventDataError>` | ||
| :class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>` | ||
| :class:`EventHubError<azure.eventhub.exceptions.EventHubError>` | ||
| `ValueError` | ||
|
|
||
| .. admonition:: Example: | ||
|
|
||
|
|
@@ -208,18 +226,37 @@ def send_batch(self, event_data_batch, **kwargs): | |
| :caption: Sends event data | ||
|
|
||
| """ | ||
| partition_id = kwargs.get("partition_id") | ||
| partition_key = kwargs.get("partition_key") | ||
| if isinstance(event_data_batch, EventDataBatch): | ||
| if partition_id or partition_key: | ||
| raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why we raise but not ignoring them?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pk and pid are important information when sending an event data. It's better let them know if they provide wrong data. |
||
| "because type EventDataBatch itself may have partition_id or partition_key") | ||
| to_send_batch = event_data_batch | ||
| elif isinstance(event_data_batch, List): | ||
YijunXieMS marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) | ||
| for event_data in event_data_batch: | ||
| try: | ||
| to_send_batch.add(event_data) | ||
| except ValueError: | ||
| raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " | ||
| "Please send a smaller list of EventData, or use EventDataBatch, " | ||
| "which is guaranteed to be under the frame size limit") | ||
|
||
| else: | ||
| raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") | ||
|
|
||
| partition_id = ( | ||
| event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access | ||
| to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access | ||
| ) | ||
YijunXieMS marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| send_timeout = kwargs.pop("timeout", None) | ||
| try: | ||
| cast(EventHubProducer, self._producers[partition_id]).send( | ||
| event_data_batch, timeout=send_timeout | ||
| to_send_batch, timeout=send_timeout | ||
| ) | ||
| except (KeyError, AttributeError, EventHubError): | ||
| self._start_producer(partition_id, send_timeout) | ||
| cast(EventHubProducer, self._producers[partition_id]).send( | ||
| event_data_batch, timeout=send_timeout | ||
| to_send_batch, timeout=send_timeout | ||
| ) | ||
|
|
||
| def create_batch(self, **kwargs): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,7 @@ | |
| from ._client_base_async import ClientBaseAsync | ||
| from ._producer_async import EventHubProducer | ||
| from .._constants import ALL_PARTITIONS | ||
| from .._common import EventDataBatch | ||
| from .._common import EventDataBatch, EventData | ||
|
|
||
| if TYPE_CHECKING: | ||
| from uamqp.constants import TransportType | ||
|
|
@@ -211,23 +211,40 @@ def from_connection_string( | |
|
|
||
| async def send_batch( | ||
| self, | ||
| event_data_batch: EventDataBatch, | ||
| event_data_batch: Union[EventDataBatch, List[EventData]], | ||
| *, | ||
| timeout: Optional[Union[int, float]] = None | ||
| timeout: Optional[Union[int, float]] = None, | ||
| **kwargs | ||
| ) -> None: | ||
| """Sends event data and blocks until acknowledgement is received or operation times out. | ||
|
|
||
| :param event_data_batch: The EventDataBatch object to be sent. | ||
| :type event_data_batch: ~azure.eventhub.EventDataBatch | ||
| :param float timeout: The maximum wait time to send the event data. | ||
| If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub | ||
| frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` | ||
| to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, | ||
| and then call this method to send out the batch. | ||
|
|
||
| :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent | ||
| in a batch. | ||
| :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] | ||
| :keyword float timeout: The maximum wait time to send the event data. | ||
| If not specified, the default wait time specified when the producer was created will be used. | ||
| :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service | ||
| will assign to all partitions using round-robin. | ||
| A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because | ||
| `EventDataBatch` itself has partition_id. | ||
| :keyword str partition_key: With the given partition_key, event data will be sent to | ||
| a particular partition of the Event Hub decided by the service. | ||
| A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is TypeError right here? Seems more like a ValueError
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Python |
||
| `EventDataBatch` itself has partition_key. | ||
| If both partition_id and partition_key is provided, the partition_id will take precedence. | ||
| :rtype: None | ||
| :raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>` | ||
| :class:`ConnectError<azure.eventhub.exceptions.ConnectError>` | ||
| :class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>` | ||
| :class:`EventDataError<azure.eventhub.exceptions.EventDataError>` | ||
| :class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>` | ||
| :class:`EventHubError<azure.eventhub.exceptions.EventHubError>` | ||
| `ValueError` | ||
YijunXieMS marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| .. admonition:: Example: | ||
|
|
||
|
|
@@ -239,17 +256,36 @@ async def send_batch( | |
| :caption: Asynchronously sends event data | ||
|
|
||
| """ | ||
| partition_id = kwargs.get("partition_id") | ||
| partition_key = kwargs.get("partition_key") | ||
| if isinstance(event_data_batch, EventDataBatch): | ||
| if partition_id or partition_key: | ||
| raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " | ||
| "because type EventDataBatch itself may have partition_id or partition_key") | ||
| to_send_batch = event_data_batch | ||
| elif isinstance(event_data_batch, List): | ||
| to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key) | ||
| for event_data in event_data_batch: | ||
| try: | ||
| to_send_batch.add(event_data) | ||
| except ValueError: | ||
| raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " | ||
| "Please send a smaller list of EventData, or use EventDataBatch, " | ||
| "which is guaranteed to be under the frame size limit") | ||
YijunXieMS marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| else: | ||
| raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") | ||
|
|
||
| partition_id = ( | ||
| event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access | ||
| to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access | ||
| ) | ||
| try: | ||
| await cast(EventHubProducer, self._producers[partition_id]).send( | ||
| event_data_batch, timeout=timeout | ||
| to_send_batch, timeout=timeout | ||
| ) | ||
| except (KeyError, AttributeError, EventHubError): | ||
| await self._start_producer(partition_id, timeout) | ||
| await cast(EventHubProducer, self._producers[partition_id]).send( | ||
| event_data_batch, timeout=timeout | ||
| to_send_batch, timeout=timeout | ||
| ) | ||
|
|
||
| async def create_batch( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.