diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index 86ced0c53aa1..1b57eb34fa93 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -145,16 +145,11 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) -try: - event_list = [] - for i in range(10): - event_list.append(EventData(b"A single event")) - with client: - client.send(event_list) -except: - raise -finally: - pass +event_list = [] +for i in range(10): + event_list.append(EventData(b"A single event")) +with client: + client.send(event_list) ``` #### Send a batch of events @@ -164,25 +159,20 @@ Events may be added to the `EventDataBatch` using the `try_add` method until the ```python from azure.eventhub import EventHubProducerClient, EventData -try: - connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' - event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) +connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' +client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) - event_data_batch = client.create_batch(max_size=10000) - can_add = True - while can_add: - try: - event_data_batch.try_add(EventData('Message inside EventBatchData')) - except ValueError: - can_add = False # EventDataBatch object reaches max_size. +event_data_batch = client.create_batch(max_size=10000) +can_add = True +while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. - with client: - client.send(event_data_batch) -except: - raise -finally: - pass +with client: + client.send(event_data_batch) ``` ### Consume events from an Event Hub @@ -202,15 +192,10 @@ logger = logging.getLogger("azure.eventhub") def on_events(partition_context, events): logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id)) -try: - with client: - client.receive(on_events=on_events, consumer_group="$Default") - # receive events from specified partition: - # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0') -except: - raise -finally: - pass +with client: + client.receive(on_events=on_events, consumer_group="$Default") + # receive events from specified partition: + # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0') ``` ### Async publish events to an Event Hub @@ -219,25 +204,26 @@ Publish events to an Event Hub asynchronously. #### Send a single event or an array of events ```python +import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) -try: - event_list = [] - for i in range(10): - event_list.append(EventData(b"A single event")) - - async with client: - await client.send(event_list) # Send a list of events - await client.send(EventData(b"A single event")) # Send a single event -except: - raise -finally: - pass +event_list = [] +for i in range(10): + event_list.append(EventData(b"A single event")) + +async def send(): + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) + async with client: + await client.send(event_list) # Send a list of events + await client.send(EventData(b"A single event")) # Send a single event + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(send()) ``` #### Send a batch of events @@ -245,14 +231,14 @@ finally: Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. ```python +import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData -try: - connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' - event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) +connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' +async def create_batch(client): event_data_batch = await client.create_batch(max_size=10000) can_add = True while can_add: @@ -260,13 +246,17 @@ try: event_data_batch.try_add(EventData('Message inside EventBatchData')) except ValueError: can_add = False # EventDataBatch object reaches max_size. + return event_data_batch +async def send(): + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) + batch_data = await create_batch(client) async with client: - await client.send(event_data_batch) -except: - raise -finally: - pass + await client.send(batch_data) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(send()) ``` ### Async consume events from an Event Hub @@ -275,26 +265,27 @@ Consume events asynchronously from an EventHub. ```python import logging +import asyncio from azure.eventhub.aio import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) logger = logging.getLogger("azure.eventhub") async def on_events(partition_context, events): logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id)) -try: +async def receive(): + client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) async with client: received = await client.receive(on_events=on_events, consumer_group='$Default') # receive events from specified partition: # received = await client.receive(on_events=on_events, consumer_group='$Default', partition_id='0') -except: - raise -finally: - pass + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(receive()) ``` ### Consume events using a partition manager @@ -343,14 +334,12 @@ async def process_events(partition_context, events): await asyncio.gather(*[do_operation(event) for event in events]) await partition_context.update_checkpoint(events[-1]) - async def receive(client): try: await client.receive(on_events=process_events, consumer_group="$Default") except KeyboardInterrupt: await client.close() - async def main(): container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str) partition_manager = BlobPartitionManager(container_client) @@ -362,7 +351,6 @@ async def main(): async with client: await receive(client) - if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) @@ -414,7 +402,7 @@ These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/mast ### Documentation -Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html. +Reference documentation is available at [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html). ### Logging