Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
0060f9d
fix a small issue
Sep 4, 2019
7b4273a
Catch list_ownership/claim_ownership exceptions and retry
Sep 5, 2019
bdf97c8
Fix code review issues
Sep 6, 2019
02a4daf
fix event processor long running test
Sep 6, 2019
a9446de
Remove utils.py
Sep 6, 2019
8dfdec9
Remove close() method
Sep 6, 2019
2aace82
Updated docstrings
Sep 6, 2019
36ba0a3
add pytest
Sep 7, 2019
7f95d9e
small fixes
Sep 7, 2019
f5870af
Merge branch 'eventhubs_preview3' into eventhubs_yx
Sep 7, 2019
f30d143
Revert "Remove utils.py"
Sep 7, 2019
893bee0
change asyncio.create_task to 3.5 friendly code
Sep 7, 2019
4b41fa5
Remove Callable
Sep 7, 2019
fef0551
raise CancelledError instead of break
Sep 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

import time
import random
import math
from collections import Counter
from azure.eventhub.aio import EventHubClient


class OwnershipManager(object):
"""Increases or decreases the number of partitions owned by an EventProcessor
so the number of owned partitions are balanced among multiple EventProcessors

An EventProcessor calls claim_ownership() of this class every x seconds,
where x is set by keyword argument "polling_interval" in EventProcessor,
to claim the ownership of partitions, create tasks for the claimed ownership, and cancel tasks that no longer belong
to the claimed ownership.

"""
def __init__(self, event_processor, eventhub_client: EventHubClient, ownership_timeout: int):
self.all_parition_ids = []
Comment thread
YijunXieMS marked this conversation as resolved.
Outdated
self.eventhub_client = eventhub_client
self.eventhub_name = eventhub_client.eh_name

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason that we make the eventhub_name an attribute on the instance, but that we also continue to get eh_name from the client instance later? Presumably, the values should stay the same....

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eventhub_name, consumer_group_name are two important attributes for an ownership. They usually appear together. They don't change anywhere in an ownership manager. Putting them together has better readability, I think.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me rephrase, why are you both assigning the name to an attribute and accessing the name from self.eventhub_client.eh_name in other methods?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a mistake. Changed all to self.eventhub_name

self.consumer_group_name = event_processor._consumer_group_name

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code style; I prefer to not pass in a complex type (e.g. event_processor) in order to grab two properties from it. Especially since the properties in this case are private! It would be better to pass in consumer_group_name and partition_manager as separate parameters. This will avoid unnecessary coupling between the classes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed event_processor and added consumer_group_name and owner_id as arguments

self.owner_id = event_processor._id
self.partition_manager = event_processor._partition_manager
self.ownership_timeout = ownership_timeout

async def claim_ownership(self):
"""Claims ownership for this EventProcessor
1. Retrieves all partition ids of an event hub from azure event hub service
2. Retrieves current ownership list via this EventProcessor's PartitionManager.
3. Balances number of ownership. Refer to _balance_ownership() for details.
4. Claims the ownership for the balanced number of partitions.

:return: List[Dict[Any]]
"""
if not self.all_parition_ids:
await self._retrieve_partition_ids()
to_claim = await self._balance_ownership()
claimed_list = await self._claim_ownership(to_claim)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the same name (except for leading underscore), but with slightly different functionality is somewhat confusing. The claim_ownership method actually updates (or rebalances) owerships. _claim_ownership actually does the claim...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed _claim_ownership since it's a very short function.
It can be implemented with one line of code.

return claimed_list

async def _retrieve_partition_ids(self):
"""List all partition ids of the event hub that the EventProcessor is working on.

:return: List[str]
"""
self.all_parition_ids = await self.eventhub_client.get_partition_ids()

async def _balance_ownership(self):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_balance_ownership should take the partition ids as a parameter. This avoids using semi-global state. Presumably, the only reason we have to keep the partition_ids as an instance attribute is that we don't want to repeatedly query the values.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, partition_ids as an instance attribute is to avoid repeatedly querying. Changed _balance_ownership to take partition ids as a parameter.

"""Balances and claims ownership of partitions for this EventProcessor.
The balancing algorithm is:
1. Find partitions with inactive ownership and partitions that haven never been claimed before
2. Find the number of active owners, including this EventProcessor, for all partitions.
3. Calculate the average count of partitions that an owner should own.
(number of partitions // number of active owners)
4. Calculate the largest allowed count of partitions that an owner can own.
math.ceil(number of partitions / number of active owners). This should be equal or 1 greater than the average count
5. Adjust the number of partitions owned by this EventProcessor (owner)
a. if this EventProcessor owns more than largest allowed count, abandon one partition
b. if this EventProcessor owns less than average count, add one from the inactive or unclaimed partitions,
or steal one from another owner that has the largest number of ownership among all owners (EventProcessors)
c. Otherwise, no change to the ownership

The balancing algorithm adjust one partition at a time to gradually build the balanced ownership.
Ownership must be renewed to keep it active. So the returned result includes both existing ownership and
the newly adjusted ownership.
This method balances but doesn't claim ownership. The caller of this method tries to claim the result ownership
list. But it may not successfully claim all of them because of concurrency. Other EventProcessors may happen to
claim a partition at that time. Since balancing and claiming are run in infinite repeatedly,
it achieves balancing among all EventProcessors after some time of running.

:return: List[Dict[str, Any]], A list of ownership.
"""
ownership_list = await self.partition_manager.list_ownership(self.eventhub_client.eh_name, self.consumer_group_name)
ownership_dict = dict((x["partition_id"], x) for x in ownership_list) # put the list to dict for fast lookup

claimable_partition_ids = [] # partitions with inactive ownership and partitions that have never been claimed yet
active_ownership_self = [] # active ownership of this EventProcessor
active_ownership_count_group_by_owner = Counter()
for partition_id in self.all_parition_ids:
ownership = ownership_dict.get(partition_id)
if not ownership: # no ownership found for this partition. So it is claimable
claimable_partition_ids.append(partition_id)
else:
last_modified_time = ownership["last_modified_time"]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the value of last_modified_time supposed to be? Seconds since Jan 1, 1970?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's time.time( )

owner_id = ownership["owner_id"]
now = time.time()
if now > self.ownership_timeout + last_modified_time: # ownership timed out. So it is claimable
claimable_partition_ids.append(partition_id)
else: # the ownership is still active
if owner_id == self.owner_id: # partition is actively owned by this running EventProcessor
active_ownership_self.append(ownership)
active_ownership_count_group_by_owner[owner_id] = active_ownership_count_group_by_owner.get(owner_id, 0) + 1 # all active owners

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really odd usage of collections.Counter. It does it's own bookkeeping of counts (that is its purpose in life). You may just as well use a set if you are going to use it yourself...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used set before Counter. I changed to Counter because I want to use Counter.most_common( ) @ line 122. Counter receives an iterable in the constructor but it doesn't have a method to gradually receive individual values.


# calculate expected count per owner
all_partition_count = len(self.all_parition_ids)
owners_count = len(active_ownership_count_group_by_owner) + (1 if self.owner_id not in active_ownership_count_group_by_owner else 0)
expected_count_per_owner = all_partition_count // owners_count
most_count_allowed_per_owner = math.ceil(all_partition_count / owners_count)
# end of calculating expected count per owner

to_claim = active_ownership_self
if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition
to_claim.pop() # abandon one partition if owned too many
# TODO: Release a ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner: # Either claims an inactive partition, or steals from other owners
if claimable_partition_ids: # claim an inactive partition if there is
random_partition_id = random.choice(claimable_partition_ids)
random_chosen_to_claim = ownership_dict.get(random_partition_id,
{"partition_id": random_partition_id,
"eventhub_name": self.eventhub_client.eh_name,
"consumer_group_name": self.consumer_group_name,
"owner_level": 0}) # TODO: consider removing owner_level
random_chosen_to_claim["owner_id"] = self.owner_id
to_claim.append(random_chosen_to_claim)
else: # steal from another owner that has the most count
most_frequent_owner_id = active_ownership_count_group_by_owner.most_common(1)[0][0]
# randomly choose a partition to steal from the most_frequent_owner
to_steal_partition = random.choice(list(filter(lambda x: x["owner_id"] == most_frequent_owner_id,
ownership_list)))
to_steal_partition["owner_id"] = self.owner_id
to_claim.append(to_steal_partition)
return to_claim

async def _claim_ownership(self, ownership_list):
if ownership_list:
claimed_list = await self.partition_manager.claim_ownership(ownership_list)
return claimed_list
else:
return None
Loading