diff --git a/sdk/eventhub/azure-eventhub/README.md b/sdk/eventhub/azure-eventhub/README.md index 5d14a3ae7b3d..ccc29e28f113 100644 --- a/sdk/eventhub/azure-eventhub/README.md +++ b/sdk/eventhub/azure-eventhub/README.md @@ -38,26 +38,21 @@ There, you can also find detailed instructions for using the Azure CLI, Azure Po ### Authenticate the client -Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS/AAD credential and event hub name to instantiate the client object. +Interaction with Event Hubs starts with an instance of EventHubConsumerClient or EventHubProducerClient class. You need either the host name, SAS/AAD credential and event hub name or a connection string to instantiate the client object. -#### Obtain a connection string +**Create client from connection string:** -For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize with it. -The easiest means for doing so is to use a connection string, which is created automatically when creating an Event Hubs namespace. +For the Event Hubs client library to interact with an Event Hub, the easiest means is to use a connection string, which is created automatically when creating an Event Hubs namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the step-by-step guide to [get an Event Hubs connection string](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string). -#### Create client - -There are several ways to instantiate the EventHubClient object and the following code snippets demonstrate two ways: - -**Create client from connection string:** ```python -from azure.eventhub import EventHubConsumerClient +from azure.eventhub import EventHubConsumerClient, EventHubProducerClient connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' consumer_group = '<< CONSUMER GROUP >>' eventhub_name = '<< NAME OF THE EVENT HUB >>' +producer_client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name) consumer_client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name) ``` diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 6b73a458eb95..78bf705add9f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -273,7 +273,7 @@ class EventDataBatch(object): at which point a `ValueError` will be raised. Use the `send_batch` method of :class:`EventHubProducerClient` or the async :class:`EventHubProducerClient` - for sending. The `create_batch` method accepts partition_key as a parameter for sending a particular partition. + for sending. **Please use the create_batch method of EventHubProducerClient to create an EventDataBatch object instead of instantiating an EventDataBatch object directly.** diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index f8643d45bcbb..ec850ce1589f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -30,7 +30,7 @@ class EventHubConsumerClient(ClientBase): - """The EventHubProducerClient class defines a high level interface for + """The EventHubConsumerClient class defines a high level interface for receiving events from the Azure Event Hubs service. The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with @@ -38,16 +38,15 @@ class EventHubConsumerClient(ClientBase): When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same checkpointing location, they will balance automatically. - To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the + To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the `EventHubConsumerClient`. + If a checkpoint store is not provided, the checkpoint will be maintained internally in memory. An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()` - and specify the partition_id. - Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store - is set. + and specify the partition_id. Load-balancing won't work in single-partition receiving mode. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. - This is likely to be similar to .servicebus.windows.net + The namespace format is: `.servicebus.windows.net`. :param str eventhub_name: The path of the specific Event Hub to connect the client to. :param str consumer_group: Receive events from the event hub for this consumer group. :param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which @@ -66,7 +65,7 @@ class EventHubConsumerClient(ClientBase): The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume receiving. - :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close + :keyword float idle_timeout: Timeout in seconds, after which the underlying connection will close if there is no further activity. By default the value is None, meaning that the service determines when to close an idle connection. :keyword transport_type: The type of transport protocol that will be used for communicating with diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py index 840672af5cde..3bbe5d393fac 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py @@ -10,7 +10,7 @@ class CheckpointStore(object): """CheckpointStore deals with the interaction with the chosen storage service. - It can list and claim partition ownerships; and list and save checkpoints. + It can list and claim partition ownerships as well as list and save checkpoints. """ @abstractmethod @@ -19,14 +19,14 @@ def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_grou """Retrieves a complete ownership list from the chosen storage service. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". :param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with, relative to the Event Hubs namespace that contains it. :param str consumer_group: The name of the consumer group the ownerships are associated with. :rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the ownership are associated with. @@ -46,7 +46,7 @@ def claim_ownership(self, ownership_list): :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the ownership are associated with. @@ -69,7 +69,7 @@ def update_checkpoint(self, checkpoint): :param Dict[str,Any] checkpoint: A dict containing checkpoint information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the checkpoint is associated with. @@ -90,14 +90,14 @@ def list_checkpoints( """List the updated checkpoints from the store. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". :param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. :param str consumer_group: The name of the consumer group the checkpoints are associated with. :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the checkpoints are associated with. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/partition_context.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/partition_context.py index 15e28734c5b0..57859b41eb74 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/partition_context.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/partition_context.py @@ -63,9 +63,6 @@ def update_checkpoint(self, event): # type: (EventData) -> None """Updates the receive checkpoint to the given events offset. - This operation will only update a checkpoint if a `checkpoint_store` was provided during - creation of the `EventHubConsumerClient`. Otherwise a warning will be logged. - :param ~azure.eventhub.EventData event: The EventData instance which contains the offset and sequence number information used for checkpoint. :rtype: None diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index e7f2ffec028a..80fb544d578e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -36,7 +36,7 @@ class EventHubConsumerClient(ClientBaseAsync): - """The EventHubProducerClient class defines a high level interface for + """The EventHubConsumerClient class defines a high level interface for receiving events from the Azure Event Hubs service. The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with @@ -44,16 +44,15 @@ class EventHubConsumerClient(ClientBaseAsync): When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same checkpointing location, they will balance automatically. - To enable the load-balancing and / or checkpointing, checkpoint_store must be set when creating the + To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the `EventHubConsumerClient`. + If a checkpoint store is not provided, the checkpoint will be maintained internally in memory. An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()` - and specify the partition_id. - Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store - is set. + and specify the partition_id. Load-balancing won't work in single-partition receiving mode. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. - This is likely to be similar to .servicebus.windows.net + The namespace format is: `.servicebus.windows.net`. :param str eventhub_name: The path of the specific Event Hub to connect the client to. :param str consumer_group: Receive events from the event hub for this consumer group. :param ~azure.core.credentials.TokenCredential credential: The credential object used for authentication which diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py index 8bbe19e6cee1..55e7124735c7 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py @@ -10,7 +10,7 @@ class CheckpointStore(ABC): """CheckpointStore deals with the interaction with the chosen storage service. - It can list and claim partition ownerships; and list and save checkpoints. + It can list and claim partition ownerships as well as list and save checkpoints. """ @abstractmethod @@ -20,14 +20,14 @@ async def list_ownership( """Retrieves a complete ownership list from the chosen storage service. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". :param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with, relative to the Event Hubs namespace that contains it. :param str consumer_group: The name of the consumer group the ownerships are associated with. :rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the ownership are associated with. @@ -48,7 +48,7 @@ async def claim_ownership( :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the ownership are associated with. @@ -72,7 +72,7 @@ async def update_checkpoint( :param Dict[str,Any] checkpoint: A dict containing checkpoint information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the checkpoint is associated with. @@ -92,14 +92,14 @@ async def list_checkpoints( """List the updated checkpoints from the store. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". :param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. :param str consumer_group: The name of the consumer group the checkpoints are associated with. :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information: - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. - The format is like ".servicebus.windows.net" + The format is like ".servicebus.windows.net". - `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. - `consumer_group` (str): The name of the consumer group the checkpoints are associated with. diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/partition_context.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/partition_context.py index 25ff92215067..b607e8cdb867 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/partition_context.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/partition_context.py @@ -60,9 +60,6 @@ def last_enqueued_event_properties(self) -> Optional[Dict[str, Any]]: async def update_checkpoint(self, event: "EventData") -> None: """Updates the receive checkpoint to the given events offset. - This operation will only update a checkpoint if a `checkpoint_store` was provided during - creation of the `EventHubConsumerClient`. Otherwise a warning will be logged. - :param ~azure.eventhub.EventData event: The EventData instance which contains the offset and sequence number information used for checkpoint. :rtype: None diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py index 644fa0581cef..b4e4605a22b2 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py @@ -44,7 +44,7 @@ async def on_event(partition_context, event): assert len( [checkpoint for checkpoint in checkpoints if checkpoint["sequence_number"] == on_event.sequence_number]) > 0 - task.cancel() + await task @pytest.mark.liveTest @@ -67,7 +67,7 @@ async def on_event(partition_context, event): client.receive(on_event, partition_id="0", starting_position="-1")) await asyncio.sleep(10) assert on_event.received == 1 - task.cancel() + await task @pytest.mark.liveTest diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py index 3ddc3c6d382d..abe800dc4194 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_eventprocessor_async.py @@ -170,7 +170,7 @@ async def on_error(partition_context, error): assert on_error.called is True finally: await event_processor.stop() - task.cancel() + await task await eventhub_client.close() @@ -250,7 +250,7 @@ async def error_handler(partition_context, err): await asyncio.sleep(2) assert len(event_processor._tasks) == 2 await event_processor.stop() - task.cancel() + await task await eventhub_client.close() assert event_map['0'] >= 1 and event_map['1'] >= 1 assert checkpoint is not None @@ -316,7 +316,7 @@ async def error_handler(partition_context, error): task = asyncio.ensure_future(event_processor.start()) await asyncio.sleep(10) await event_processor.stop() - # task.cancel() + await task await asyncio.sleep(1) await eventhub_client.close() assert isinstance(error_handler.error, RuntimeError) @@ -370,7 +370,7 @@ async def close(self): task = asyncio.ensure_future(event_processor.start()) await asyncio.sleep(5) await event_processor.stop() - task.cancel() + await task assert isinstance(error_handler.error, EventHubError) assert partition_close_handler.reason == CloseReason.OWNERSHIP_LOST @@ -441,7 +441,7 @@ async def release_ownership(self, partition_id): task = asyncio.ensure_future(event_processor.start()) await asyncio.sleep(5) await event_processor.stop() - # task.cancel() + await task assert partition_initialize_handler.called assert event_handler.called assert error_handler.called @@ -702,7 +702,7 @@ async def partition_close_handler(partition_context, reason): task = asyncio.ensure_future(event_processor.start()) await asyncio.sleep(10) await event_processor.stop() - # task.cancel() + await task await asyncio.sleep(1) await eventhub_client.close() assert partition_close_handler.called diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_receive_async.py index 173c40a12035..dc373ba33d4b 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_receive_async.py @@ -31,7 +31,7 @@ async def on_event(partition_context, event): senders[0].send(EventData(b"Receiving only a single event")) await asyncio.sleep(10) assert on_event.called is True - task.cancel() + await task @pytest.mark.parametrize("position, inclusive, expected_result", @@ -68,7 +68,7 @@ async def on_event(partition_context, event): track_last_enqueued_event_properties=True)) await asyncio.sleep(10) assert on_event.event_position is not None - task.cancel() + await task senders[0].send(EventData(expected_result)) client2 = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default') async with client2: @@ -78,7 +78,7 @@ async def on_event(partition_context, event): track_last_enqueued_event_properties=True)) await asyncio.sleep(10) assert on_event.event.body_as_str() == expected_result - task.cancel() + await task @pytest.mark.liveTest @@ -110,8 +110,8 @@ async def on_error(partition_context, error): ed = EventData("Event Number {}".format(i)) senders[0].send(ed) await asyncio.sleep(10) - task1.cancel() - task2.cancel() + await task1 + await task2 assert isinstance(on_error.error, EventHubError) @@ -141,7 +141,7 @@ async def on_event(partition_context, event): task = asyncio.ensure_future(client.receive(on_event, partition_id="0", starting_position="-1")) await asyncio.sleep(10) - task.cancel() + await task assert len(on_event.received) == 5 for ed in on_event.received: assert ed.properties[b"raw_prop"] == b"raw_value"