diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md index bceae6dbd80d..2733803eed1c 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md @@ -1,7 +1,9 @@ # Azure EventHubs Checkpoint Store client library for Python using Storage Blobs Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. -This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. +This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. + +Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob). [Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py deleted file mode 100644 index 5977900fe219..000000000000 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -import logging -import os -from azure.eventhub.aio import EventHubClient -from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor -from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager -from azure.storage.blob.aio import ContainerClient - -RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout -RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small -CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] -STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] - -logging.basicConfig(level=logging.INFO) - - -async def do_operation(event): - # do some sync or async operations. If the operation is i/o intensive, async will have better performance - print(event) - - -class MyPartitionProcessor(PartitionProcessor): - async def process_events(self, events, partition_context): - if events: - await asyncio.gather(*[do_operation(event) for event in events]) - await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) - else: - print("empty events received", "partition:", partition_context.partition_id) - - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) - container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") - partition_manager = BlobPartitionManager(container_client=container_client) - event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=10) - try: - loop.run_until_complete(event_processor.start()) - except KeyboardInterrupt: - loop.run_until_complete(event_processor.stop()) - finally: - loop.stop() diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py new file mode 100644 index 000000000000..bb0426b1f21a --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py @@ -0,0 +1,36 @@ +import asyncio +import logging +import os +from azure.eventhub.aio import EventHubConsumerClient +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager +from azure.storage.blob.aio import ContainerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] + +logging.basicConfig(level=logging.INFO) + + +async def do_operation(event): + # put your code here + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + print(event) + + +async def process_events(partition_context, events): + # put your code here + await asyncio.gather(*[do_operation(event) for event in events]) + await partition_context.update_checkpoint(events[-1]) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") + partition_manager = BlobPartitionManager(container_client=container_client) + client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager) + try: + loop.run_until_complete(client.receive(process_events, "$default")) + except KeyboardInterrupt: + loop.run_until_complete(client.close()) + finally: + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md index abeeae1ac384..1b47a54ecd35 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md @@ -3,6 +3,8 @@ Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. +Please note that this is a sync library, for async version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob-aio](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio). + [Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) ## Getting started diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/samples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/samples/event_processor_blob_storage_example.py index f3520b79c8f1..a12d8e660cf0 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob/samples/event_processor_blob_storage_example.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob/samples/event_processor_blob_storage_example.py @@ -4,8 +4,6 @@ from azure.eventhub.extensions.checkpointstoreblob import BlobPartitionManager from azure.storage.blob import ContainerClient -RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout -RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] @@ -13,24 +11,22 @@ def do_operation(event): - # do some sync or async operations. If the operation is i/o intensive, async will have better performance + # put your code here print(event) def process_events(partition_context, events): - if events: - print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) - for event in events: - do_operation(event) - partition_context.update_checkpoint(events[-1]) - else: - print("empty events received", "partition:", partition_context.partition_id) + # put your code here + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + for event in events: + do_operation(event) + partition_context.update_checkpoint(events[-1]) if __name__ == '__main__': container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") partition_manager = BlobPartitionManager(container_client=container_client) - client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) + client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager) try: client.receive(process_events, "$default") except KeyboardInterrupt: diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index 046f0cd99d27..188d845272f2 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -309,7 +309,7 @@ a consumer group in an Event Hub instance.The `EventHubConsumerClient` uses an i and to store the relevant information required by the load balancing algorithm. Search pypi with the prefix `azure-eventhub-checkpointstore` to -find packages that support this and use the PartitionManager implementation from one such package. +find packages that support this and use the PartitionManager implementation from one such package. Please note that both sync and async libraries are provided. In the below example, we create an instance of `EventHubConsumerClient` and use a `BlobPartitionManager`. You need to [create an Azure Storage account](https://docs.microsoft.com/en-us/azure/storage/common/storage-quickstart-create-account?tabs=azure-portal) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py index 1b9c25213d7c..b1514a3844d0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py @@ -76,7 +76,8 @@ async def send(self, event_data: Union[EventData, EventDataBatch, Iterable[Event """Sends event data and blocks until acknowledgement is received or operation times out. :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects. - :type event_data: ~azure.eventhub.EventData or ~azure.eventhub.EventDataBatch or Iterator[~azure.eventhub.EventData] + :type event_data: ~azure.eventhub.EventData or ~azure.eventhub.EventDataBatch or + Iterator[~azure.eventhub.EventData] :keyword str partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service diff --git a/sdk/eventhub/azure-eventhubs/doc/azure.eventhub.rst b/sdk/eventhub/azure-eventhubs/doc/azure.eventhub.rst index cae6efbafc92..546dd0280bb1 100644 --- a/sdk/eventhub/azure-eventhubs/doc/azure.eventhub.rst +++ b/sdk/eventhub/azure-eventhubs/doc/azure.eventhub.rst @@ -6,7 +6,6 @@ azure.eventhub package :undoc-members: :inherited-members: - .. autoclass:: azure.eventhub.EventHubConsumerClient :members: :undoc-members: @@ -22,6 +21,72 @@ azure.eventhub package :undoc-members: :inherited-members: + .. autoclass:: azure.eventhub.EventPosition + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.EventHubSharedKeyCredential + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.EventHubSASTokenCredential + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.PartitionManager + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.CloseReason + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.TransportType + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.EventHubError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.ConnectError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.ConnectionLostError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.EventDataError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.EventDataSendError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.AuthenticationError + :members: + :undoc-members: + :inherited-members: + + .. autoclass:: azure.eventhub.OwnershipLostError + :members: + :undoc-members: + :inherited-members: + + Subpackages ----------- diff --git a/sdk/eventhub/azure-eventhubs/examples/__init__.py b/sdk/eventhub/azure-eventhubs/examples/__init__.py deleted file mode 100644 index 34913fb394d7..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py deleted file mode 100644 index 53e73228032e..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/iterator_receiver_async.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show iterator consumer. -""" - -import os -import asyncio - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition, EventHubSharedKeyCredential - - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] -EVENT_POSITION = EventPosition("-1") - - -async def main(): - client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) - consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) - async with consumer: - async for item in consumer: - print(item) - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py deleted file mode 100644 index ba6bf68a9258..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show running concurrent consumers. -""" - -import os -import time -import asyncio - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition, EventHubSharedKeyCredential - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -EVENT_POSITION = EventPosition("-1") - - -async def pump(client, partition): - consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, prefetch=5) - async with consumer: - total = 0 - start_time = time.time() - for event_data in await consumer.receive(timeout=10): - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) - total += 1 - end_time = time.time() - run_time = end_time - start_time - print("Received {} messages in {} seconds".format(total, run_time)) - - -loop = asyncio.get_event_loop() -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -tasks = [ - asyncio.ensure_future(pump(client, "0")), - asyncio.ensure_future(pump(client, "1"))] -loop.run_until_complete(asyncio.wait(tasks)) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_owner_level.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_owner_level.py deleted file mode 100644 index 384d914ad436..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_owner_level.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an Event Hub partition as an epoch consumer. -""" - -import os -import time -import asyncio - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventHubSharedKeyCredential, EventPosition - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -PARTITION = "0" - - -async def pump(client, owner_level): - consumer = client.create_consumer( - consumer_group="$default", partition_id=PARTITION, event_position=EventPosition("-1"), owner_level=owner_level - ) - async with consumer: - total = 0 - start_time = time.time() - for event_data in await consumer.receive(timeout=5): - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) - total += 1 - end_time = time.time() - run_time = end_time - start_time - print("Received {} messages in {} seconds".format(total, run_time)) - -loop = asyncio.get_event_loop() -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -loop.run_until_complete(pump(client, 20)) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py deleted file mode 100644 index 53d2e626a7f5..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show running concurrent consumers. -""" - -import os -import time -import asyncio - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventPosition, EventHubSharedKeyCredential - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -EVENT_POSITION = EventPosition("-1") - - -async def pump(client, partition): - consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, - prefetch=5, track_last_enqueued_event_properties=True) - async with consumer: - total = 0 - start_time = time.time() - for event_data in await consumer.receive(timeout=10): - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) - total += 1 - end_time = time.time() - run_time = end_time - start_time - print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) - print("Received {} messages in {} seconds".format(total, run_time)) - - -loop = asyncio.get_event_loop() -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -tasks = [asyncio.ensure_future(pump(client, "0"))] -loop.run_until_complete(asyncio.wait(tasks)) diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py deleted file mode 100644 index d24e73d0bc17..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show sending individual events asynchronously to an Event Hub. -""" - -# pylint: disable=C0111 - -import time -import asyncio -import os - -from azure.eventhub.aio import EventHubClient -from azure.eventhub import EventData, EventHubSharedKeyCredential - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - - -async def run(client): - producer = client.create_producer() - await send(producer, 4) - - -async def send(producer, count): - async with producer: - for i in range(count): - print("Sending message: {}".format(i)) - data = EventData(str(i)) - await producer.send(data) - -loop = asyncio.get_event_loop() -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -tasks = asyncio.gather( - run(client), - run(client)) -start_time = time.time() -loop.run_until_complete(tasks) -print("Runtime: {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py b/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py deleted file mode 100644 index 878691b9cf81..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/client_secret_auth.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- -""" -An example to show authentication using aad credentials -""" - -import os -from azure.eventhub import EventHubClient -from azure.eventhub import EventData -from azure.identity import ClientSecretCredential - - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -CLIENT_ID = os.environ.get('AAD_CLIENT_ID') -SECRET = os.environ.get('AAD_SECRET') -TENANT_ID = os.environ.get('AAD_TENANT_ID') - - -credential = ClientSecretCredential(client_id=CLIENT_ID, client_secret=SECRET, tenant_id=TENANT_ID) -client = EventHubClient(host=HOSTNAME, - event_hub_path=EVENT_HUB, - credential=credential) - -producer = client.create_producer(partition_id='0') -with producer: - event = EventData(body='A single message') - producer.send(event) diff --git a/sdk/eventhub/azure-eventhubs/examples/proxy.py b/sdk/eventhub/azure-eventhubs/examples/proxy.py deleted file mode 100644 index 61005c347333..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/proxy.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show sending and receiving events behind a proxy -""" -import os -from azure.eventhub import EventHubClient, EventPosition, EventData, EventHubSharedKeyCredential - - -# Hostname can be .servicebus.windows.net" -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -EVENT_POSITION = EventPosition("-1") -PARTITION = "0" -HTTP_PROXY = { - 'proxy_hostname': '127.0.0.1', # proxy hostname - 'proxy_port': 3128, # proxy port - 'username': 'admin', # username used for proxy authentication if needed - 'password': '123456' # password used for proxy authentication if needed -} - -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False, http_proxy=HTTP_PROXY) -producer = client.create_producer(partition_id=PARTITION) -consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, event_position=EVENT_POSITION) -try: - consumer.receive(timeout=1) - event_list = [] - for i in range(20): - event_list.append(EventData("Event Number {}".format(i))) - print('Start sending events behind a proxy.') - producer.send(event_list) - print('Start receiving events behind a proxy.') - received = consumer.receive(max_batch_size=50, timeout=5) -finally: - producer.close() - consumer.close() diff --git a/sdk/eventhub/azure-eventhubs/examples/recv.py b/sdk/eventhub/azure-eventhubs/examples/recv.py deleted file mode 100644 index 11ed8747fc22..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/recv.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an Event Hub partition. -""" -import os -import time -from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -EVENT_POSITION = EventPosition("-1") -PARTITION = "0" - - -total = 0 -last_sn = -1 -last_offset = "-1" -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) - -consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, - event_position=EVENT_POSITION, prefetch=5000) -with consumer: - start_time = time.time() - batch = consumer.receive(timeout=5) - while batch: - for event_data in batch: - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) - print(event_data.body_as_str()) - total += 1 - batch = consumer.receive(timeout=5) - print("Received {} messages in {} seconds".format(total, time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_batch.py b/sdk/eventhub/azure-eventhubs/examples/recv_batch.py deleted file mode 100644 index e3255ebe1c3f..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/recv_batch.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an Event Hub partition and processing -the event in on_event_data callback. - -""" -import os -from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential - - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] -EVENT_POSITION = EventPosition("-1") -PARTITION = "0" - -total = 0 -last_sn = -1 -last_offset = "-1" -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), network_tracing=False) - -consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, - event_position=EVENT_POSITION, prefetch=100) -with consumer: - batched_events = consumer.receive(max_batch_size=10) - for event_data in batched_events: - last_offset = event_data.offset - last_sn = event_data.sequence_number - total += 1 - print("Partition {}, Received {}, sn={} offset={}".format( - PARTITION, - total, - last_sn, - last_offset)) diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_iterator.py b/sdk/eventhub/azure-eventhubs/examples/recv_iterator.py deleted file mode 100644 index 45068ae2c1ef..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/recv_iterator.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -import os -from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential, EventData - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] -EVENT_POSITION = EventPosition("-1") - - -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION) -try: - with consumer: - for item in consumer: - print(item) -except KeyboardInterrupt: - print("Iterator stopped") diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py b/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py deleted file mode 100644 index 576ef19089e6..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show receiving events from an Event Hub partition. -""" -import os -import time -from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] - -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -EVENT_POSITION = EventPosition("-1") -PARTITION = "0" - - -total = 0 -last_sn = -1 -last_offset = "-1" -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) - -consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, - event_position=EVENT_POSITION, prefetch=5000, - track_last_enqueued_event_properties=True) -with consumer: - start_time = time.time() - batch = consumer.receive(timeout=5) - for event_data in batch: - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) - print(event_data.body_as_str()) - total += 1 - batch = consumer.receive(timeout=5) - print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) - print("Received {} messages in {} seconds".format(total, time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/send.py b/sdk/eventhub/azure-eventhubs/examples/send.py deleted file mode 100644 index 219d417447c1..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/send.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show sending individual events to an Event Hub partition. -Although this works, sending events in batches will get better performance. -See 'send_list_of_event_data.py' and 'send_event_data_batch.py' for an example of batching. -""" - -# pylint: disable=C0111 - -import time -import os -from azure.eventhub import EventHubClient, EventData, EventHubSharedKeyCredential - - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -producer = client.create_producer(partition_id="0") - -start_time = time.time() -with producer: - for i in range(100): - ed = EventData("msg") - print("Sending message: {}".format(i)) - producer.send(ed) -print("Send 100 messages in {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/send_list_of_event_data.py b/sdk/eventhub/azure-eventhubs/examples/send_list_of_event_data.py deleted file mode 100644 index 715c220e6417..000000000000 --- a/sdk/eventhub/azure-eventhubs/examples/send_list_of_event_data.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python - -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -""" -An example to show batch sending events to an Event Hub. -""" - -# pylint: disable=C0111 - -import time -import os -from azure.eventhub import EventData, EventHubClient, EventHubSharedKeyCredential - - -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net -EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -producer = client.create_producer(partition_id="1") - -event_list = [] -for i in range(1500): - event_list.append(EventData('Hello World')) -start_time = time.time() -with producer: - producer.send(event_list) -print("Runtime: {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py new file mode 100644 index 000000000000..eddf2802c915 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub asynchronously. +""" + +import asyncio +import os +from azure.eventhub.aio import EventHubConsumerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] + + +async def do_operation(event): + pass + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + # print(event) + + +async def on_events(partition_context, events): + # put your code here + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + await asyncio.gather(*[do_operation(event) for event in events]) + + +async def receive(client): + try: + await client.receive(on_events=on_events, + consumer_group="$default") + except KeyboardInterrupt: + client.close() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + client = EventHubConsumerClient.from_connection_string( + CONNECTION_STR, + ) + try: + loop.run_until_complete(receive(client)) + except KeyboardInterrupt: + pass + finally: + loop.run_until_complete(client.close()) + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_track_last_enqueued_event_info_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_track_last_enqueued_event_info_async.py new file mode 100644 index 000000000000..e4d3fac74968 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_track_last_enqueued_event_info_async.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition with EventHubConsumerClient tracking +the last enqueued event properties of specific partition. +""" + +import asyncio +import os +from azure.eventhub.aio import EventHubConsumerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] + + +async def do_operation(event): + pass + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + # print(event) + + +async def on_events(partition_context, events): + # put your code here + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + await asyncio.gather(*[do_operation(event) for event in events]) + + print("Last enqueued event properties from partition: {} is: {}". + format(partition_context.partition_id, + events[-1].last_enqueued_event_properties)) + + +async def receive(client): + try: + await client.receive(on_events=on_events, + consumer_group="$default", + partition_id='0') + except KeyboardInterrupt: + client.close() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + client = EventHubConsumerClient.from_connection_string( + CONNECTION_STR, + ) + try: + loop.run_until_complete(receive(client)) + except KeyboardInterrupt: + pass + finally: + loop.run_until_complete(client.close()) + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_with_partition_manager_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_with_partition_manager_async.py new file mode 100644 index 000000000000..9353a6041a87 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_with_partition_manager_async.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub with partition manager asynchronously. +In the `receive` method of `EventHubConsumerClient`: +If no partition id is specified, the partition_manager are used for load-balance and checkpoint. +If partition id is specified, the partition_manager can only be used for checkpoint. +""" + +import asyncio +import os +from azure.storage.blob.aio import ContainerClient +from azure.eventhub.aio import EventHubConsumerClient +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] + + +async def do_operation(event): + pass + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + # print(event) + + +async def on_events(partition_context, events): + # put your code here + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + await asyncio.gather(*[do_operation(event) for event in events]) + await partition_context.update_checkpoint(events[-1]) + + +async def receive(client): + try: + """ + Without specifying partition_id, the receive will try to receive events from all partitions and if provided with + partition manager, the client will load-balance partition assignment with other EventHubConsumerClient instances + which also try to receive events from all partitions and use the same storage resource. + """ + await client.receive(on_events=on_events, consumer_group="$default") + # With specified partition_id, load-balance will be disabled + # await client.receive(event_handler=event_handler, consumer_group="$default", partition_id = '0')) + except KeyboardInterrupt: + client.close() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") + partition_manager = BlobPartitionManager(container_client) + client = EventHubConsumerClient.from_connection_string( + CONNECTION_STR, + partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing + ) + try: + loop.run_until_complete(receive(client)) + except KeyboardInterrupt: + pass + finally: + loop.run_until_complete(client.close()) + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py new file mode 100644 index 000000000000..ed03fd1ba815 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py @@ -0,0 +1,129 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import pytest +import logging +import asyncio + + +def create_async_eventhub_producer_client(): + # [START create_eventhub_producer_client_async] + import os + from azure.eventhub.aio import EventHubProducerClient + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB) + # [END create_eventhub_producer_client_async] + return producer + + +def create_async_eventhub_consumer_client(): + # [START create_eventhub_consumer_client_async] + import os + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + from azure.eventhub.aio import EventHubConsumerClient + consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB + ) + # [END create_eventhub_consumer_client_async] + return consumer + + +async def example_eventhub_async_send_and_receive(live_eventhub_config): + producer = create_async_eventhub_producer_client() + consumer = create_async_eventhub_consumer_client() + try: + # [START eventhub_producer_client_create_batch_async] + from azure.eventhub import EventData + event_data_batch = await producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_producer_client_create_batch_async] + + # [START eventhub_producer_client_send_async] + async with producer: + event_data = EventData(b"A single event") + await producer.send(event_data) + # [END eventhub_producer_client_send_async] + await asyncio.sleep(1) + + # [START eventhub_consumer_client_receive_async] + logger = logging.getLogger("azure.eventhub") + + async def on_events(partition_context, events): + logger.info("Received {} messages from partition: {}".format( + len(events), partition_context.partition_id)) + # Do ops on received events + async with consumer: + task = asyncio.ensure_future(consumer.receive(on_events=on_events, consumer_group="$default")) + await asyncio.sleep(3) # keep receiving for 3 seconds + task.cancel() # stop receiving + # [END eventhub_consumer_client_receive_async] + finally: + pass + + +async def example_eventhub_async_producer_ops(live_eventhub_config, connection_str): + # [START eventhub_producer_client_close_async] + import os + from azure.eventhub.aio import EventHubProducerClient + from azure.eventhub import EventData + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB) + try: + await producer.send(EventData(b"A single event")) + finally: + # Close down the producer handler. + await producer.close() + # [END eventhub_producer_client_close_async] + + +async def example_eventhub_async_consumer_ops(live_eventhub_config, connection_str): + # [START eventhub_consumer_client_close_async] + import os + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + from azure.eventhub.aio import EventHubConsumerClient + consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB + ) + + logger = logging.getLogger("azure.eventhub") + + async def on_events(partition_context, events): + logger.info("Received {} messages from partition: {}".format( + len(events), partition_context.partition_id)) + # Do ops on received events + + # The receive method is a coroutine method which can be called by `await consumer.receive(...)` and it will block. + # so execute it in an async task to better demonstrate how to stop the receiving by calling he close method. + + recv_task = asyncio.ensure_future(consumer.receive(on_events=on_events, consumer_group='$Default')) + await asyncio.sleep(3) # keep receiving for 3 seconds + recv_task.cancel() # stop receiving + + # Close down the consumer handler explicitly. + await consumer.close() + # [END eventhub_consumer_client_close_async] diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py new file mode 100644 index 000000000000..b6901c24148c --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show sending individual events asynchronously to an Event Hub. +""" + +# pylint: disable=C0111 + +import time +import asyncio +import os + +from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub import EventData + +EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + +async def run(producer): + async with producer: + ed = EventData("msg") + await producer.send(ed) # The event will be distributed to available partitions via round-robin. + + ed = EventData("msg sent to partition_id 0") + await producer.send(ed, partition_id='0') # Specifying partition_id + + ed = EventData("msg sent with partition_key") + await producer.send(ed, partition_key="p_key") # Specifying partition_key + + # Send a list of events + event_list = [] + for i in range(1500): + event_list.append(EventData('Hello World')) + await producer.send(event_list) + + +loop = asyncio.get_event_loop() +producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, event_hub_path=EVENT_HUB) +tasks = asyncio.gather( + run(producer)) +start_time = time.time() +loop.run_until_complete(tasks) +print("Send messages in {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/send_event_data_batch_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/send_event_data_batch_async.py new file mode 100644 index 000000000000..a9d752281ffb --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/send_event_data_batch_async.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show creating and sending EventBatchData within limited size. +""" + +# pylint: disable=C0111 + +import time +import os +import asyncio + +from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub import EventData + + +EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + +async def create_batch_data(producer_client): + batch_data = await producer_client.create_batch(max_size=10000) + while True: + try: + batch_data.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # EventDataBatch object reaches max_size. + # New EventDataBatch object can be created here to send more data + break + return batch_data + + +async def run(producer): + data_batch = await create_batch_data(producer) + async with producer: + await producer.send(data_batch) + + +loop = asyncio.get_event_loop() +producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, event_hub_path=EVENT_HUB) +tasks = asyncio.gather( + run(producer)) +start_time = time.time() +loop.run_until_complete(tasks) +print("Send messages in {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/client_secret_auth.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/client_secret_auth.py new file mode 100644 index 000000000000..e76adc8d3452 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/client_secret_auth.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +""" +An example to show authentication using aad credentials +""" + +import os +from azure.eventhub import EventData, EventHubProducerClient +from azure.identity import EnvironmentCredential + + +HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + +credential = EnvironmentCredential() +producer = EventHubProducerClient(host=HOSTNAME, + event_hub_path=EVENT_HUB, + credential=credential) + +with producer: + event = EventData(body='A single message') + producer.send(event, partition_id='0') diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/proxy.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/proxy.py new file mode 100644 index 000000000000..f4c027132f36 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/proxy.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show sending and receiving events behind a proxy +""" +import os +import time +from azure.eventhub import EventPosition, EventData, EventHubConsumerClient, EventHubProducerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +EVENT_POSITION = EventPosition("-1") +PARTITION = "0" +HTTP_PROXY = { + 'proxy_hostname': '127.0.0.1', # proxy hostname + 'proxy_port': 3128, # proxy port + 'username': 'admin', # username used for proxy authentication if needed + 'password': '123456' # password used for proxy authentication if needed +} + + +def do_operation(event): + # do some operations on the event + print(event) + + +def on_events(partition_context, events): + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + for event in events: + do_operation(event) + + +consumer_client = EventHubConsumerClient.from_connection_string( + conn_str=CONNECTION_STR, event_hub_path=EVENT_HUB, http_proxy=HTTP_PROXY) +producer_client = EventHubProducerClient.from_connection_string( + conn_str=CONNECTION_STR, event_hub_path=EVENT_HUB, http_proxy=HTTP_PROXY) + +with producer_client: + producer_client.send(EventData("A single event")) + print('Finish sending.') + +with consumer_client: + receiving_time = 5 + consumer_client.receive(on_events=on_events, consumer_group='$Default') + time.sleep(receiving_time) + print('Finish receiving.') + diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py new file mode 100644 index 000000000000..c6ea05711e4d --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition. +""" +import os +from azure.eventhub import EventPosition, EventHubConsumerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +EVENT_POSITION = EventPosition("-1") +PARTITION = "0" + +total = 0 + + +def do_operation(event): + # do some operations on the event, avoid time-consuming ops + pass + + +def on_partition_initialize(partition_context): + # put your code here + print("Partition: {} has been intialized".format(partition_context.partition_id)) + + +def on_partition_close(partition_context, reason): + # put your code here + print("Partition: {} has been closed, reason for closing: {}".format(partition_context.partition_id, + reason)) + + +def on_error(partition_context, error): + # put your code here + print("Partition: {} met an exception during receiving: {}".format(partition_context.partition_id, + error)) + + +def on_events(partition_context, events): + # put your code here + global total + + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + total += len(events) + for event in events: + do_operation(event) + + +if __name__ == '__main__': + consumer_client = EventHubConsumerClient.from_connection_string( + conn_str=CONNECTION_STR, + event_hub_path=EVENT_HUB, + ) + + try: + with consumer_client: + consumer_client.receive(on_events=on_events, consumer_group='$Default', + on_partition_initialize=on_partition_initialize, + on_partition_close=on_partition_close, + on_error=on_error) + # Receive with owner level: + # consumer_client.receive(on_events=on_events, consumer_group='$Default', owner_level=1) + except KeyboardInterrupt: + print('Stop receiving.') diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_track_last_enqueued_event_info.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_track_last_enqueued_event_info.py new file mode 100644 index 000000000000..5665befa3dbd --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_track_last_enqueued_event_info.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition with EventHubConsumerClient tracking +the last enqueued event properties of specific partition. +""" +import os +import time +from azure.eventhub import EventPosition, EventHubConsumerClient + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +EVENT_POSITION = EventPosition("-1") +PARTITION = "0" + +total = 0 + + +def do_operation(event): + # do some operations on the event, avoid time-consuming ops + pass + + +def on_events(partition_context, events): + # put your code here + global total + print("received events: {} from partition {}".format(len(events), partition_context.partition_id)) + total += len(events) + for event in events: + do_operation(event) + + print("Last enqueued event properties from partition: {} is: {}". + format(partition_context.partition_id, + events[-1].last_enqueued_event_properties)) + + +if __name__ == '__main__': + consumer_client = EventHubConsumerClient.from_connection_string( + conn_str=CONNECTION_STR, + event_hub_path=EVENT_HUB, + ) + + try: + with consumer_client: + consumer_client.receive(on_events=on_events, consumer_group='$Default', + partition_id='0', track_last_enqueued_event_properties=True) + + except KeyboardInterrupt: + print('Stop receiving.') diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_with_partition_manager.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_with_partition_manager.py new file mode 100644 index 000000000000..d10fcd544eff --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv_with_partition_manager.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub with partition manager. +In the `receive` method of `EventHubConsumerClient`: +If no partition id is specified, the partition_manager are used for load-balance and checkpoint. +If partition id is specified, the partition_manager can only be used for checkpoint. +""" +import os +from azure.storage.blob import ContainerClient +from azure.eventhub import EventHubConsumerClient +from azure.eventhub.extensions.checkpointstoreblob import BlobPartitionManager + + +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] + + +def do_operation(event): + # do some operations on the event, avoid time-consuming ops + pass + + +def on_events(partition_context, events): + # put your code here + print("received events: {} from partition: {}".format(len(events), partition_context.partition_id)) + for event in events: + do_operation(event) + + partition_context.update_checkpoint(events[-1]) + + +if __name__ == '__main__': + container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") + partition_manager = BlobPartitionManager(container_client) + consumer_client = EventHubConsumerClient.from_connection_string( + conn_str=CONNECTION_STR, + partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing + ) + + try: + with consumer_client: + """ + Without specified partition_id, the receive will try to receive events from all partitions and if provided with + partition manager, the client will load-balance partition assignment with other EventHubConsumerClient instances + which also try to receive events from all partitions and use the same storage resource. + """ + consumer_client.receive(on_events=on_events, consumer_group='$Default') + # With specified partition_id, load-balance will be disabled + # client.receive(on_events=on_events, consumer_group='$Default', partition_id='0') + except KeyboardInterrupt: + print('Stop receiving.') diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/sample_code_eventhub.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/sample_code_eventhub.py new file mode 100644 index 000000000000..8c1eaa9de893 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/sample_code_eventhub.py @@ -0,0 +1,146 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import time +import logging + + +def create_eventhub_producer_client(): + # [START create_eventhub_producer_client_sync] + import os + from azure.eventhub import EventHubProducerClient + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB) + # [END create_eventhub_producer_client_sync] + return producer + + +def create_eventhub_consumer_client(): + # [START create_eventhub_consumer_client_sync] + import os + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + from azure.eventhub import EventHubConsumerClient + consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB + ) + # [END create_eventhub_consumer_client_sync] + return consumer + + +def example_eventhub_sync_send_and_receive(): + producer = create_eventhub_producer_client() + consumer = create_eventhub_consumer_client() + try: + logger = logging.getLogger("azure.eventhub") + + # [START create_event_data] + from azure.eventhub import EventData + + event_data = EventData("String data") + event_data = EventData(b"Bytes data") + event_data = EventData([b"A", b"B", b"C"]) + + list_data = ['Message {}'.format(i) for i in range(10)] + event_data = EventData(body=list_data) + # [END create_event_data] + + # [START eventhub_producer_client_create_batch_sync] + event_data_batch = producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_producer_client_create_batch_sync] + + # [START eventhub_producer_client_send_sync] + with producer: + event_data = EventData(b"A single event") + producer.send(event_data) + # [END eventhub_producer_client_send_sync] + time.sleep(1) + + # [START eventhub_consumer_client_receive_sync] + logger = logging.getLogger("azure.eventhub") + + def on_events(partition_context, events): + logger.info("Received {} messages from partition: {}".format( + len(events), partition_context.partition_id)) + # Do ops on received events + + with consumer: + consumer.receive(on_events=on_events, consumer_group='$Default') + # [END eventhub_consumer_client_receive_sync] + finally: + pass + + +def example_eventhub_producer_ops(): + # [START eventhub_producer_client_close_sync] + import os + from azure.eventhub import EventHubProducerClient, EventData + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB) + try: + producer.send(EventData(b"A single event")) + finally: + # Close down the producer handler. + producer.close() + # [END eventhub_producer_client_close_sync] + + +def example_eventhub_consumer_ops(): + # [START eventhub_consumer_client_close_sync] + import os + import threading + + EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] + EVENT_HUB = os.environ['EVENT_HUB_NAME'] + + from azure.eventhub import EventHubConsumerClient + consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENT_HUB_CONNECTION_STR, + event_hub_path=EVENT_HUB + ) + + logger = logging.getLogger("azure.eventhub") + + def on_events(partition_context, events): + logger.info("Received {} messages from partition: {}".format( + len(events), partition_context.partition_id)) + # Do ops on received events + + # The receive method is blocking call, so execute it in a thread to + # better demonstrate how to stop the receiving by calling he close method. + + worker = threading.Thread(target=consumer.receive, + kwargs={"on_events": on_events, + "consumer_group": "$Default"}) + worker.start() + time.sleep(10) # Keep receiving for 10s then close. + # Close down the consumer handler explicitly. + consumer.close() + # [END eventhub_consumer_client_close_sync] + + +if __name__ == '__main__': + example_eventhub_producer_ops() + example_eventhub_consumer_ops() + # example_eventhub_sync_send_and_receive() diff --git a/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py new file mode 100644 index 000000000000..7991c80a1638 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show sending individual events to an Event Hub partition. +Although this works, sending events in batches will get better performance. +See 'send_event_data_batch.py' for an example of batching. +""" + +# pylint: disable=C0111 + +import time +import os +from azure.eventhub import EventHubProducerClient, EventData + +EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, event_hub_path=EVENT_HUB) + +start_time = time.time() +with producer: + ed = EventData("msg") + producer.send(ed) # The event will be distributed to available partitions via round-robin. + + ed = EventData("msg sent to partition_id 0") + producer.send(ed, partition_id='0') # Specifying partition_id + + ed = EventData("msg sent with partition_key") + producer.send(ed, partition_key="p_key") # Specifying partition_key + + # Send a list of events + event_list = [] + for i in range(1500): + event_list.append(EventData('Hello World')) + producer.send(event_list) + +print("Send messages in {} seconds".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/examples/send_event_data_batch.py b/sdk/eventhub/azure-eventhubs/samples/sync_samples/send_event_data_batch.py similarity index 59% rename from sdk/eventhub/azure-eventhubs/examples/send_event_data_batch.py rename to sdk/eventhub/azure-eventhubs/samples/sync_samples/send_event_data_batch.py index dfb7b8f3f749..53568366064a 100644 --- a/sdk/eventhub/azure-eventhubs/examples/send_event_data_batch.py +++ b/sdk/eventhub/azure-eventhubs/samples/sync_samples/send_event_data_batch.py @@ -13,31 +13,27 @@ import time import os -from azure.eventhub import EventHubClient, EventData, EventHubSharedKeyCredential +from azure.eventhub import EventHubProducerClient, EventData -HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net +EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENT_HUB = os.environ['EVENT_HUB_NAME'] -USER = os.environ['EVENT_HUB_SAS_POLICY'] -KEY = os.environ['EVENT_HUB_SAS_KEY'] - -def create_batch_data(producer): - event_data_batch = producer.create_batch(max_size=10000) +def create_batch_data(producer_client): + batch_data = producer_client.create_batch(max_size=10000) while True: try: - event_data_batch.try_add(EventData('Message inside EventBatchData')) + batch_data.try_add(EventData('Message inside EventBatchData')) except ValueError: # EventDataBatch object reaches max_size. # New EventDataBatch object can be created here to send more data break - return event_data_batch + return batch_data + +producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, event_hub_path=EVENT_HUB) -client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), - network_tracing=False) -producer = client.create_producer() start_time = time.time() with producer: event_data_batch = create_batch_data(producer)