diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index 188d845272f2..86ced0c53aa1 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -57,7 +57,7 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) ``` @@ -128,7 +128,7 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) partition_ids = client.get_partition_ids() ``` @@ -143,7 +143,7 @@ from azure.eventhub import EventHubProducerClient, EventData connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) try: event_list = [] @@ -167,7 +167,7 @@ from azure.eventhub import EventHubProducerClient, EventData try: connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) event_data_batch = client.create_batch(max_size=10000) can_add = True @@ -195,7 +195,7 @@ from azure.eventhub import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) logger = logging.getLogger("azure.eventhub") @@ -224,7 +224,7 @@ from azure.eventhub import EventData connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) try: event_list = [] @@ -251,7 +251,7 @@ from azure.eventhub import EventData try: connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' - client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path) + client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) event_data_batch = await client.create_batch(max_size=10000) can_add = True @@ -279,7 +279,7 @@ from azure.eventhub.aio import EventHubConsumerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' event_hub_path = '<< NAME OF THE EVENT HUB >>' -client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path) +client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path) logger = logging.getLogger("azure.eventhub") @@ -330,6 +330,7 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManage RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +event_hub_path = '<< NAME OF THE EVENT HUB >>' storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>' blob_name_str = '<>' @@ -337,21 +338,34 @@ async def do_operation(event): # do some sync or async operations. If the operation is i/o intensive, async will have better performance print(event) + async def process_events(partition_context, events): await asyncio.gather(*[do_operation(event) for event in events]) await partition_context.update_checkpoint(events[-1]) -if __name__ == '__main__': - loop = asyncio.get_event_loop() - container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str) - partition_manager = BlobPartitionManager(container_client=container_client) - client = EventHubConsumerClient.from_connection_string(connection_str, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) + +async def receive(client): try: - loop.run_until_complete(client.receive(process_events, "$default")) + await client.receive(on_events=process_events, consumer_group="$Default") except KeyboardInterrupt: - loop.run_until_complete(client.close()) - finally: - loop.stop() + await client.close() + + +async def main(): + container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str) + partition_manager = BlobPartitionManager(container_client) + client = EventHubConsumerClient.from_connection_string( + connection_str, + event_hub_path=event_hub_path, + partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing + ) + async with client: + await receive(client) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) ``` ### Use EventHubConsumerClient to work with IoT Hub @@ -393,10 +407,10 @@ For instance, this error is raised if you try to send an EventData that is alrea These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library. -- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/send.py) - use EventHubProducerClient to publish events -- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use EventHubConsumerClient to consume events -- [./samples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a EventHubProducerClient -- [./samples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a EventHubConsumerClient +- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py) - use EventHubProducerClient to publish events +- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py) - use EventHubConsumerClient to consume events +- [./samples/async_samples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py) - async/await support of a EventHubProducerClient +- [./samples/async_samples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py) - async/await support of a EventHubConsumerClient ### Documentation