diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md index a7abbaf01131..89e7ed923518 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md @@ -86,7 +86,7 @@ async def do_operation(events): async def process_events(partition_context, events): await do_operation(events) - partition_context.update_checkpoint(events[-1]) + await partition_context.update_checkpoint(events[-1]) async def main(): storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name) @@ -116,7 +116,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries. ### Documentation -Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html +Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager) ### Logging diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md index 07cfd57bc247..6090bf128917 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md @@ -114,7 +114,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries. ### Documentation -Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html +Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html#azure.eventhub.PartitionManager) ### Logging diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index 188d845272f2..84cd7ef10e3c 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -57,7 +57,7 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) ``` @@ -128,7 +128,7 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) partition_ids = client.get_partition_ids() ``` @@ -143,18 +143,13 @@ from azure.eventhub import EventHubProducerClient, EventData connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) - -try: - event_list = [] - for i in range(10): - event_list.append(EventData(b"A single event")) - with client: - client.send(event_list) -except: - raise -finally: - pass +client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) + +event_list = [] +for i in range(10): + event_list.append(EventData(b"A single event")) +with client: + client.send(event_list) ``` #### Send a batch of events @@ -164,25 +159,20 @@ Events may be added to the `EventDataBatch` using the `try_add` method until the ```python from azure.eventhub import EventHubProducerClient, EventData -try: - connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' - event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) +connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' +client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) - event_data_batch = client.create_batch(max_size=10000) - can_add = True - while can_add: - try: - event_data_batch.try_add(EventData('Message inside EventBatchData')) - except ValueError: - can_add = False # EventDataBatch object reaches max_size. +event_data_batch = client.create_batch(max_size=10000) +can_add = True +while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. - with client: - client.send(event_data_batch) -except: - raise -finally: - pass +with client: + client.send(event_data_batch) ``` ### Consume events from an Event Hub @@ -195,22 +185,17 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) logger = logging.getLogger("azure.eventhub") def on_events(partition_context, events): logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id)) -try: - with client: - client.receive(on_events=on_events, consumer_group="$Default") - # receive events from specified partition: - # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0') -except: - raise -finally: - pass +with client: + client.receive(on_events=on_events, consumer_group="$Default") + # receive events from specified partition: + # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0') ``` ### Async publish events to an Event Hub @@ -219,25 +204,26 @@ Publish events to an Event Hub asynchronously. #### Send a single event or an array of events ```python +import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) - -try: - event_list = [] - for i in range(10): - event_list.append(EventData(b"A single event")) - - async with client: - await client.send(event_list) # Send a list of events - await client.send(EventData(b"A single event")) # Send a single event -except: - raise -finally: - pass + +event_list = [] +for i in range(10): + event_list.append(EventData(b"A single event")) + +async def send(): + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) + async with client: + await client.send(event_list) # Send a list of events + await client.send(EventData(b"A single event")) # Send a single event + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(send()) ``` #### Send a batch of events @@ -245,14 +231,14 @@ finally: Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. ```python +import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData -try: - connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' - event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) +connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' +async def create_batch(client): event_data_batch = await client.create_batch(max_size=10000) can_add = True while can_add: @@ -260,13 +246,17 @@ try: event_data_batch.try_add(EventData('Message inside EventBatchData')) except ValueError: can_add = False # EventDataBatch object reaches max_size. + return event_data_batch +async def send(): + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) + batch_data = await create_batch(client) async with client: - await client.send(event_data_batch) -except: - raise -finally: - pass + await client.send(batch_data) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(send()) ``` ### Async consume events from an Event Hub @@ -275,26 +265,27 @@ Consume events asynchronously from an EventHub. ```python import logging +import asyncio from azure.eventhub.aio import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) logger = logging.getLogger("azure.eventhub") async def on_events(partition_context, events): logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id)) -try: +async def receive(): + client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) async with client: received = await client.receive(on_events=on_events, consumer_group='$Default') # receive events from specified partition: # received = await client.receive(on_events=on_events, consumer_group='$Default', partition_id='0') -except: - raise -finally: - pass + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(receive()) ``` ### Consume events using a partition manager @@ -330,6 +321,7 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManage RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>' blob_name_str = '<>' @@ -337,21 +329,31 @@ async def do_operation(event): # do some sync or async operations. If the operation is i/o intensive, async will have better performance print(event) + async def process_events(partition_context, events): await asyncio.gather(*[do_operation(event) for event in events]) await partition_context.update_checkpoint(events[-1]) -if __name__ == '__main__': - loop = asyncio.get_event_loop() - container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str) - partition_manager = BlobPartitionManager(container_client=container_client) - client = EventHubConsumerClient.from_connection_string(connection_str, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) +async def receive(client): try: - loop.run_until_complete(client.receive(process_events, "$default")) + await client.receive(on_events=process_events, consumer_group="$Default") except KeyboardInterrupt: - loop.run_until_complete(client.close()) - finally: - loop.stop() + await client.close() + +async def main(): + container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str) + partition_manager = BlobPartitionManager(container_client) + client = EventHubConsumerClient.from_connection_string( + connection_str, + event_hub_path=event_hub_path, + partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing + ) + async with client: + await receive(client) + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) ``` ### Use EventHubConsumerClient to work with IoT Hub @@ -385,22 +387,22 @@ The Event Hubs APIs generate the following exceptions. For instance, this error is raised if you try to send an EventData that is already sent. - **EventDataSendError:** The Eventhubs service responds with an error when an EventData is sent. - **OperationTimeoutError:** EventHubConsumer.send() times out. -- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the above mentioned errors. +- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the errors described above. ## Next steps ### Examples -These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library. +There are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library. -- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/send.py) - use EventHubProducerClient to publish events -- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use EventHubConsumerClient to consume events -- [./samples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a EventHubProducerClient -- [./samples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a EventHubConsumerClient +- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py) - use EventHubProducerClient to publish events +- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py) - use EventHubConsumerClient to consume events +- [./samples/async_samples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py) - async/await support of a EventHubProducerClient +- [./samples/async_samples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py) - async/await support of a EventHubConsumerClient ### Documentation -Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html. +Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html). ### Logging @@ -423,4 +425,4 @@ PR appropriately (e.g., label, comment). Simply follow the instructions provided This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. -![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png) \ No newline at end of file +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png)