diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 3659b7c680c1..3e974c622a45 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -46,7 +46,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def _check_closed(self): if self.error: - raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name)) + raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name)) def _create_handler(self): pass diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 21dc6bb8a012..3027fcfc3287 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -47,7 +47,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _check_closed(self): if self.error: - raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name)) + raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name)) def _create_handler(self): pass diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 10de6017026b..a231e12eb63c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -163,6 +163,15 @@ async def create_batch(self, max_size=None, partition_key=None): :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_create_batch] + :end-before: [END eventhub_client_async_create_batch] + :language: python + :dedent: 4 + :caption: Create EventDataBatch object within limited size + """ if not self._max_message_size_on_link: @@ -195,7 +204,7 @@ async def send(self, event_data, *, partition_key=None, timeout=None): :rtype: None Example: - .. literalinclude:: ../examples/test_examples_eventhub.py + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py :start-after: [START eventhub_client_async_send] :end-before: [END eventhub_client_async_send] :language: python diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index f49fc280814e..13ef7744772b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -170,6 +170,15 @@ def create_batch(self, max_size=None, partition_key=None): :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_create_batch] + :end-before: [END eventhub_client_sync_create_batch] + :language: python + :dedent: 4 + :caption: Create EventDataBatch object within limited size + """ if not self._max_message_size_on_link: diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py index 048f5af1623c..896f2a007b21 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py @@ -23,7 +23,7 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): os.environ['EVENT_HUB_HOSTNAME'], os.environ['EVENT_HUB_SAS_POLICY'], os.environ['EVENT_HUB_SAS_KEY'], - os.environ['EVENT_HUB_NAME']) + os.environ['EVENT_HUB_NAME']) client = EventHubClient.from_connection_string(connection_str) # [END create_eventhub_client_async] @@ -49,6 +49,17 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): await consumer.receive(timeout=1) + # [START eventhub_client_async_create_batch] + event_data_batch = await producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_client_async_create_batch] + # [START eventhub_client_async_send] async with producer: event_data = EventData(b"A single event") diff --git a/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py b/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py new file mode 100644 index 000000000000..3cf6dc88f177 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show creating and sending EventBatchData within limited size. +""" + +# pylint: disable=C0111 + +import logging +import time +import os + +from azure.eventhub import EventHubClient, EventData, EventHubSharedKeyCredential + +import examples +logger = examples.get_logger(logging.INFO) + + +HOSTNAME = os.environ.get('EVENT_HUB_HOSTNAME') # .servicebus.windows.net +EVENT_HUB = os.environ.get('EVENT_HUB_NAME') + +USER = os.environ.get('EVENT_HUB_SAS_POLICY') +KEY = os.environ.get('EVENT_HUB_SAS_KEY') + + +def create_batch_data(producer): + event_data_batch = producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # EventDataBatch object reaches max_size. + # New EventDataBatch object can be created here to send more data + break + return event_data_batch + + +try: + if not HOSTNAME: + raise ValueError("No EventHubs URL supplied.") + + client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) + producer = client.create_producer() + + try: + start_time = time.time() + with producer: + event_data_batch = create_batch_data(producer) + producer.send(event_data_batch) + except: + raise + finally: + end_time = time.time() + run_time = end_time - start_time + logger.info("Runtime: {} seconds".format(run_time)) + +except KeyboardInterrupt: + pass diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index d8483dc6c032..04e29d08256c 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -86,6 +86,17 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): event_data = EventData(body=list_data) # [END create_event_data] + # [START eventhub_client_sync_create_batch] + event_data_batch = producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_client_sync_create_batch] + # [START eventhub_client_sync_send] with producer: event_data = EventData(b"A single event")