diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index 4e87eab6dbdf..e235266b4a67 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -2,6 +2,9 @@ ## 5.1.0b2 (Unreleased) +**New Features** + +- `EventHubProducerClient.send_batch` accepts either an `EventDataBatch` or a finite list of `EventData`. #9181 ## 5.1.0b1 (2020-04-06) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 74491e4dff99..975cd177ccb3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -374,6 +374,15 @@ def _from_batch(cls, batch_data, partition_key=None): ) return batch_data_instance + def _load_events(self, events): + for event_data in events: + try: + self.add(event_data) + except ValueError: + raise ValueError("The combined size of EventData collection exceeds the Event Hub frame size limit. " + "Please send a smaller collection of EventData, or use EventDataBatch, " + "which is guaranteed to be under the frame size limit") + @property def size_in_bytes(self): # type: () -> int diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 0a30fba39a18..bce3efb9d67a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -6,13 +6,14 @@ import threading from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast + from uamqp import constants from .exceptions import ConnectError, EventHubError from ._client_base import ClientBase from ._producer import EventHubProducer from ._constants import ALL_PARTITIONS -from ._common import EventDataBatch +from ._common import EventDataBatch, EventData if TYPE_CHECKING: from azure.core.credentials import TokenCredential @@ -183,13 +184,28 @@ def from_connection_string(cls, conn_str, **kwargs): return cls(**constructor_args) def send_batch(self, event_data_batch, **kwargs): - # type: (EventDataBatch, Any) -> None + # type: (Union[EventDataBatch, List[EventData]], Any) -> None """Sends event data and blocks until acknowledgement is received or operation times out. - :param event_data_batch: The EventDataBatch object to be sent. - :type event_data_batch: ~azure.eventhub.EventDataBatch + If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub + frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` + to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, + and then call this method to send out the batch. + + :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent + in a batch. + :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] :keyword float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. + :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_id. + :keyword str partition_key: With the given partition_key, event data will be sent to + a particular partition of the Event Hub decided by the service. + A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_key. + If both partition_id and partition_key is provided, the partition_id will take precedence. :rtype: None :raises: :class:`AuthenticationError` :class:`ConnectError` @@ -197,6 +213,8 @@ def send_batch(self, event_data_batch, **kwargs): :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` + :class:`ValueError` + :class:`TypeError` .. admonition:: Example: @@ -208,18 +226,28 @@ def send_batch(self, event_data_batch, **kwargs): :caption: Sends event data """ + partition_id = kwargs.get("partition_id") + partition_key = kwargs.get("partition_key") + if isinstance(event_data_batch, EventDataBatch): + if partition_id or partition_key: + raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " + "because type EventDataBatch itself may have partition_id or partition_key") + to_send_batch = event_data_batch + else: + to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) + to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access partition_id = ( - event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access + to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access ) send_timeout = kwargs.pop("timeout", None) try: cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=send_timeout + to_send_batch, timeout=send_timeout ) except (KeyError, AttributeError, EventHubError): self._start_producer(partition_id, send_timeout) cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=send_timeout + to_send_batch, timeout=send_timeout ) def create_batch(self, **kwargs): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 9d69d52ea18f..b5eb6ba6fb71 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -12,7 +12,7 @@ from ._client_base_async import ClientBaseAsync from ._producer_async import EventHubProducer from .._constants import ALL_PARTITIONS -from .._common import EventDataBatch +from .._common import EventDataBatch, EventData if TYPE_CHECKING: from uamqp.constants import TransportType @@ -211,16 +211,32 @@ def from_connection_string( async def send_batch( self, - event_data_batch: EventDataBatch, + event_data_batch: Union[EventDataBatch, List[EventData]], *, - timeout: Optional[Union[int, float]] = None + timeout: Optional[Union[int, float]] = None, + **kwargs ) -> None: """Sends event data and blocks until acknowledgement is received or operation times out. - :param event_data_batch: The EventDataBatch object to be sent. - :type event_data_batch: ~azure.eventhub.EventDataBatch - :param float timeout: The maximum wait time to send the event data. + If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub + frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` + to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, + and then call this method to send out the batch. + + :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent + in a batch. + :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] + :keyword float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. + :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_id. + :keyword str partition_key: With the given partition_key, event data will be sent to + a particular partition of the Event Hub decided by the service. + A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_key. + If both partition_id and partition_key is provided, the partition_id will take precedence. :rtype: None :raises: :class:`AuthenticationError` :class:`ConnectError` @@ -228,6 +244,8 @@ async def send_batch( :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` + :class:`ValueError` + :class:`TypeError` .. admonition:: Example: @@ -239,17 +257,28 @@ async def send_batch( :caption: Asynchronously sends event data """ + partition_id = kwargs.get("partition_id") + partition_key = kwargs.get("partition_key") + if isinstance(event_data_batch, EventDataBatch): + if partition_id or partition_key: + raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " + "because type EventDataBatch itself may have partition_id or partition_key") + to_send_batch = event_data_batch + else: + to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key) + to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access + partition_id = ( - event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access + to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access ) try: await cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=timeout + to_send_batch, timeout=timeout ) except (KeyError, AttributeError, EventHubError): await self._start_producer(partition_id, timeout) await cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=timeout + to_send_batch, timeout=timeout ) async def create_batch( diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py index a8d104a96188..d69f255861ef 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py @@ -16,6 +16,7 @@ import os from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub.exceptions import EventHubError from azure.eventhub import EventData CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] @@ -70,6 +71,22 @@ async def send_event_data_batch_with_properties(producer): await producer.send_batch(event_data_batch) +async def send_event_data_list(producer): + # If you know beforehand that the list of events you have will not exceed the + # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch + + # Without specifying partition_id or partition_key + # the events will be distributed to available partitions via round-robin. + + event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] + try: + await producer.send_batch(event_data_list) + except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. + print("Size of the event data list exceeds the size limit of a single send") + except EventHubError as eh_err: + print("Sending error: ", eh_err) + + async def run(): producer = EventHubProducerClient.from_connection_string( @@ -82,6 +99,7 @@ async def run(): await send_event_data_batch_with_partition_key(producer) await send_event_data_batch_with_partition_id(producer) await send_event_data_batch_with_properties(producer) + await send_event_data_list(producer) loop = asyncio.get_event_loop() diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py index c949ddfaec0e..0d4fd467a3a0 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py @@ -14,7 +14,7 @@ import time import os from azure.eventhub import EventHubProducerClient, EventData - +from azure.eventhub.exceptions import EventHubError CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] @@ -68,6 +68,22 @@ def send_event_data_batch_with_properties(producer): producer.send_batch(event_data_batch) +def send_event_data_list(producer): + # If you know beforehand that the list of events you have will not exceed the + # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch + + # Without specifying partition_id or partition_key + # the events will be distributed to available partitions via round-robin. + + event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] + try: + producer.send_batch(event_data_list) + except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. + print("Size of the event data list exceeds the size limit of a single send") + except EventHubError as eh_err: + print("Sending error: ", eh_err) + + producer = EventHubProducerClient.from_connection_string( conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME @@ -80,5 +96,6 @@ def send_event_data_batch_with_properties(producer): send_event_data_batch_with_partition_key(producer) send_event_data_batch_with_partition_id(producer) send_event_data_batch_with_properties(producer) + send_event_data_list(producer) print("Send messages in {} seconds.".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py index 57143c92fa35..851eec7e30a5 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py @@ -11,9 +11,9 @@ import time import json -from azure.eventhub import EventData, TransportType +from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub.aio import EventHubProducerClient - +from azure.eventhub.exceptions import EventDataSendError @pytest.mark.liveTest @pytest.mark.asyncio @@ -169,3 +169,57 @@ async def test_send_with_create_event_batch_async(connstr_receivers): received.extend(r.receive_message_batch(timeout=10000)) assert len(received) >= 1 assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value" + + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + async with client: + await client.send_batch([EventData(payload)]) + received = [] + for r in receivers: + received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=10000)]) + + assert len(received) == 1 + assert received[0].body_as_str() == payload + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_partition_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + async with client: + await client.send_batch([EventData(payload)], partition_id="0") + message = receivers[0].receive_message_batch(timeout=10000)[0] + received = EventData._from_message(message) + assert received.body_as_str() == payload + + +@pytest.mark.parametrize("to_send, exception_type", + [([], EventDataSendError), + ([EventData("A"*1024)]*1100, ValueError), + ("any str", AttributeError) + ]) +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_wrong_data_async(connection_str, to_send, exception_type): + client = EventHubProducerClient.from_connection_string(connection_str) + async with client: + with pytest.raises(exception_type): + await client.send_batch(to_send) + + +@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")]) +async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition_key): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key) + async with client: + with pytest.raises(TypeError): + await client.send_batch(batch, partition_id=partition_id, partition_key=partition_key) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py index 69ea1cb44aa9..d3790df006d5 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py @@ -10,9 +10,9 @@ import json import sys -from azure.eventhub import EventData, TransportType +from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub import EventHubProducerClient - +from azure.eventhub.exceptions import EventDataSendError @pytest.mark.liveTest def test_send_with_partition_key(connstr_receivers): @@ -179,3 +179,53 @@ def test_send_with_create_event_batch_with_app_prop_sync(connstr_receivers): received.extend(r.receive_message_batch(timeout=5000)) assert len(received) >= 1 assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value" + + +@pytest.mark.liveTest +def test_send_list(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + with client: + client.send_batch([EventData(payload)]) + received = [] + for r in receivers: + received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=10000)]) + + assert len(received) == 1 + assert received[0].body_as_str() == payload + + +@pytest.mark.liveTest +def test_send_list_partition(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + with client: + client.send_batch([EventData(payload)], partition_id="0") + message = receivers[0].receive_message_batch(timeout=10000)[0] + received = EventData._from_message(message) + assert received.body_as_str() == payload + + +@pytest.mark.parametrize("to_send, exception_type", + [([], EventDataSendError), + ([EventData("A"*1024)]*1100, ValueError), + ("any str", AttributeError) + ]) +@pytest.mark.liveTest +def test_send_list_wrong_data(connection_str, to_send, exception_type): + client = EventHubProducerClient.from_connection_string(connection_str) + with client: + with pytest.raises(exception_type): + client.send_batch(to_send) + + +@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")]) +def test_send_batch_pid_pk(invalid_hostname, partition_id, partition_key): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key) + with client: + with pytest.raises(TypeError): + client.send_batch(batch, partition_id=partition_id, partition_key=partition_key)