diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md index 21b4a303935b..c342480b90a7 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md @@ -3,7 +3,7 @@ Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. -Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhub-checkpointstoreblob](./). +Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhub-checkpointstoreblob](../azure-eventhub-checkpointstoreblob). [Source code](./) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py index 62b37f29c9c6..ae79624b280a 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py @@ -5,11 +5,12 @@ CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] +BLOB_CONTAINER_NAME = "your-blob-container-name" # Please make sure the blob container resource exists. async def on_event(partition_context, event): # Put your code here. - # Do some sync or async operations. If the operation is i/o intensive, async will have better performance + # Do some sync or async operations. If the operation is i/o intensive, async will have better performance. print(event) await partition_context.update_checkpoint(event) @@ -22,7 +23,7 @@ async def main(client): loop = asyncio.get_event_loop() checkpoint_store = BlobCheckpointStore.from_connection_string( STORAGE_CONNECTION_STR, - container_name="eventprocessor" + container_name=BLOB_CONTAINER_NAME ) client = EventHubConsumerClient.from_connection_string( CONNECTION_STR, diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md index f01aa2573e5b..53c262cf1915 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md @@ -84,7 +84,7 @@ def main(): client = EventHubConsumerClient.from_connection_string( connection_str, consumer_group, - eventhub_name = eventhub_name, + eventhub_name=eventhub_name, checkpoint_store=checkpoint_store, ) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/samples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob/samples/event_processor_blob_storage_example.py index 02e6849818f1..d57999a037af 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/samples/event_processor_blob_storage_example.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/samples/event_processor_blob_storage_example.py @@ -4,18 +4,26 @@ CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] +BLOB_CONTAINER_NAME = "your-blob-container-name" # Please make sure the blob container resource exists. def on_event(partition_context, event): - # do something with event + # Put your code here. + # Avoid time-consuming operations. print(event) partition_context.update_checkpoint(event) if __name__ == '__main__': - checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor") + checkpoint_store = BlobCheckpointStore.from_connection_string( + STORAGE_CONNECTION_STR, + container_name=BLOB_CONTAINER_NAME + ) client = EventHubConsumerClient.from_connection_string( - CONNECTION_STR, "$default", checkpoint_store=checkpoint_store) + CONNECTION_STR, + "$Default", + checkpoint_store=checkpoint_store + ) try: client.receive(on_event) diff --git a/sdk/eventhub/azure-eventhub/HISTORY.md b/sdk/eventhub/azure-eventhub/HISTORY.md index c593932e690e..24a57beb3878 100644 --- a/sdk/eventhub/azure-eventhub/HISTORY.md +++ b/sdk/eventhub/azure-eventhub/HISTORY.md @@ -5,7 +5,7 @@ **Breaking changes** - `EventData` - - Removed deprecated property `application_properties` and deprecated method `encode_message()` + - Removed deprecated property `application_properties` and deprecated method `encode_message()`. - `EventHubConsumerClient` - `on_error` would be called when `EventHubConsumerClient` failed to claim ownership of partitions. - `on_partition_close` and `on_partition_initialize` would be called in the case of exceptions raised by `on_event` callback. @@ -54,7 +54,7 @@ - `PartitionContext` now has attribute `last_enqueued_event_properties` which is populated if `track_last_enqueued_event_properties` is set to `True` in the `receive` method. -** New features ** +**New features** - Added new parameter `idle_timeout` in construct and `from_connection_string` to `EventHubConsumerClient` and `EventHubProducerClient` after which the underlying connection will close if there is no further activity. diff --git a/sdk/eventhub/azure-eventhub/README.md b/sdk/eventhub/azure-eventhub/README.md index 02d1659017d1..b6fee9b64661 100644 --- a/sdk/eventhub/azure-eventhub/README.md +++ b/sdk/eventhub/azure-eventhub/README.md @@ -148,7 +148,7 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' eventhub_name = '<< NAME OF THE EVENT HUB >>' client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name) -event_data_batch = client.create_batch(max_size_in_bytes=10000) +event_data_batch = client.create_batch() can_add = True while can_add: try: @@ -200,7 +200,7 @@ consumer_group = '<< CONSUMER GROUP >>' eventhub_name = '<< NAME OF THE EVENT HUB >>' async def create_batch(client): - event_data_batch = await client.create_batch(max_size_in_bytes=10000) + event_data_batch = await client.create_batch() can_add = True while can_add: try: @@ -284,7 +284,7 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' consumer_group = '<< CONSUMER GROUP >>' eventhub_name = '<< NAME OF THE EVENT HUB >>' storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>' -container_name = '<>' +container_name = '<>' async def on_event(partition_context, event): # do something diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index e00e3dff2424..6b73a458eb95 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -48,7 +48,7 @@ class EventData(object): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_event_data] :end-before: [END create_event_data] :language: python diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index 64b95c04d166..f8643d45bcbb 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -86,7 +86,7 @@ class EventHubConsumerClient(ClientBase): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_consumer_client_sync] :end-before: [END create_eventhub_consumer_client_sync] :language: python @@ -196,7 +196,7 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_consumer_client_from_conn_str_sync] :end-before: [END create_eventhub_consumer_client_from_conn_str_sync] :language: python @@ -270,7 +270,7 @@ def receive(self, on_event, **kwargs): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_consumer_client_receive_sync] :end-before: [END eventhub_consumer_client_receive_sync] :language: python @@ -389,7 +389,7 @@ def close(self): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_consumer_client_close_sync] :end-before: [END eventhub_consumer_client_close_sync] :language: python diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index cea84f002b9e..6343b85aef25 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -49,7 +49,7 @@ class EventHubProducerClient(ClientBase): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_producer_client_sync] :end-before: [END create_eventhub_producer_client_sync] :language: python @@ -172,7 +172,7 @@ def from_connection_string(cls, conn_str, **kwargs): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START create_eventhub_producer_client_from_conn_str_sync] :end-before: [END create_eventhub_producer_client_from_conn_str_sync] :language: python @@ -200,7 +200,7 @@ def send_batch(self, event_data_batch, **kwargs): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_send_sync] :end-before: [END eventhub_producer_client_send_sync] :language: python @@ -232,12 +232,13 @@ def create_batch(self, **kwargs): will assign to all partitions using round-robin. :keyword str partition_key: With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. - :keyword int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. + :keyword int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. By + default, the value is determined by your Event Hubs tier. :rtype: ~azure.eventhub.EventDataBatch .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_create_batch_sync] :end-before: [END eventhub_producer_client_create_batch_sync] :language: python @@ -322,7 +323,7 @@ def close(self): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py + .. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py :start-after: [START eventhub_producer_client_close_sync] :end-before: [END eventhub_producer_client_close_sync] :language: python diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 3aae0aeb9aca..e7f2ffec028a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -92,7 +92,7 @@ class EventHubConsumerClient(ClientBaseAsync): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_consumer_client_async] :end-before: [END create_eventhub_consumer_client_async] :language: python @@ -215,7 +215,7 @@ def from_connection_string( .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_consumer_client_from_conn_str_async] :end-before: [END create_eventhub_consumer_client_from_conn_str_async] :language: python @@ -320,7 +320,7 @@ async def receive( .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_consumer_client_receive_async] :end-before: [END eventhub_consumer_client_receive_async] :language: python @@ -439,7 +439,7 @@ async def close(self) -> None: .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_consumer_client_close_async] :end-before: [END eventhub_consumer_client_close_async] :language: python diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index de531176242f..ffe60685bef0 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -51,7 +51,7 @@ class EventHubProducerClient(ClientBaseAsync): .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_producer_client_async] :end-before: [END create_eventhub_producer_client_async] :language: python @@ -189,7 +189,7 @@ def from_connection_string( .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START create_eventhub_producer_client_from_conn_str_async] :end-before: [END create_eventhub_producer_client_from_conn_str_async] :language: python @@ -231,7 +231,7 @@ async def send_batch( .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_send_async] :end-before: [END eventhub_producer_client_send_async] :language: python @@ -267,12 +267,13 @@ async def create_batch( will assign to all partitions using round-robin. :param str partition_key: With the given partition_key, event data will be sent to a particular partition of the Event Hub decided by the service. - :param int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. + :param int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. By + default, the value is determined by your Event Hubs tier. :rtype: ~azure.eventhub.EventDataBatch .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_create_batch_async] :end-before: [END eventhub_producer_client_create_batch_async] :language: python @@ -351,7 +352,7 @@ async def close(self) -> None: .. admonition:: Example: - .. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py :start-after: [START eventhub_producer_client_close_async] :end-before: [END eventhub_producer_client_close_async] :language: python diff --git a/sdk/eventhub/azure-eventhub/migration_guide.md b/sdk/eventhub/azure-eventhub/migration_guide.md index f00f1216956a..a6d10e0c1b47 100644 --- a/sdk/eventhub/azure-eventhub/migration_guide.md +++ b/sdk/eventhub/azure-eventhub/migration_guide.md @@ -27,8 +27,8 @@ The code samples in this migration guide use async APIs. | In v1 | Equivalent in v5 | Sample | |---|---|---| -| `EventHubClientAsync()` | `EventHubProducerClient()` or `EventHubConsumerClient()` | [using credential](./samples/async_samples/client_secret_auth_async.py ) | -| `EventHubClientAsync.from_connection_string()` | `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string` |[receive events](./samples/async_samples/recv_async.py), [send events](./samples/async_samples/send_async.py) | +| `EventHubClientAsync()` | `EventHubProducerClient()` or `EventHubConsumerClient()` | [using credential](./samples/async_samples/client_identity_authentication_async.py ) | +| `EventHubClientAsync.from_connection_string()` | `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string` |[client creation](./samples/async_samples/client_creation_async.py) | | `EventProcessorHost()`| `EventHubConsumerClient(..., checkpoint_store)`| [receive events using checkpoint store](./samples/async_samples/recv_with_checkpoint_store_async.py) | ### Receiving events @@ -59,7 +59,7 @@ For example, this code which keeps receiving from a partition in V1: ```python client = EventHubClientAsync.from_connection_string(connection_str, eventhub=EVENTHUB_NAME) -receiver = client.add_async_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) +receiver = client.add_async_receiver(consumer_group="$Default", partition="0", offset=Offset('@latest')) try: await client.run_async() logger = logging.getLogger("azure.eventhub") @@ -79,7 +79,7 @@ async def on_event(partition_context, event): logger.info("Message received:{}".format(event.body_as_str())) client = EventHubConsumerClient.from_connection_string( - conn_str=CONNECTION_STR, consumer_group="$default", eventhub_name=EVENTHUB_NAME + conn_str=CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME ) async with client: await client.receive(on_event=on_event, partition_id="0", starting_position="@latest") @@ -172,7 +172,7 @@ USER = os.environ.get('EVENT_HUB_SAS_POLICY') KEY = os.environ.get('EVENT_HUB_SAS_KEY') # Eventhub config and storage manager -eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default") +eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$Default") eh_options = EPHOptions() eh_options.debug_trace = False storage_manager = AzureStorageCheckpointLeaseManager( @@ -207,6 +207,7 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore logging.basicConfig(level=logging.INFO) CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] +BLOB_CONTAINER_NAME = "your-blob-container-name" logger = logging.getLogger("azure.eventhub") events_processed = defaultdict(int) @@ -230,7 +231,7 @@ async def on_error(context, error): logger.error("Receiving event has a non-partition error {!r}".format(error)) async def main(): - checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "aStorageBlobContainerName") + checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME) client = EventHubConsumerClient.from_connection_string( CONNECTION_STR, consumer_group="$Default", diff --git a/sdk/eventhub/azure-eventhub/samples/README.md b/sdk/eventhub/azure-eventhub/samples/README.md index 3ef81d64a16f..c63108813195 100644 --- a/sdk/eventhub/azure-eventhub/samples/README.md +++ b/sdk/eventhub/azure-eventhub/samples/README.md @@ -25,7 +25,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - Send event data batch to a specific partition by partition id - Send event data batch with customized properties -- [send.py](./sync_samples/send_stream.py) ([async version](./async_samples/send_stream_async.py)) - Examples to do streaming sending: +- [send_stream.py](./sync_samples/send_stream.py) ([async version](./async_samples/send_stream_async.py)) - Examples to do streaming sending: - Send in a stream - [recv.py](./sync_samples/recv.py) ([async version](./async_samples/recv_async.py)) - Examples to receive events: diff --git a/sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py similarity index 78% rename from sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub_async.py rename to sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py index 7863ab502916..83b29fa96476 100644 --- a/sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py @@ -4,18 +4,32 @@ # license information. #-------------------------------------------------------------------------- +""" +Examples to show basic async use case of python azure-eventhub SDK, including: + - Create EventHubProducerClient + - Create EventHubConsumerClient + - Create EventData + - Create EventDataBatch + - Send EventDataBatch + - Receive EventData + - Close EventHubProducerClient + - Close EventHubConsumerClient +""" + import logging import asyncio -def create_async_eventhub_producer_client(): +def example_create_async_eventhub_producer_client(): # [START create_eventhub_producer_client_from_conn_str_async] import os from azure.eventhub.aio import EventHubProducerClient event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR'] eventhub_name = os.environ['EVENT_HUB_NAME'] - producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str, - eventhub_name=eventhub_name) + producer = EventHubProducerClient.from_connection_string( + conn_str=event_hub_connection_str, + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. + ) # [END create_eventhub_producer_client_from_conn_str_async] # [START create_eventhub_producer_client_async] @@ -34,15 +48,17 @@ def create_async_eventhub_producer_client(): return producer -def create_async_eventhub_consumer_client(): +def example_create_async_eventhub_consumer_client(): # [START create_eventhub_consumer_client_from_conn_str_async] import os from azure.eventhub.aio import EventHubConsumerClient event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR'] eventhub_name = os.environ['EVENT_HUB_NAME'] - consumer = EventHubConsumerClient.from_connection_string(conn_str=event_hub_connection_str, - consumer_group='$Default', - eventhub_name=eventhub_name) + consumer = EventHubConsumerClient.from_connection_string( + conn_str=event_hub_connection_str, + consumer_group='$Default', + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. + ) # [END create_eventhub_consumer_client_from_conn_str_async] # [START create_eventhub_consumer_client_async] @@ -63,12 +79,12 @@ def create_async_eventhub_consumer_client(): async def example_eventhub_async_send_and_receive(): - producer = create_async_eventhub_producer_client() - consumer = create_async_eventhub_consumer_client() + producer = example_create_async_eventhub_producer_client() + consumer = example_create_async_eventhub_consumer_client() try: # [START eventhub_producer_client_create_batch_async] from azure.eventhub import EventData - event_data_batch = await producer.create_batch(max_size_in_bytes=10000) + event_data_batch = await producer.create_batch() while True: try: event_data_batch.add(EventData('Message inside EventBatchData')) @@ -80,7 +96,7 @@ async def example_eventhub_async_send_and_receive(): # [START eventhub_producer_client_send_async] async with producer: - event_data_batch = await producer.create_batch(max_size_in_bytes=10000) + event_data_batch = await producer.create_batch() while True: try: event_data_batch.add(EventData('Message inside EventBatchData')) @@ -106,7 +122,7 @@ async def on_event(partition_context, event): pass -async def example_eventhub_async_producer_ops(): +async def example_eventhub_async_producer_send_and_close(): # [START eventhub_producer_client_close_async] import os from azure.eventhub.aio import EventHubProducerClient @@ -117,10 +133,10 @@ async def example_eventhub_async_producer_ops(): producer = EventHubProducerClient.from_connection_string( conn_str=event_hub_connection_str, - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) try: - event_data_batch = await producer.create_batch(max_size_in_bytes=10000) + event_data_batch = await producer.create_batch() while True: try: event_data_batch.add(EventData('Message inside EventBatchData')) @@ -135,7 +151,7 @@ async def example_eventhub_async_producer_ops(): # [END eventhub_producer_client_close_async] -async def example_eventhub_async_consumer_ops(): +async def example_eventhub_async_consumer_receive_and_close(): # [START eventhub_consumer_client_close_async] import os @@ -146,7 +162,7 @@ async def example_eventhub_async_consumer_ops(): consumer = EventHubConsumerClient.from_connection_string( conn_str=event_hub_connection_str, consumer_group='$Default', - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) logger = logging.getLogger("azure.eventhub") @@ -168,6 +184,7 @@ async def on_event(partition_context, event): if __name__ == '__main__': loop = asyncio.get_event_loop() - loop.run_until_complete(example_eventhub_async_producer_ops()) - loop.run_until_complete(example_eventhub_async_consumer_ops()) + loop.run_until_complete(example_eventhub_async_consumer_receive_and_close()) + # loop.run_until_complete(example_eventhub_async_producer_send_and_close()) # loop.run_until_complete(example_eventhub_async_send_and_receive()) + diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/client_creation.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/client_creation.py index e4764de3f270..f61596d9d257 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/client_creation.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/client_creation.py @@ -30,12 +30,12 @@ def create_producer_client(): # Create producer client from connection string. - producer_client = EventHubProducerClient.from_CONNECTION_STRING( + producer_client = EventHubProducerClient.from_connection_string( conn_str=CONNECTION_STRING # connection string contains EventHub name. ) # Illustration of commonly used parameters. - producer_client = EventHubProducerClient.from_CONNECTION_STRING( + producer_client = EventHubProducerClient.from_connection_string( conn_str=CONNECTION_STRING, eventhub_name=EVENTHUB_NAME, # EventHub name should be specified if it doesn't show up in connection string. logging_enable=False, # To enable network tracing log, set logging_enable to True. @@ -65,13 +65,13 @@ def create_consumer_client(): # Create consumer client from connection string. - consumer_client = EventHubConsumerClient.from_CONNECTION_STRING( + consumer_client = EventHubConsumerClient.from_connection_string( conn_str=CONNECTION_STRING, # connection string contains EventHub name. consumer_group=CONSUMER_GROUP ) # Illustration of commonly used parameters. - consumer_client = EventHubConsumerClient.from_CONNECTION_STRING( + consumer_client = EventHubConsumerClient.from_connection_string( conn_str=CONNECTION_STRING, consumer_group=CONSUMER_GROUP, eventhub_name=EVENTHUB_NAME, # EventHub name should be specified if it doesn't show up in connection string. diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py index 356068b3cf2e..e6850b2778e4 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py @@ -27,7 +27,7 @@ def on_event(partition_context, event): - # Put your code here. to do some operations on the event. + # Put your code here. # Avoid time-consuming operations. p_id = partition_context.partition_id print("Received event from partition: {}".format(p_id)) diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py index be849a04f34a..92fd671b3288 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py @@ -22,7 +22,7 @@ def on_event(partition_context, event): - # Put your code here. to do some operations on the event. + # Put your code here. # Avoid time-consuming operations. print("Received event from partition: {}.".format(partition_context.partition_id)) partition_context.update_checkpoint(event) diff --git a/sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py similarity index 80% rename from sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub.py rename to sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py index 5b7ea6b76a84..0558ca769d7f 100644 --- a/sdk/eventhub/azure-eventhub/samples/docstring_samples/sample_code_eventhub.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py @@ -4,11 +4,23 @@ # license information. #-------------------------------------------------------------------------- +""" +Examples to show basic use case of python azure-eventhub SDK, including: + - Create EventHubProducerClient + - Create EventHubConsumerClient + - Create EventData + - Create EventDataBatch + - Send EventDataBatch + - Receive EventData + - Close EventHubProducerClient + - Close EventHubConsumerClient +""" + import time import logging -def create_eventhub_producer_client(): +def example_create_eventhub_producer_client(): # [START create_eventhub_producer_client_from_conn_str_sync] import os from azure.eventhub import EventHubProducerClient @@ -16,7 +28,7 @@ def create_eventhub_producer_client(): eventhub_name = os.environ['EVENT_HUB_NAME'] producer = EventHubProducerClient.from_connection_string( conn_str=event_hub_connection_str, - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) # [END create_eventhub_producer_client_from_conn_str_sync] @@ -32,14 +44,14 @@ def create_eventhub_producer_client(): credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key) producer = EventHubProducerClient( fully_qualified_namespace=fully_qualified_namespace, - eventhub_name=eventhub_name, + eventhub_name=eventhub_name, # EventHub name should be specified if it doesn't show up in connection string. credential=credential ) # [END create_eventhub_producer_client_sync] return producer -def create_eventhub_consumer_client(): +def example_create_eventhub_consumer_client(): # [START create_eventhub_consumer_client_from_conn_str_sync] import os from azure.eventhub import EventHubConsumerClient @@ -48,7 +60,7 @@ def create_eventhub_consumer_client(): consumer = EventHubConsumerClient.from_connection_string( conn_str=event_hub_connection_str, consumer_group='$Default', - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) # [END create_eventhub_consumer_client_from_conn_str_sync] @@ -65,14 +77,15 @@ def create_eventhub_consumer_client(): consumer = EventHubConsumerClient( fully_qualified_namespace=fully_qualified_namespace, eventhub_name=eventhub_name, + consumer_group='$Default', credential=credential) # [END create_eventhub_consumer_client_sync] return consumer def example_eventhub_sync_send_and_receive(): - producer = create_eventhub_producer_client() - consumer = create_eventhub_consumer_client() + producer = example_create_eventhub_producer_client() + consumer = example_create_eventhub_consumer_client() try: logger = logging.getLogger("azure.eventhub") @@ -85,7 +98,7 @@ def example_eventhub_sync_send_and_receive(): # [END create_event_data] # [START eventhub_producer_client_create_batch_sync] - event_data_batch = producer.create_batch(max_size_in_bytes=10000) + event_data_batch = producer.create_batch() while True: try: event_data_batch.add(EventData('Message inside EventBatchData')) @@ -97,7 +110,7 @@ def example_eventhub_sync_send_and_receive(): # [START eventhub_producer_client_send_sync] with producer: - event_data_batch = producer.create_batch(max_size_in_bytes=10000) + event_data_batch = producer.create_batch() while True: try: @@ -125,7 +138,7 @@ def on_event(partition_context, event): pass -def example_eventhub_producer_ops(): +def example_eventhub_producer_send_and_close(): # [START eventhub_producer_client_close_sync] import os from azure.eventhub import EventHubProducerClient, EventData @@ -135,10 +148,10 @@ def example_eventhub_producer_ops(): producer = EventHubProducerClient.from_connection_string( conn_str=event_hub_connection_str, - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) try: - event_data_batch = producer.create_batch(max_size_in_bytes=10000) + event_data_batch = producer.create_batch() while True: try: @@ -155,7 +168,7 @@ def example_eventhub_producer_ops(): # [END eventhub_producer_client_close_sync] -def example_eventhub_consumer_ops(): +def example_eventhub_consumer_receive_and_close(): # [START eventhub_consumer_client_close_sync] import os import threading @@ -167,7 +180,7 @@ def example_eventhub_consumer_ops(): consumer = EventHubConsumerClient.from_connection_string( conn_str=event_hub_connection_str, consumer_group="$Default", - eventhub_name=eventhub_name + eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string. ) logger = logging.getLogger("azure.eventhub") @@ -191,6 +204,6 @@ def on_event(partition_context, event): if __name__ == '__main__': - example_eventhub_producer_ops() - example_eventhub_consumer_ops() - # example_eventhub_sync_send_and_receive() + example_eventhub_sync_send_and_receive() + # example_eventhub_producer_send_and_close() + # example_eventhub_consumer_receive_and_close() diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py index 708f3b02a952..3ddc3c6d382d 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py @@ -259,7 +259,6 @@ async def error_handler(partition_context, err): assert partition_initialize_handler.partition_context -@pytest.mark.liveTest @pytest.mark.asyncio async def test_partition_processor_process_events_error(): @@ -642,7 +641,6 @@ def get_partition_ids(self): assert len(to_claim_ownership) == expected_result -@pytest.mark.liveTest @pytest.mark.asyncio async def test_partition_processor_process_update_checkpoint_error():