diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py index c7bb4a54dee5..cba36dd14ace 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py @@ -129,9 +129,65 @@ async def _upload_ownership( ].timestamp() ownership.update(metadata) + async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]: + partition_id = ownership["partition_id"] + namespace = ownership["fully_qualified_namespace"] + eventhub_name = ownership["eventhub_name"] + consumer_group = ownership["consumer_group"] + owner_id = ownership["owner_id"] + metadata = {"ownerid": owner_id} + try: + await self._upload_ownership(ownership, metadata) + return ownership + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of namespace %r eventhub %r consumer group %r " + "lost ownership to partition %r", + owner_id, + namespace, + eventhub_name, + consumer_group, + partition_id, + ) + raise OwnershipLostError() + except Exception as error: # pylint:disable=broad-except + logger.warning( + "An exception occurred when EventProcessor instance %r claim_ownership for " + "namespace %r eventhub %r consumer group %r partition %r. " + "The ownership is now lost. Exception " + "is %r", + owner_id, + namespace, + eventhub_name, + consumer_group, + partition_id, + error, + ) + return ownership # Keep the ownership if an unexpected error happens + async def list_ownership( self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str ) -> Iterable[Dict[str, Any]]: + """Retrieves a complete ownership list from the storage blob. 
+ + :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. + The format is like "<namespace>.servicebus.windows.net". + :param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with, + relative to the Event Hubs namespace that contains it. + :param str consumer_group: The name of the consumer group the ownerships are associated with. + :rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like "<namespace>.servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the ownership is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the ownership is associated with. + - `partition_id` (str): The partition ID which the ownership is created for. + - `owner_id` (str): A UUID representing the current owner of this partition. + - `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed. + - `etag` (str): The Etag value for the last time this ownership was modified. Optional depending + on storage implementation. 
+ """ try: blob_prefix = "{}/{}/{}/ownership/".format( fully_qualified_namespace, eventhub_name, consumer_group @@ -166,45 +222,25 @@ async def list_ownership( ) raise - async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]: - partition_id = ownership["partition_id"] - namespace = ownership["fully_qualified_namespace"] - eventhub_name = ownership["eventhub_name"] - consumer_group = ownership["consumer_group"] - owner_id = ownership["owner_id"] - metadata = {"ownerid": owner_id} - try: - await self._upload_ownership(ownership, metadata) - return ownership - except (ResourceModifiedError, ResourceExistsError): - logger.info( - "EventProcessor instance %r of namespace %r eventhub %r consumer group %r " - "lost ownership to partition %r", - owner_id, - namespace, - eventhub_name, - consumer_group, - partition_id, - ) - raise OwnershipLostError() - except Exception as error: # pylint:disable=broad-except - logger.warning( - "An exception occurred when EventProcessor instance %r claim_ownership for " - "namespace %r eventhub %r consumer group %r partition %r. " - "The ownership is now lost. Exception " - "is %r", - owner_id, - namespace, - eventhub_name, - consumer_group, - partition_id, - error, - ) - return ownership # Keep the ownership if an unexpected error happens - async def claim_ownership( self, ownership_list: Iterable[Dict[str, Any]] ) -> Iterable[Dict[str, Any]]: + """Tries to claim ownership for a list of specified partitions. + + :param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownerships to claim. + :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". 
+ - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the ownership are associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `owner_id` (str): A UUID representing the owner attempting to claim this partition. + - `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed. + - `etag` (str): The Etag value for the last time this ownership was modified. Optional depending + on storage implementation. + """ results = await asyncio.gather( *[self._claim_one_partition(x) for x in ownership_list], return_exceptions=True @@ -214,6 +250,27 @@ async def claim_ownership( ] async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None: + """Updates the checkpoint using the given information for the offset, associated partition and + consumer group in the storage blob. + + Note: If you plan to implement a custom checkpoint store with the intention of running between + cross-language EventHubs SDKs, it is recommended to persist the offset value as an integer. + + :param Dict[str,Any] checkpoint: A dict containing checkpoint information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the checkpoint is associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `sequence_number` (int): The sequence number of the :class:`EventData` + the new checkpoint will be associated with. + - `offset` (str): The offset of the :class:`EventData` + the new checkpoint will be associated with. 
+ + :rtype: None + """ metadata = { "offset": str(checkpoint["offset"]), "sequencenumber": str(checkpoint["sequence_number"]), @@ -237,6 +294,24 @@ async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None: async def list_checkpoints( self, fully_qualified_namespace, eventhub_name, consumer_group ): + """List the updated checkpoints from the storage blob. + + :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + :param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to + the Event Hubs namespace that contains it. + :param str consumer_group: The name of the consumer group the checkpoints are associated with. + :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the checkpoints are associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `sequence_number` (int): The sequence number of the :class:`EventData`. + - `offset` (str): The offset of the :class:`EventData`. 
+ """ blob_prefix = "{}/{}/{}/checkpoint/".format( fully_qualified_namespace, eventhub_name, consumer_group ) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py index a4e7ad7dea4a..9c1633f431fb 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py @@ -2,7 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Dict, Optional, Any +from typing import Dict, Optional, Any, Iterable, Union import logging import time import calendar @@ -150,39 +150,6 @@ def _upload_ownership(self, ownership, metadata): ) ownership.update(metadata) - def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group): - try: - blob_prefix = "{}/{}/{}/ownership/".format( - fully_qualified_namespace, eventhub_name, consumer_group - ) - blobs = self._container_client.list_blobs( - name_starts_with=blob_prefix.lower(), include=["metadata"] - ) - result = [] - for blob in blobs: - ownership = { - "fully_qualified_namespace": fully_qualified_namespace, - "eventhub_name": eventhub_name, - "consumer_group": consumer_group, - "partition_id": blob.name.split("/")[-1], - "owner_id": blob.metadata["ownerid"], - "etag": blob.etag, - "last_modified_time": _to_timestamp(blob.last_modified), - } - result.append(ownership) - return result - except Exception as error: # pylint:disable=broad-except - logger.warning( - "An exception occurred during list_ownership for " - "namespace %r eventhub %r consumer 
group %r. " - "Exception is %r", - fully_qualified_namespace, - eventhub_name, - consumer_group, - error, - ) - raise - def _claim_one_partition(self, ownership): partition_id = ownership["partition_id"] fully_qualified_namespace = ownership["fully_qualified_namespace"] @@ -219,7 +186,78 @@ def _claim_one_partition(self, ownership): ) return ownership # Keep the ownership if an unexpected error happens + def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group): + # type: (str, str, str) -> Iterable[Dict[str, Any]] + """Retrieves a complete ownership list from the storage blob. + + :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + :param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with, + relative to the Event Hubs namespace that contains it. + :param str consumer_group: The name of the consumer group the ownerships are associated with. + :rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the ownership are associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `owner_id` (str): A UUID representing the current owner of this partition. + - `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed. + - `etag` (str): The Etag value for the last time this ownership was modified. Optional depending + on storage implementation. 
+ """ + try: + blob_prefix = "{}/{}/{}/ownership/".format( + fully_qualified_namespace, eventhub_name, consumer_group + ) + blobs = self._container_client.list_blobs( + name_starts_with=blob_prefix.lower(), include=["metadata"] + ) + result = [] + for blob in blobs: + ownership = { + "fully_qualified_namespace": fully_qualified_namespace, + "eventhub_name": eventhub_name, + "consumer_group": consumer_group, + "partition_id": blob.name.split("/")[-1], + "owner_id": blob.metadata["ownerid"], + "etag": blob.etag, + "last_modified_time": _to_timestamp(blob.last_modified), + } + result.append(ownership) + return result + except Exception as error: # pylint:disable=broad-except + logger.warning( + "An exception occurred during list_ownership for " + "namespace %r eventhub %r consumer group %r. " + "Exception is %r", + fully_qualified_namespace, + eventhub_name, + consumer_group, + error, + ) + raise + def claim_ownership(self, ownership_list): + # type: (Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]] + """Tries to claim ownership for a list of specified partitions. + + :param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownerships to claim. + :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the ownership are associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `owner_id` (str): A UUID representing the owner attempting to claim this partition. + - `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed. 
+ - `etag` (str): The Etag value for the last time this ownership was modified. Optional depending + on storage implementation. + """ gathered_results = [] for x in ownership_list: try: @@ -229,6 +267,28 @@ def claim_ownership(self, ownership_list): return gathered_results def update_checkpoint(self, checkpoint): + # type: (Dict[str, Optional[Union[str, int]]]) -> None + """Updates the checkpoint using the given information for the offset, associated partition and + consumer group in the storage blob. + + Note: If you plan to implement a custom checkpoint store with the intention of running between + cross-language EventHubs SDKs, it is recommended to persist the offset value as an integer. + + :param Dict[str,Any] checkpoint: A dict containing checkpoint information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the checkpoint is associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `sequence_number` (int): The sequence number of the :class:`EventData` + the new checkpoint will be associated with. + - `offset` (str): The offset of the :class:`EventData` + the new checkpoint will be associated with. + + :rtype: None + """ metadata = { "offset": str(checkpoint["offset"]), "sequencenumber": str(checkpoint["sequence_number"]), @@ -252,6 +312,25 @@ def update_checkpoint(self, checkpoint): def list_checkpoints( self, fully_qualified_namespace, eventhub_name, consumer_group ): + # type: (str, str, str) -> Iterable[Dict[str, Any]] + """List the updated checkpoints from the storage blob. + + :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. 
+ The format is like ".servicebus.windows.net". + :param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to + the Event Hubs namespace that contains it. + :param str consumer_group: The name of the consumer group the checkpoints are associated with. + :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information: + + - `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to. + The format is like ".servicebus.windows.net". + - `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with, + relative to the Event Hubs namespace that contains it. + - `consumer_group` (str): The name of the consumer group the checkpoints are associated with. + - `partition_id` (str): The partition ID which the checkpoint is created for. + - `sequence_number` (int): The sequence number of the :class:`EventData`. + - `offset` (str): The offset of the :class:`EventData`. + """ blob_prefix = "{}/{}/{}/checkpoint/".format( fully_qualified_namespace, eventhub_name, consumer_group ) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py index 3bbe5d393fac..0972c7807b31 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py @@ -87,7 +87,7 @@ def list_checkpoints( self, fully_qualified_namespace, eventhub_name, consumer_group ): # type: (str, str, str) -> Iterable[Dict[str, Any]] - """List the updated checkpoints from the store. + """List the updated checkpoints from the chosen storage service. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. The format is like ".servicebus.windows.net". 
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py index 55e7124735c7..346faf8036e5 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_eventprocessor/checkpoint_store.py @@ -89,7 +89,7 @@ async def update_checkpoint( async def list_checkpoints( self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str ) -> Iterable[Dict[str, Any]]: - """List the updated checkpoints from the store. + """List the updated checkpoints from the chosen storage service. :param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to. The format is like "<namespace>.servicebus.windows.net".