Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.1.0b2 (Unreleased)

**New Features**

- `EventHubProducerClient.send_batch` accepts either an `EventDataBatch` or a finite list of `EventData`. #9181

## 5.1.0b1 (2020-04-06)

Expand Down
9 changes: 9 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ def _from_batch(cls, batch_data, partition_key=None):
)
return batch_data_instance

def _load_events(self, events):
for event_data in events:
try:
self.add(event_data)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate the event_data type here, or is EH flexible, i.e. do we get a sane error later? (I'm thinking of SB, where someone can try to add a string, which is enumerable but not a valid Message type.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user will eventually get an AttributeError, because some downstream code will end up calling event_data.message.

except ValueError:
raise ValueError("The combined size of EventData collection exceeds the Event Hub frame size limit. "
"Please send a smaller collection of EventData, or use EventDataBatch, "
"which is guaranteed to be under the frame size limit")

@property
def size_in_bytes(self):
# type: () -> int
Expand Down
42 changes: 35 additions & 7 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import threading

from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast

from uamqp import constants

from .exceptions import ConnectError, EventHubError
from ._client_base import ClientBase
from ._producer import EventHubProducer
from ._constants import ALL_PARTITIONS
from ._common import EventDataBatch
from ._common import EventDataBatch, EventData

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
Expand Down Expand Up @@ -183,20 +184,37 @@ def from_connection_string(cls, conn_str, **kwargs):
return cls(**constructor_args)

def send_batch(self, event_data_batch, **kwargs):
# type: (EventDataBatch, Any) -> None
# type: (Union[EventDataBatch, List[EventData]], Any) -> None
"""Sends event data and blocks until acknowledgement is received or operation times out.

:param event_data_batch: The EventDataBatch object to be sent.
:type event_data_batch: ~azure.eventhub.EventDataBatch
If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub
frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch`
to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit,
and then call this method to send out the batch.

:param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent
in a batch.
:type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]]
:keyword float timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_key.
If both partition_id and partition_key is provided, the partition_id will take precedence.
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:class:`ValueError`
:class:`TypeError`

.. admonition:: Example:

Expand All @@ -208,18 +226,28 @@ def send_batch(self, event_data_batch, **kwargs):
:caption: Sends event data

"""
partition_id = kwargs.get("partition_id")
partition_key = kwargs.get("partition_key")
if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why we raise instead of ignoring them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pk and pid are important pieces of information when sending event data. It's better to let users know if they provide wrong data.

"because type EventDataBatch itself may have partition_id or partition_key")
to_send_batch = event_data_batch
else:
to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access
partition_id = (
event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
)
send_timeout = kwargs.pop("timeout", None)
try:
cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=send_timeout
to_send_batch, timeout=send_timeout
)
except (KeyError, AttributeError, EventHubError):
self._start_producer(partition_id, send_timeout)
cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=send_timeout
to_send_batch, timeout=send_timeout
)

def create_batch(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ._client_base_async import ClientBaseAsync
from ._producer_async import EventHubProducer
from .._constants import ALL_PARTITIONS
from .._common import EventDataBatch
from .._common import EventDataBatch, EventData

if TYPE_CHECKING:
from uamqp.constants import TransportType
Expand Down Expand Up @@ -211,23 +211,41 @@ def from_connection_string(

async def send_batch(
self,
event_data_batch: EventDataBatch,
event_data_batch: Union[EventDataBatch, List[EventData]],
*,
timeout: Optional[Union[int, float]] = None
timeout: Optional[Union[int, float]] = None,
**kwargs
) -> None:
"""Sends event data and blocks until acknowledgement is received or operation times out.

:param event_data_batch: The EventDataBatch object to be sent.
:type event_data_batch: ~azure.eventhub.EventDataBatch
:param float timeout: The maximum wait time to send the event data.
If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub
frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch`
to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit,
and then call this method to send out the batch.

:param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent
in a batch.
:type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]]
:keyword float timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is TypeError right here? Seems more like a ValueError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python None is of NoneType. So I think it's understandable to have a TypeError.
I like TypeError more than ValueError here because I don't want to mix this with the too large list of events, which raises a ValueError.

`EventDataBatch` itself has partition_key.
If both partition_id and partition_key is provided, the partition_id will take precedence.
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:class:`ValueError`
:class:`TypeError`

.. admonition:: Example:

Expand All @@ -239,17 +257,28 @@ async def send_batch(
:caption: Asynchronously sends event data

"""
partition_id = kwargs.get("partition_id")
partition_key = kwargs.get("partition_key")
if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key")
to_send_batch = event_data_batch
else:
to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access

partition_id = (
event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
)
try:
await cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=timeout
to_send_batch, timeout=timeout
)
except (KeyError, AttributeError, EventHubError):
await self._start_producer(partition_id, timeout)
await cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=timeout
to_send_batch, timeout=timeout
)

async def create_batch(
Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
Expand Down Expand Up @@ -70,6 +71,22 @@ async def send_event_data_batch_with_properties(producer):
await producer.send_batch(event_data_batch)


async def send_event_data_list(producer):
    """Send a plain list of ``EventData`` with a single ``send_batch`` call.

    Use this form only when you already know the list fits within the size
    limit of a single send; in that case no ``EventDataBatch`` has to be
    created first.

    :param producer: The producer client used to send the events.
    """
    # Neither partition_id nor partition_key is given, so the events are
    # distributed to the available partitions via round-robin.
    events = []
    for index in range(10):
        events.append(EventData('Event Data {}'.format(index)))

    try:
        await producer.send_batch(events)
    except ValueError:
        # The list is too large for one send. This shouldn't happen if the
        # size was checked beforehand.
        print("Size of the event data list exceeds the size limit of a single send")
    except EventHubError as send_error:
        print("Sending error: ", send_error)


async def run():

producer = EventHubProducerClient.from_connection_string(
Expand All @@ -82,6 +99,7 @@ async def run():
await send_event_data_batch_with_partition_key(producer)
await send_event_data_batch_with_partition_id(producer)
await send_event_data_batch_with_properties(producer)
await send_event_data_list(producer)


loop = asyncio.get_event_loop()
Expand Down
19 changes: 18 additions & 1 deletion sdk/eventhub/azure-eventhub/samples/sync_samples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import time
import os
from azure.eventhub import EventHubProducerClient, EventData

from azure.eventhub.exceptions import EventHubError

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
Expand Down Expand Up @@ -68,6 +68,22 @@ def send_event_data_batch_with_properties(producer):
producer.send_batch(event_data_batch)


def send_event_data_list(producer):
    """Send a plain list of ``EventData`` with a single ``send_batch`` call.

    Use this form only when you already know the list fits within the size
    limit of a single send; in that case no ``EventDataBatch`` has to be
    created first.

    :param producer: The producer client used to send the events.
    """
    # Neither partition_id nor partition_key is given, so the events are
    # distributed to the available partitions via round-robin.
    events = []
    for index in range(10):
        events.append(EventData('Event Data {}'.format(index)))

    try:
        producer.send_batch(events)
    except ValueError:
        # The list is too large for one send. This shouldn't happen if the
        # size was checked beforehand.
        print("Size of the event data list exceeds the size limit of a single send")
    except EventHubError as send_error:
        print("Sending error: ", send_error)


producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
Expand All @@ -80,5 +96,6 @@ def send_event_data_batch_with_properties(producer):
send_event_data_batch_with_partition_key(producer)
send_event_data_batch_with_partition_id(producer)
send_event_data_batch_with_properties(producer)
send_event_data_list(producer)

print("Send messages in {} seconds.".format(time.time() - start_time))
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import time
import json

from azure.eventhub import EventData, TransportType
from azure.eventhub import EventData, TransportType, EventDataBatch
from azure.eventhub.aio import EventHubProducerClient

from azure.eventhub.exceptions import EventDataSendError

@pytest.mark.liveTest
@pytest.mark.asyncio
Expand Down Expand Up @@ -169,3 +169,57 @@ async def test_send_with_create_event_batch_async(connstr_receivers):
received.extend(r.receive_message_batch(timeout=10000))
assert len(received) >= 1
assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value"



@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_async(connstr_receivers):
    """Send a one-element list of EventData and expect exactly one event back.

    The receivers supplied by the fixture are all drained, and the single
    received event must carry the payload that was sent.
    """
    connection_str, receivers = connstr_receivers
    payload = "A1"
    client = EventHubProducerClient.from_connection_string(connection_str)
    async with client:
        await client.send_batch([EventData(payload)])

    # Collect whatever each receiver got, converted back to EventData.
    received = []
    for receiver in receivers:
        for message in receiver.receive_message_batch(timeout=10000):
            received.append(EventData._from_message(message))

    assert len(received) == 1
    assert received[0].body_as_str() == payload


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_partition_async(connstr_receivers):
    """Send a one-element list with ``partition_id="0"`` and read it back.

    Only the first receiver from the fixture is checked; the first message
    it returns must carry the payload that was sent.
    """
    connection_str, receivers = connstr_receivers
    payload = "A1"
    client = EventHubProducerClient.from_connection_string(connection_str)
    async with client:
        await client.send_batch([EventData(payload)], partition_id="0")
        first_receiver = receivers[0]
        messages = first_receiver.receive_message_batch(timeout=10000)
        received = EventData._from_message(messages[0])
    assert received.body_as_str() == payload


@pytest.mark.parametrize(
    "to_send, exception_type",
    [
        # An empty list is expected to be rejected when sending.
        ([], EventDataSendError),
        # 1100 events with 1024-character bodies: expected to exceed the size limit.
        ([EventData("A" * 1024)] * 1100, ValueError),
        # A str is iterable, but its items are not EventData.
        ("any str", AttributeError),
    ],
)
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_wrong_data_async(connection_str, to_send, exception_type):
    """Check that ``send_batch`` raises the expected error for invalid input."""
    client = EventHubProducerClient.from_connection_string(connection_str)
    async with client:
        with pytest.raises(exception_type):
            await client.send_batch(to_send)


@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")])
@pytest.mark.asyncio
async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition_key):
    """Passing partition_id/partition_key along with an EventDataBatch raises TypeError.

    The ``asyncio`` marker is required so that this coroutine test is actually
    awaited, matching the other async tests in this module; without it the
    body, and therefore the ``TypeError`` check, would not be executed.
    """
    # Use invalid_hostname because this is not a live test.
    client = EventHubProducerClient.from_connection_string(invalid_hostname)
    batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key)
    async with client:
        with pytest.raises(TypeError):
            await client.send_batch(batch, partition_id=partition_id, partition_key=partition_key)
Loading