Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,65 @@ async def _upload_ownership(
].timestamp()
ownership.update(metadata)

async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]:
partition_id = ownership["partition_id"]
namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
try:
await self._upload_ownership(ownership, metadata)
return ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred when EventProcessor instance %r claim_ownership for "
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
error,
)
return ownership # Keep the ownership if an unexpected error happens

async def list_ownership(
self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str
) -> Iterable[Dict[str, Any]]:
"""Retrieves a complete ownership list from the storage blob.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the partition ownerships are associated with,
relative to the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the ownerships are associated with.
:rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the current owner of this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
try:
blob_prefix = "{}/{}/{}/ownership/".format(
fully_qualified_namespace, eventhub_name, consumer_group
Expand Down Expand Up @@ -166,45 +222,25 @@ async def list_ownership(
)
raise

async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]:
partition_id = ownership["partition_id"]
namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
try:
await self._upload_ownership(ownership, metadata)
return ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred when EventProcessor instance %r claim_ownership for "
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r",
owner_id,
namespace,
eventhub_name,
consumer_group,
partition_id,
error,
)
return ownership # Keep the ownership if an unexpected error happens

async def claim_ownership(
self, ownership_list: Iterable[Dict[str, Any]]
) -> Iterable[Dict[str, Any]]:
"""Tries to claim ownership for a list of specified partitions.

:param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownerships to claim.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `owner_id` (str): A UUID representing the owner attempting to claim this partition.
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
on storage implementation.
"""
results = await asyncio.gather(
*[self._claim_one_partition(x) for x in ownership_list],
return_exceptions=True
Expand All @@ -214,6 +250,27 @@ async def claim_ownership(
]

async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
"""Updates the checkpoint using the given information for the offset, associated partition and
consumer group in the storage blob.

Note: If you plan to implement a custom checkpoint store with the intention of running between
cross-language EventHubs SDKs, it is recommended to persist the offset value as an integer.

:param Dict[str,Any] checkpoint: A dict containing checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoint is associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoint is associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `sequence_number` (int): The sequence number of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.
- `offset` (str): The offset of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.

:rtype: None
"""
metadata = {
"offset": str(checkpoint["offset"]),
"sequencenumber": str(checkpoint["sequence_number"]),
Expand All @@ -237,6 +294,24 @@ async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
async def list_checkpoints(
self, fully_qualified_namespace, eventhub_name, consumer_group
):
"""List the updated checkpoints from the storage blob.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
:param str eventhub_name: The name of the specific Event Hub the checkpoints are associated with, relative to
the Event Hubs namespace that contains it.
:param str consumer_group: The name of the consumer group the checkpoints are associated with.
:rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition checkpoint information:

- `fully_qualified_namespace` (str): The fully qualified namespace that the Event Hub belongs to.
The format is like "<namespace>.servicebus.windows.net".
- `eventhub_name` (str): The name of the specific Event Hub the checkpoints are associated with,
relative to the Event Hubs namespace that contains it.
- `consumer_group` (str): The name of the consumer group the checkpoints are associated with.
- `partition_id` (str): The partition ID which the checkpoint is created for.
- `sequence_number` (int): The sequence number of the :class:`EventData<azure.eventhub.EventData>`.
- `offset` (str): The offset of the :class:`EventData<azure.eventhub.EventData>`.
"""
blob_prefix = "{}/{}/{}/checkpoint/".format(
fully_qualified_namespace, eventhub_name, consumer_group
)
Expand Down
Loading