diff --git a/pylintrc b/pylintrc
index c5cddb6d9030..399344e8ea99 100644
--- a/pylintrc
+++ b/pylintrc
@@ -2,8 +2,8 @@
 ignore-patterns=test_*,conftest,setup
 reports=no

-# PYLINT DIRECTORY BLACKLIST. Ignore eventprocessor temporarily until new eventprocessor code is merged to master
-ignore=_generated,samples,examples,test,tests,doc,.tox,eventprocessor
+# PYLINT DIRECTORY BLACKLIST.
+ignore=_generated,samples,examples,test,tests,doc,.tox
 init-hook='import sys; sys.path.insert(0, os.path.abspath(os.getcwd().rsplit("azure-sdk-for-python", 1)[0] + "azure-sdk-for-python/scripts/pylint_custom_plugin"))'
 load-plugins=pylint_guidelines_checker

diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py
similarity index 67%
rename from sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/__init__.py
rename to sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py
index f4b48afac6f3..e3eefa4774f4 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/__init__.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py
@@ -5,13 +5,16 @@
 from .event_processor import EventProcessor
 from .partition_processor import PartitionProcessor, CloseReason
-from .partition_manager import PartitionManager
-from .sqlite3_partition_manager import Sqlite3PartitionManager
+from .partition_manager import PartitionManager, OwnershipLostError
+from .partition_context import PartitionContext
+from .sample_partition_manager import SamplePartitionManager

 __all__ = [
     'CloseReason',
     'EventProcessor',
     'PartitionProcessor',
     'PartitionManager',
-    'Sqlite3PartitionManager',
-]
\ No newline at end of file
+    'OwnershipLostError',
+    'PartitionContext',
+    'SamplePartitionManager',
+]
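[Reviewer note - illustrative only, not part of the patch] The rename above moves the event processor package under the aio namespace and replaces Sqlite3PartitionManager with SamplePartitionManager, so downstream imports change to the following surface (names taken from the new __all__):

    from azure.eventhub.aio.eventprocessor import (
        CloseReason,
        EventProcessor,
        OwnershipLostError,
        PartitionContext,
        PartitionManager,
        PartitionProcessor,
        SamplePartitionManager,
    )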
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py
new file mode 100644
index 000000000000..094ca8e0ce39
--- /dev/null
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py
@@ -0,0 +1,133 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# -----------------------------------------------------------------------------------
+
+import time
+import random
+import math
+from typing import List
+from collections import Counter, defaultdict
+from azure.eventhub.aio import EventHubClient
+from .partition_manager import PartitionManager
+
+
+class OwnershipManager(object):
+    """Increases or decreases the number of partitions owned by an EventProcessor
+    so the number of owned partitions is balanced among multiple EventProcessors.
+
+    An EventProcessor calls claim_ownership() of this class every x seconds,
+    where x is set by the keyword argument "polling_interval" in EventProcessor,
+    to claim ownership of partitions, create tasks for the newly claimed ownership,
+    and cancel tasks for ownership that no longer belongs to it.
+
+    """
+    def __init__(
+            self, eventhub_client: EventHubClient, consumer_group_name: str, owner_id: str,
+            partition_manager: PartitionManager, ownership_timeout: float
+    ):
+        self.cached_partition_ids = []  # type: List[str]
+        self.eventhub_client = eventhub_client
+        self.eventhub_name = eventhub_client.eh_name
+        self.consumer_group_name = consumer_group_name
+        self.owner_id = owner_id
+        self.partition_manager = partition_manager
+        self.ownership_timeout = ownership_timeout
+
+    async def claim_ownership(self):
+        """Claims ownership for this EventProcessor
+        1. Retrieves all partition ids of an event hub from the Azure Event Hubs service
+        2. Retrieves the current ownership list via this EventProcessor's PartitionManager
+        3. Balances the ownership. Refer to _balance_ownership() for details
+        4. Claims ownership of the balanced number of partitions
+
+        :return: List[Dict[Any]]
+        """
+        if not self.cached_partition_ids:
+            await self._retrieve_partition_ids()
+        to_claim = await self._balance_ownership(self.cached_partition_ids)
+        claimed_list = await self.partition_manager.claim_ownership(to_claim) if to_claim else None
+        return claimed_list
+
+    async def _retrieve_partition_ids(self):
+        """List all partition ids of the event hub that the EventProcessor is working on.
+
+        :return: List[str]
+        """
+        self.cached_partition_ids = await self.eventhub_client.get_partition_ids()
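[Reviewer note - illustrative only, not part of the patch] The quota arithmetic that _balance_ownership() below relies on, shown standalone; the partition and owner counts here are made up:

    import math

    all_partition_count = 8       # partitions in the event hub
    owners_count = 3              # active EventProcessors, including this one
    expected_count_per_owner = all_partition_count // owners_count                # 2
    most_count_allowed_per_owner = math.ceil(all_partition_count / owners_count)  # 3

    # An owner holding more than 3 partitions abandons one per polling round;
    # an owner holding fewer than 2 claims an unowned partition or steals one
    # from the largest owner, so the group converges to a 3/3/2 split.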
+ """ + ownership_list = await self.partition_manager.list_ownership( + self.eventhub_name, self.consumer_group_name + ) + now = time.time() + ownership_dict = {x["partition_id"]: x for x in ownership_list} # put the list to dict for fast lookup + not_owned_partition_ids = [pid for pid in all_partition_ids if pid not in ownership_dict] + timed_out_partition_ids = [ownership["partition_id"] for ownership in ownership_list + if ownership["last_modified_time"] + self.ownership_timeout < now] + claimable_partition_ids = not_owned_partition_ids + timed_out_partition_ids + active_ownership = [ownership for ownership in ownership_list + if ownership["last_modified_time"] + self.ownership_timeout >= now] + active_ownership_by_owner = defaultdict(list) + for ownership in active_ownership: + active_ownership_by_owner[ownership["owner_id"]].append(ownership) + active_ownership_self = active_ownership_by_owner[self.owner_id] + + # calculate expected count per owner + all_partition_count = len(all_partition_ids) + # owners_count is the number of active owners. If self.owner_id is not yet among the active owners, + # then plus 1 to include self. This will make owners_count >= 1. + owners_count = len(active_ownership_by_owner) + \ + (0 if self.owner_id in active_ownership_by_owner else 1) + expected_count_per_owner = all_partition_count // owners_count + most_count_allowed_per_owner = math.ceil(all_partition_count / owners_count) + # end of calculating expected count per owner + + to_claim = active_ownership_self + if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition + to_claim.pop() # abandon one partition if owned too many + elif len(active_ownership_self) < expected_count_per_owner: + # Either claims an inactive partition, or steals from other owners + if claimable_partition_ids: # claim an inactive partition if there is + random_partition_id = random.choice(claimable_partition_ids) + random_chosen_to_claim = ownership_dict.get(random_partition_id, + {"partition_id": random_partition_id, + "eventhub_name": self.eventhub_name, + "consumer_group_name": self.consumer_group_name + }) + random_chosen_to_claim["owner_id"] = self.owner_id + to_claim.append(random_chosen_to_claim) + else: # steal from another owner that has the most count + active_ownership_count_group_by_owner = Counter( + dict((x, len(y)) for x, y in active_ownership_by_owner.items())) + most_frequent_owner_id = active_ownership_count_group_by_owner.most_common(1)[0][0] + # randomly choose a partition to steal from the most_frequent_owner + to_steal_partition = random.choice(active_ownership_by_owner[most_frequent_owner_id]) + to_steal_partition["owner_id"] = self.owner_id + to_claim.append(to_steal_partition) + return to_claim diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py new file mode 100644 index 000000000000..37f9a20d67c5 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -0,0 +1,271 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# -----------------------------------------------------------------------------------
+
+from typing import Dict, Type
+import uuid
+import asyncio
+import logging
+
+from azure.eventhub import EventPosition, EventHubError
+from azure.eventhub.aio import EventHubClient
+from .partition_context import PartitionContext
+from .partition_manager import PartitionManager, OwnershipLostError
+from ._ownership_manager import OwnershipManager
+from .partition_processor import CloseReason, PartitionProcessor
+from .utils import get_running_loop
+
+log = logging.getLogger(__name__)
+
+OWNER_LEVEL = 0
+
+
+class EventProcessor(object):  # pylint:disable=too-many-instance-attributes
+    """
+    An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
+    consumer group. The received data will be sent to PartitionProcessor to be processed.
+
+    It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
+    If multiple EventProcessors are running for an event hub, they will automatically balance load.
+
+    Example:
+        .. code-block:: python
+
+            import asyncio
+            import logging
+            import os
+            from azure.eventhub.aio import EventHubClient
+            from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
+            from azure.eventhub.aio.eventprocessor import SamplePartitionManager
+
+            RECEIVE_TIMEOUT = 5  # timeout in seconds for a receiving operation. 0 or None means no timeout
+            RETRY_TOTAL = 3  # max number of retries for receive operations within the receive timeout.
+                             # Actual number of retries could be less if RECEIVE_TIMEOUT is too small
+            CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
+
+            logging.basicConfig(level=logging.INFO)
+
+            async def do_operation(event):
+                # do some sync or async operations. If the operation is i/o bound, async will have better performance
+                print(event)
+
+
+            class MyPartitionProcessor(PartitionProcessor):
+                async def process_events(self, events, partition_context):
+                    if events:
+                        await asyncio.gather(*[do_operation(event) for event in events])
+                        await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)
+
+            async def main():
+                client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT,
+                                                               retry_total=RETRY_TOTAL)
+                partition_manager = SamplePartitionManager(db_filename=":memory:")  # a file name to persist checkpoints
+                try:
+                    event_processor = EventProcessor(client, "$default", MyPartitionProcessor,
+                                                     partition_manager, polling_interval=10)
+                    asyncio.create_task(event_processor.start())
+                    await asyncio.sleep(60)
+                    await event_processor.stop()
+                finally:
+                    await partition_manager.close()
+
+            if __name__ == '__main__':
+                asyncio.get_event_loop().run_until_complete(main())
+
+    """
+    def __init__(
+            self, eventhub_client: EventHubClient, consumer_group_name: str,
+            partition_processor_type: Type[PartitionProcessor],
+            partition_manager: PartitionManager, *,
+            initial_event_position: EventPosition = EventPosition("-1"), polling_interval: float = 10.0
+    ):
+        """
+        Instantiate an EventProcessor.
+
+        :param eventhub_client: An instance of ~azure.eventhub.aio.EventClient object
+        :type eventhub_client: ~azure.eventhub.aio.EventClient
+        :param consumer_group_name: The name of the consumer group this event processor is associated with. Events
+         will be read only in the context of this group.
+        :type consumer_group_name: str
+        :param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor.
+        :type partition_processor_type: type
+        :param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
+         For an easy start, SamplePartitionManager comes with the package.
+        :type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager.
+        :param initial_event_position: The event position from which to start a partition consumer
+         if the partition has no checkpoint yet. This will be replaced by a "reset" checkpoint in the near future.
+        :type initial_event_position: EventPosition
+        :param polling_interval: The interval, in seconds, between two successive polls for balancing and
+         claiming ownership
+        :type polling_interval: float
+
+        """
+
+        self._consumer_group_name = consumer_group_name
+        self._eventhub_client = eventhub_client
+        self._eventhub_name = eventhub_client.eh_name
+        self._partition_processor_factory = partition_processor_type
+        self._partition_manager = partition_manager
+        self._initial_event_position = initial_event_position  # will be replaced by reset event position in preview 4
+        self._polling_interval = polling_interval
+        self._ownership_timeout = self._polling_interval * 2
+        self._tasks = {}  # type: Dict[str, asyncio.Task]
+        self._id = str(uuid.uuid4())
+        self._running = False
+
+    def __repr__(self):
+        return 'EventProcessor: id {}'.format(self._id)
+
+    async def start(self):
+        """Start the EventProcessor.
+
+        1. Calls the OwnershipManager to keep claiming and balancing ownership of partitions in an
+        infinite loop until self.stop() is called.
+        2. Cancels tasks for partitions that are no longer owned by this EventProcessor
+        3. Creates tasks for partitions that are newly claimed by this EventProcessor
+        4. Keeps tasks running for partitions that haven't changed ownership
+        5. Each task repeatedly calls EventHubConsumer.receive() to retrieve events and
+        calls the user-defined partition processor
+
+        :return: None
+
+        """
+        log.info("EventProcessor %r is being started", self._id)
+        ownership_manager = OwnershipManager(self._eventhub_client, self._consumer_group_name, self._id,
+                                             self._partition_manager, self._ownership_timeout)
+        if not self._running:
+            self._running = True
+            while self._running:
+                try:
+                    claimed_ownership_list = await ownership_manager.claim_ownership()
+                except Exception as err:
+                    log.warning("An exception (%r) occurred during balancing and claiming ownership for eventhub %r "
+                                "consumer group %r. Retrying after %r seconds",
                                err, self._eventhub_name, self._consumer_group_name, self._polling_interval)
+                    await asyncio.sleep(self._polling_interval)
+                    continue
+
+                to_cancel_list = self._tasks.keys()
+                if claimed_ownership_list:
+                    claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
+                    to_cancel_list = self._tasks.keys() - claimed_partition_ids
+                    self._create_tasks_for_claimed_ownership(claimed_ownership_list)
+                else:
+                    log.info("EventProcessor %r hasn't claimed any partition ownership. It will keep claiming.",
+                             self._id)
+                if to_cancel_list:
+                    self._cancel_tasks_for_partitions(to_cancel_list)
+                    log.info("EventProcessor %r has cancelled partitions %r", self._id, to_cancel_list)
+                await asyncio.sleep(self._polling_interval)
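[Reviewer note - illustrative only, not part of the patch] The reconciliation step inside start(), restated with toy data; the partition ids are made up:

    running = {"0", "1", "2"}      # partitions that currently have receive tasks
    claimed = {"1", "2", "3"}      # partitions claimed in the latest polling round

    to_cancel = running - claimed  # {"0"}: ownership moved away, cancel its task
    to_create = claimed - running  # {"3"}: newly claimed, start a receive task
    # partitions in both sets ({"1", "2"}) keep their existing tasks untouched

This mirrors `self._tasks.keys() - claimed_partition_ids` and the "not in self._tasks" guard in _create_tasks_for_claimed_ownership().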
+    async def stop(self):
+        """Stop claiming ownership and all the partition consumers owned by this EventProcessor.
+
+        This method stops claiming ownership of owned partitions and cancels tasks that are running
+        EventHubConsumer.receive() for the partitions owned by this EventProcessor.
+
+        :return: None
+
+        """
+        self._running = False
+        for _ in range(len(self._tasks)):
+            _, task = self._tasks.popitem()
+            task.cancel()
+        log.info("EventProcessor %r has been cancelled", self._id)
+        await asyncio.sleep(2)  # give the tasks some time to finish after being cancelled
+
+    def _cancel_tasks_for_partitions(self, to_cancel_partitions):
+        for partition_id in to_cancel_partitions:
+            if partition_id in self._tasks:
+                task = self._tasks.pop(partition_id)
+                task.cancel()
+
+    def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
+        for ownership in to_claim_ownership_list:
+            partition_id = ownership["partition_id"]
+            if partition_id not in self._tasks:
+                self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))
+
+    async def _receive(self, ownership):
+        log.info("Starting to process ownership %r", ownership)
+        partition_processor = self._partition_processor_factory()
+        partition_id = ownership["partition_id"]
+        eventhub_name = ownership["eventhub_name"]
+        consumer_group_name = ownership["consumer_group_name"]
+        owner_id = ownership["owner_id"]
+        partition_context = PartitionContext(
+            eventhub_name,
+            consumer_group_name,
+            partition_id,
+            owner_id,
+            self._partition_manager
+        )
+        partition_consumer = self._eventhub_client.create_consumer(
+            consumer_group_name,
+            partition_id,
+            EventPosition(ownership.get("offset", self._initial_event_position.value))
+        )
+
+        async def process_error(err):
+            log.warning(
+                "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
+                " has met an error. The exception is %r.",
+                owner_id, eventhub_name, partition_id, consumer_group_name, err
+            )
+            try:
+                await partition_processor.process_error(err, partition_context)
+            except Exception as err_again:  # pylint:disable=broad-except
+                log.warning(
+                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
+                    " raised another error while running process_error(). The exception is %r.",
+                    owner_id, eventhub_name, partition_id, consumer_group_name, err_again
+                )
+
+        async def close(reason):
+            log.info(
+                "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
+                " is being closed. Reason is: %r",
+                owner_id, eventhub_name, partition_id, consumer_group_name, reason
+            )
+            try:
+                await partition_processor.close(reason, partition_context)
+            except Exception as err:  # pylint:disable=broad-except
+                log.warning(
+                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
+                    " has an error during running close().
The exception is %r.", + owner_id, eventhub_name, partition_id, consumer_group_name, err + ) + + try: + while True: + try: + await partition_processor.initialize(partition_context) + events = await partition_consumer.receive() + await partition_processor.process_events(events, partition_context) + except asyncio.CancelledError: + log.info( + "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" + " is cancelled", + owner_id, + eventhub_name, + partition_id, + consumer_group_name + ) + if self._running is False: + await close(CloseReason.SHUTDOWN) + else: + await close(CloseReason.OWNERSHIP_LOST) + raise + except EventHubError as eh_err: + await process_error(eh_err) + await close(CloseReason.EVENTHUB_EXCEPTION) + # An EventProcessor will pick up this partition again after the ownership is released + break + except OwnershipLostError: + await close(CloseReason.OWNERSHIP_LOST) + break + except Exception as other_error: # pylint:disable=broad-except + await process_error(other_error) + await close(CloseReason.PROCESS_EVENTS_ERROR) + break + finally: + await partition_consumer.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/checkpoint_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py similarity index 61% rename from sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/checkpoint_manager.py rename to sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py index 2714f675b28c..6aaf939143a2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/checkpoint_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py @@ -7,30 +7,33 @@ from .partition_manager import PartitionManager -class CheckpointManager(object): - """ - CheckpointManager is responsible for the creation of checkpoints. - The interaction with the chosen storage service is done via ~azure.eventhub.eventprocessor.PartitionManager. +class PartitionContext(object): + """Contains partition related context information for a PartitionProcessor instance to use. + Users can use update_checkpoint() of this class to save checkpoint data. """ - def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: str, owner_id: str, partition_manager: PartitionManager): + def __init__(self, eventhub_name: str, consumer_group_name: str, + partition_id: str, owner_id: str, partition_manager: PartitionManager): self.partition_id = partition_id self.eventhub_name = eventhub_name self.consumer_group_name = consumer_group_name self.owner_id = owner_id - self.partition_manager = partition_manager + self._partition_manager = partition_manager async def update_checkpoint(self, offset, sequence_number=None): """ - Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service. + Updates the checkpoint using the given information for the associated partition and consumer group in the + chosen storage service. :param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with. :type offset: str - :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with. + :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be + associated with. 
:type sequence_number: int :return: None """ - await self.partition_manager.update_checkpoint( + # TODO: whether change this method to accept event_data as well + await self._partition_manager.update_checkpoint( self.eventhub_name, self.consumer_group_name, self.partition_id, self.owner_id, offset, sequence_number ) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py similarity index 83% rename from sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_manager.py rename to sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py index e4ecb1bec824..4bb84779dd53 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py @@ -34,15 +34,14 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> last_modified_time etag """ - pass @abstractmethod - async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: + async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: """ Tries to claim a list of specified ownership. - :param partitions: Iterable of dictionaries containing all the ownership to claim. - :type partitions: Iterable of dict + :param ownership_list: Iterable of dictionaries containing all the ownership to claim. + :type ownership_list: Iterable of dict :return: Iterable of dictionaries containing the following partition ownership information: eventhub_name consumer_group_name @@ -54,13 +53,13 @@ async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterabl last_modified_time etag """ - pass @abstractmethod async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, offset, sequence_number) -> None: """ - Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service. + Updates the checkpoint using the given information for the associated partition and + consumer group in the chosen storage service. :param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. @@ -73,11 +72,15 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_ :type owner_id: str :param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with. :type offset: str - :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with. + :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint + will be associated with. 
         :type sequence_number: int
-        :return:
+        :return: None
+        :raise: `OwnershipLostError`, `CheckpointError`
         """
-        pass
-
-    async def close(self):
-        pass
+
+
+class OwnershipLostError(Exception):
+    """Raised when update_checkpoint detects that the ownership of a partition has been lost.
+
+    """
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py
similarity index 52%
rename from sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_processor.py
rename to sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py
index 10aafc79c492..8b0fb2ca7e5c 100644
--- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_processor.py
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py
@@ -6,27 +6,33 @@
 from typing import List
 from abc import ABC, abstractmethod
 from enum import Enum
-from .checkpoint_manager import CheckpointManager
-
 from azure.eventhub import EventData
+from .partition_context import PartitionContext


 class CloseReason(Enum):
     SHUTDOWN = 0  # user call EventProcessor.stop()
     OWNERSHIP_LOST = 1  # lose the ownership of a partition.
     EVENTHUB_EXCEPTION = 2  # Exception happens during receiving events
+    PROCESS_EVENTS_ERROR = 3  # Exception happens during process_events


 class PartitionProcessor(ABC):
     """
     PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
-    implementing this abstract class will be created for every partition the associated ~azure.eventhub.eventprocessor.EventProcessor owns.
+    implementing this abstract class will be created for every partition the associated
+    ~azure.eventhub.eventprocessor.EventProcessor owns.

     """
-    def __init__(self, checkpoint_manager: CheckpointManager):
-        self._checkpoint_manager = checkpoint_manager

-    async def close(self, reason):
+    async def initialize(self, partition_context: PartitionContext):
+        """Called when the EventProcessor starts to process this partition.
+
+        :param partition_context: The context information of this partition.
+        :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
+        """
+
+    async def close(self, reason, partition_context: PartitionContext):
         """Called when EventProcessor stops processing this PartitionProcessor.

         There are different reasons to trigger the PartitionProcessor to close.
@@ -34,25 +40,31 @@ async def close(self, reason):

         :param reason: Reason for closing the PartitionProcessor.
         :type reason: ~azure.eventhub.eventprocessor.CloseReason
+        :param partition_context: The context information of this partition.
+            Use its update_checkpoint method to save a checkpoint to the data store.
+        :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
         """
-        pass

     @abstractmethod
-    async def process_events(self, events: List[EventData]):
+    async def process_events(self, events: List[EventData], partition_context: PartitionContext):
         """Called when a batch of events have been received.

         :param events: Received events.
         :type events: list[~azure.eventhub.common.EventData]
+        :param partition_context: The context information of this partition.
+            Use its update_checkpoint method to save a checkpoint to the data store.
+        :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
         """
-        pass

-    async def process_error(self, error):
+    async def process_error(self, error, partition_context: PartitionContext):
         """Called when an error happens

         :param error: The error that happens.
         :type error: Exception
+        :param partition_context: The context information of this partition.
+            Use its update_checkpoint method to save a checkpoint to the data store.
+        :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
         """
-        pass
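[Reviewer note - illustrative only, not part of the patch] A minimal in-memory PartitionManager that grants every claim, to show the shape of the ABC above; the class name is hypothetical, it skips the etag check that SamplePartitionManager below implements, and it never raises OwnershipLostError, so it is only suitable for a single processor (e.g. in tests):

    import time
    from azure.eventhub.aio.eventprocessor import PartitionManager

    class InMemoryPartitionManager(PartitionManager):  # hypothetical name
        def __init__(self):
            self._store = {}  # (eventhub, consumer group, partition) -> ownership dict

        async def list_ownership(self, eventhub_name, consumer_group_name):
            return [o for o in self._store.values()
                    if o["eventhub_name"] == eventhub_name
                    and o["consumer_group_name"] == consumer_group_name]

        async def claim_ownership(self, ownership_list):
            for o in ownership_list:
                o["last_modified_time"] = time.time()  # renew so ownership stays active
                self._store[(o["eventhub_name"], o["consumer_group_name"], o["partition_id"])] = o
            return ownership_list

        async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id,
                                    owner_id, offset, sequence_number):
            ownership = self._store[(eventhub_name, consumer_group_name, partition_id)]
            ownership["offset"] = offset
            ownership["sequence_number"] = sequence_number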
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py
new file mode 100644
index 000000000000..82559fc8c274
--- /dev/null
+++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py
@@ -0,0 +1,144 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# -----------------------------------------------------------------------------------
+
+import time
+import uuid
+import sqlite3
+import logging
+from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError
+
+logger = logging.getLogger(__name__)
+
+
+def _check_table_name(table_name: str):
+    for c in table_name:
+        if not (c.isalnum() or c == "_"):
+            raise ValueError("Table name \"{}\" is not in correct format".format(table_name))
+    return table_name
+
+
+class SamplePartitionManager(PartitionManager):
+    """An implementation of PartitionManager built on the sqlite3 module in the Python standard library.
+    SQLite is a lightweight SQL database that runs in memory or on a file.
+    Do not use this PartitionManager in production.
+
+    """
+    primary_keys_dict = {"eventhub_name": "text", "consumer_group_name": "text", "partition_id": "text"}
+    other_fields_dict = {"owner_id": "text", "owner_level": "integer", "sequence_number": "integer", "offset": "text",
+                         "last_modified_time": "real", "etag": "text"}
+    checkpoint_fields = ["sequence_number", "offset"]
+    fields_dict = {**primary_keys_dict, **other_fields_dict}
+    primary_keys = list(primary_keys_dict.keys())
+    other_fields = list(other_fields_dict.keys())
+    fields = primary_keys + other_fields
+
+    def __init__(self, db_filename: str = ":memory:", ownership_table: str = "ownership"):
+        """
+
+        :param db_filename: The name of the file that stores the SQL data.
+         Sqlite3 will run in memory without a file when db_filename is ":memory:".
+        :param ownership_table: The table name of the sqlite3 database.
+        """
+        super(SamplePartitionManager, self).__init__()
+        self.ownership_table = _check_table_name(ownership_table)
+        conn = sqlite3.connect(db_filename)
+        c = conn.cursor()
+        try:
+            sql = "create table if not exists " + _check_table_name(ownership_table)\
+                  + "("\
+                  + ",".join([x[0]+" "+x[1] for x in self.fields_dict.items()])\
+                  + ", constraint pk_ownership PRIMARY KEY ("\
+                  + ",".join(self.primary_keys)\
+                  + "))"
+            c.execute(sql)
+        finally:
+            c.close()
+        self.conn = conn
+
+    async def list_ownership(self, eventhub_name, consumer_group_name):
+        cursor = self.conn.cursor()
+        try:
+            cursor.execute("select " + ",".join(self.fields) +
+                           " from " + _check_table_name(self.ownership_table) +
+                           " where eventhub_name=? and consumer_group_name=?",
+                           (eventhub_name, consumer_group_name))
+            return [dict(zip(self.fields, row)) for row in cursor.fetchall()]
+        finally:
+            cursor.close()
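[Reviewer note - illustrative only, not part of the patch] The optimistic-concurrency rule that claim_ownership() below applies per row, restated standalone: a claim wins only if it presents the currently stored etag (or the row does not exist yet), and every winning claim rotates the etag so concurrent stale claims fail:

    import uuid

    def try_claim(stored_etag, presented_etag):
        """Return a fresh etag if the claim wins, or None if it lost the race."""
        if stored_etag is not None and presented_etag != stored_etag:
            return None  # another EventProcessor claimed the partition first
        return str(uuid.uuid4())  # rotate the etag; stale claimers will now fail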
" + "and consumer_group_name=?", + (eventhub_name, consumer_group_name)) + return [dict(zip(self.fields, row)) for row in cursor.fetchall()] + finally: + cursor.close() + + async def claim_ownership(self, ownership_list): + result = [] + cursor = self.conn.cursor() + try: + for p in ownership_list: + cursor.execute("select etag from " + _check_table_name(self.ownership_table) + + " where "+ " and ".join([field+"=?" for field in self.primary_keys]), + tuple(p.get(field) for field in self.primary_keys)) + cursor_fetch = cursor.fetchall() + if not cursor_fetch: + p["last_modified_time"] = time.time() + p["etag"] = str(uuid.uuid4()) + try: + fields_without_checkpoint = list(filter(lambda x: x not in self.checkpoint_fields, self.fields)) + sql = "insert into " + _check_table_name(self.ownership_table) + " (" \ + + ",".join(fields_without_checkpoint) \ + + ") values (?,?,?,?,?,?,?)" + cursor.execute(sql, tuple(p.get(field) for field in fields_without_checkpoint)) + except sqlite3.OperationalError as op_err: + logger.info("EventProcessor %r failed to claim partition %r " + "because it was claimed by another EventProcessor at the same time. " + "The Sqlite3 exception is %r", p["owner_id"], p["partition_id"], op_err) + continue + else: + result.append(p) + else: + if p.get("etag") == cursor_fetch[0][0]: + p["last_modified_time"] = time.time() + p["etag"] = str(uuid.uuid4()) + other_fields_without_checkpoint = list( + filter(lambda x: x not in self.checkpoint_fields, self.other_fields) + ) + sql = "update " + _check_table_name(self.ownership_table) + " set "\ + + ','.join([field+"=?" for field in other_fields_without_checkpoint])\ + + " where "\ + + " and ".join([field+"=?" for field in self.primary_keys]) + + cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint) + + tuple(p.get(field) for field in self.primary_keys)) + result.append(p) + else: + logger.info("EventProcessor %r failed to claim partition %r " + "because it was claimed by another EventProcessor at the same time", p["owner_id"], + p["partition_id"]) + self.conn.commit() + return result + finally: + cursor.close() + + async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, + offset, sequence_number): + cursor = self.conn.cursor() + try: + cursor.execute("select owner_id from " + _check_table_name(self.ownership_table) + + " where eventhub_name=? and consumer_group_name=? and partition_id=?", + (eventhub_name, consumer_group_name, partition_id)) + cursor_fetch = cursor.fetchall() + if cursor_fetch and owner_id == cursor_fetch[0][0]: + cursor.execute("update " + _check_table_name(self.ownership_table) + + " set offset=?, sequence_number=? " + "where eventhub_name=? and consumer_group_name=? 
and partition_id=?", + (offset, sequence_number, eventhub_name, consumer_group_name, partition_id)) + self.conn.commit() + else: + logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership", + partition_id) + raise OwnershipLostError() + + finally: + cursor.close() + + async def close(self): + self.conn.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/utils.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/utils.py similarity index 96% rename from sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/utils.py rename to sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/utils.py index 368cd8469f10..1d8add0f49a0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/utils.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/utils.py @@ -10,7 +10,7 @@ def get_running_loop(): try: return asyncio.get_running_loop() except AttributeError: # 3.5 / 3.6 - loop = asyncio._get_running_loop() # pylint: disable=protected-access + loop = asyncio._get_running_loop() # pylint: disable=protected-access, no-member if loop is None: raise RuntimeError('No running event loop') return loop diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py deleted file mode 100644 index 71741c56dffa..000000000000 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py +++ /dev/null @@ -1,218 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -from typing import Callable, List -import uuid -import asyncio -import logging - -from azure.eventhub import EventPosition, EventHubError -from azure.eventhub.aio import EventHubClient -from .checkpoint_manager import CheckpointManager -from .partition_manager import PartitionManager -from .partition_processor import PartitionProcessor, CloseReason -from .utils import get_running_loop - -log = logging.getLogger(__name__) - -OWNER_LEVEL = 0 - - -class EventProcessor(object): - """ - An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given - consumer group. The received data will be sent to PartitionProcessor to be processed. - - It provides the user a convenient way to receive events from multiple partitions and save checkpoints. - If multiple EventProcessors are running for an event hub, they will automatically balance load. - This load balancing won't be available until preview 3. - - Example: - .. 
code-block:: python - - class MyPartitionProcessor(PartitionProcessor): - async def process_events(self, events): - if events: - # do something sync or async to process the events - await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number) - - import asyncio - from azure.eventhub.aio import EventHubClient - from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager - client = EventHubClient.from_connection_string("", receive_timeout=5, retry_total=3) - partition_manager = Sqlite3PartitionManager() - try: - event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager) - asyncio.ensure_future(event_processor.start()) - await asyncio.sleep(100) # allow it to run 100 seconds - await event_processor.stop() - finally: - await partition_manager.close() - - """ - def __init__(self, eventhub_client: EventHubClient, consumer_group_name: str, - partition_processor_factory: Callable[[CheckpointManager], PartitionProcessor], - partition_manager: PartitionManager, **kwargs): - """ - Instantiate an EventProcessor. - - :param eventhub_client: An instance of ~azure.eventhub.aio.EventClient object - :type eventhub_client: ~azure.eventhub.aio.EventClient - :param consumer_group_name: The name of the consumer group this event processor is associated with. Events will - be read only in the context of this group. - :type consumer_group_name: str - :param partition_processor_factory: A callable(type or function) object that creates an instance of a class - implementing the ~azure.eventhub.eventprocessor.PartitionProcessor. - :type partition_processor_factory: callable object - :param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints. - For preview 2, sample Sqlite3PartitionManager is provided. - :type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager. - :param initial_event_position: The offset to start a partition consumer if the partition has no checkpoint yet. - :type initial_event_position: int or str - - """ - self._consumer_group_name = consumer_group_name - self._eventhub_client = eventhub_client - self._eventhub_name = eventhub_client.eh_name - self._partition_processor_factory = partition_processor_factory - self._partition_manager = partition_manager - self._initial_event_position = kwargs.get("initial_event_position", "-1") - self._max_batch_size = eventhub_client._config.max_batch_size - self._receive_timeout = eventhub_client._config.receive_timeout - self._tasks = [] # type: List[asyncio.Task] - self._id = str(uuid.uuid4()) - - def __repr__(self): - return 'EventProcessor: id {}'.format(self._id) - - async def start(self): - """Start the EventProcessor. - - 1. retrieve the partition ids from eventhubs. - 2. claim partition ownership of these partitions. - 3. repeatedly call EvenHubConsumer.receive() to retrieve events and call user defined PartitionProcessor.process_events(). - - :return: None - - """ - log.info("EventProcessor %r is being started", self._id) - partition_ids = await self._eventhub_client.get_partition_ids() - claimed_list = await self._claim_partitions(partition_ids) - await self._start_claimed_partitions(claimed_list) - - async def stop(self): - """Stop all the partition consumer - - This method cancels tasks that are running EventHubConsumer.receive() for the partitions owned by this EventProcessor. 
- - :return: None - - """ - for i in range(len(self._tasks)): - task = self._tasks.pop() - task.cancel() - log.info("EventProcessor %r has been cancelled", self._id) - await asyncio.sleep(2) # give some time to finish after cancelled - - async def _claim_partitions(self, partition_ids): - partitions_ownership = await self._partition_manager.list_ownership(self._eventhub_name, self._consumer_group_name) - partitions_ownership_dict = dict() - for ownership in partitions_ownership: - partitions_ownership_dict[ownership["partition_id"]] = ownership - - to_claim_list = [] - for pid in partition_ids: - p_ownership = partitions_ownership_dict.get(pid) - if p_ownership: - to_claim_list.append(p_ownership) - else: - new_ownership = {"eventhub_name": self._eventhub_name, "consumer_group_name": self._consumer_group_name, - "owner_id": self._id, "partition_id": pid, "owner_level": OWNER_LEVEL} - to_claim_list.append(new_ownership) - claimed_list = await self._partition_manager.claim_ownership(to_claim_list) - return claimed_list - - async def _start_claimed_partitions(self, claimed_partitions): - for partition in claimed_partitions: - partition_id = partition["partition_id"] - offset = partition.get("offset", self._initial_event_position) - consumer = self._eventhub_client.create_consumer(self._consumer_group_name, partition_id, - EventPosition(str(offset))) - partition_processor = self._partition_processor_factory( - checkpoint_manager=CheckpointManager(partition_id, self._eventhub_name, self._consumer_group_name, - self._id, self._partition_manager) - ) - loop = get_running_loop() - task = loop.create_task( - _receive(consumer, partition_processor, self._receive_timeout)) - self._tasks.append(task) - try: - await asyncio.gather(*self._tasks) - finally: - log.info("EventProcessor %r has stopped", self._id) - - -async def _receive(partition_consumer, partition_processor, receive_timeout): - try: - while True: - try: - events = await partition_consumer.receive(timeout=receive_timeout) - except asyncio.CancelledError as cancelled_error: - log.info( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r " - "is cancelled", - partition_processor._checkpoint_manager.owner_id, - partition_processor._checkpoint_manager.eventhub_name, - partition_processor._checkpoint_manager.partition_id, - partition_processor._checkpoint_manager.consumer_group_name - ) - await partition_processor.process_error(cancelled_error) - await partition_processor.close(reason=CloseReason.SHUTDOWN) - break - except EventHubError as eh_err: - reason = CloseReason.LEASE_LOST if eh_err.error == "link:stolen" else CloseReason.EVENTHUB_EXCEPTION - log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r " - "has met an exception receiving events. It's being closed. 
The exception is %r.", - partition_processor._checkpoint_manager.owner_id, - partition_processor._checkpoint_manager.eventhub_name, - partition_processor._checkpoint_manager.partition_id, - partition_processor._checkpoint_manager.consumer_group_name, - eh_err - ) - await partition_processor.process_error(eh_err) - await partition_processor.close(reason=reason) - break - try: - await partition_processor.process_events(events) - except asyncio.CancelledError as cancelled_error: - log.info( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r " - "is cancelled.", - partition_processor._checkpoint_manager.owner_id, - partition_processor._checkpoint_manager.eventhub_name, - partition_processor._checkpoint_manager.partition_id, - partition_processor._checkpoint_manager.consumer_group_name - ) - await partition_processor.process_error(cancelled_error) - await partition_processor.close(reason=CloseReason.SHUTDOWN) - break - except Exception as exp: # user code has caused an error - log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r " - "has met an exception from user code process_events. It's being closed. The exception is %r.", - partition_processor._checkpoint_manager.owner_id, - partition_processor._checkpoint_manager.eventhub_name, - partition_processor._checkpoint_manager.partition_id, - partition_processor._checkpoint_manager.consumer_group_name, - exp - ) - await partition_processor.process_error(exp) - await partition_processor.close(reason=CloseReason.EVENTHUB_EXCEPTION) - break - # TODO: will review whether to break and close partition processor after user's code has an exception - # TODO: try to inform other EventProcessors to take the partition when this partition is closed in preview 3? - finally: - await partition_consumer.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/sqlite3_partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/sqlite3_partition_manager.py deleted file mode 100644 index eb08e970fa89..000000000000 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/sqlite3_partition_manager.py +++ /dev/null @@ -1,110 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import time -import uuid -import sqlite3 -from .partition_manager import PartitionManager - - -def _check_table_name(table_name: str): - for c in table_name: - if not (c.isalnum() or c == "_"): - raise ValueError("Table name \"{}\" is not in correct format".format(table_name)) - return table_name - - -class Sqlite3PartitionManager(PartitionManager): - """An implementation of PartitionManager by using the sqlite3 in Python standard library. - Sqlite3 is a mini sql database that runs in memory or files. - - - """ - def __init__(self, db_filename: str = ":memory:", ownership_table: str = "ownership"): - """ - - :param db_filename: name of file that saves the sql data. - Sqlite3 will run in memory without a file when db_filename is ":memory:". - :param ownership_table: The table name of the sqlite3 database. 
- """ - super(Sqlite3PartitionManager, self).__init__() - self.ownership_table = _check_table_name(ownership_table) - conn = sqlite3.connect(db_filename) - c = conn.cursor() - try: - c.execute("create table " + ownership_table + - "(eventhub_name text," - "consumer_group_name text," - "owner_id text," - "partition_id text," - "owner_level integer," - "sequence_number integer," - "offset text," - "last_modified_time integer," - "etag text)") - except sqlite3.OperationalError: - pass - finally: - c.close() - self.conn = conn - - async def list_ownership(self, eventhub_name, consumer_group_name): - cursor = self.conn.cursor() - try: - fields = ["eventhub_name", "consumer_group_name", "owner_id", "partition_id", "owner_level", - "sequence_number", - "offset", "last_modified_time", "etag"] - cursor.execute("select " + ",".join(fields) + - " from "+_check_table_name(self.ownership_table)+" where eventhub_name=? " - "and consumer_group_name=?", - (eventhub_name, consumer_group_name)) - result_list = [] - - for row in cursor.fetchall(): - d = dict(zip(fields, row)) - result_list.append(d) - return result_list - finally: - cursor.close() - - async def claim_ownership(self, partitions): - cursor = self.conn.cursor() - try: - for p in partitions: - cursor.execute("select * from " + _check_table_name(self.ownership_table) + - " where eventhub_name=? " - "and consumer_group_name=? " - "and partition_id =?", - (p["eventhub_name"], p["consumer_group_name"], - p["partition_id"])) - if not cursor.fetchall(): - cursor.execute("insert into " + _check_table_name(self.ownership_table) + - " (eventhub_name,consumer_group_name,partition_id,owner_id,owner_level,last_modified_time,etag) " - "values (?,?,?,?,?,?,?)", - (p["eventhub_name"], p["consumer_group_name"], p["partition_id"], p["owner_id"], p["owner_level"], - time.time(), str(uuid.uuid4()) - )) - else: - cursor.execute("update " + _check_table_name(self.ownership_table) + " set owner_id=?, owner_level=?, last_modified_time=?, etag=? " - "where eventhub_name=? and consumer_group_name=? and partition_id=?", - (p["owner_id"], p["owner_level"], time.time(), str(uuid.uuid4()), - p["eventhub_name"], p["consumer_group_name"], p["partition_id"])) - self.conn.commit() - return partitions - finally: - cursor.close() - - async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, - offset, sequence_number): - cursor = self.conn.cursor() - try: - cursor.execute("update " + _check_table_name(self.ownership_table) + " set offset=?, sequence_number=? where eventhub_name=? and consumer_group_name=? 
and partition_id=?", - (offset, sequence_number, eventhub_name, consumer_group_name, partition_id)) - self.conn.commit() - finally: - cursor.close() - - async def close(self): - self.conn.close() diff --git a/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py b/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py index 8c4c9ced7d29..c0826e274704 100644 --- a/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py +++ b/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py @@ -2,9 +2,8 @@ import logging import os from azure.eventhub.aio import EventHubClient -from azure.eventhub.eventprocessor import EventProcessor -from azure.eventhub.eventprocessor import PartitionProcessor -from azure.eventhub.eventprocessor import Sqlite3PartitionManager +from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor +from azure.eventhub.aio.eventprocessor import SamplePartitionManager RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small @@ -19,32 +18,22 @@ async def do_operation(event): class MyPartitionProcessor(PartitionProcessor): - def __init__(self, checkpoint_manager): - super(MyPartitionProcessor, self).__init__(checkpoint_manager) - - async def process_events(self, events): + async def process_events(self, events, partition_context): if events: await asyncio.gather(*[do_operation(event) for event in events]) - await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number) - - -def partition_processor_factory(checkpoint_manager): - return MyPartitionProcessor(checkpoint_manager) - - -async def run_awhile(duration): - client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, - retry_total=RETRY_TOTAL) - partition_manager = Sqlite3PartitionManager() - event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager) - try: - asyncio.ensure_future(event_processor.start()) - await asyncio.sleep(duration) - await event_processor.stop() - finally: - await partition_manager.close() + await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) + else: + print("empty events received", "partition:", partition_context.partition_id) if __name__ == '__main__': loop = asyncio.get_event_loop() - loop.run_until_complete(run_awhile(60)) + client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) + partition_manager = SamplePartitionManager(db_filename="eventprocessor_test_db") + event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=1) + try: + loop.run_until_complete(event_processor.start()) + except KeyboardInterrupt: + loop.run_until_complete(event_processor.stop()) + finally: + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py index 741a521d8fef..1e3cae9eefa7 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_long_running_eventprocessor.py @@ -13,7 +13,7 @@ from logging.handlers import RotatingFileHandler 
from azure.eventhub.aio import EventHubClient -from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager +from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor, SamplePartitionManager from azure.eventhub import EventData @@ -44,23 +44,23 @@ def get_logger(filename, level=logging.INFO): class MyEventProcessor(PartitionProcessor): - async def close(self, reason): + async def close(self, reason, partition_context): logger.info("PartitionProcessor closed (reason {}, id {})".format( reason, - self._checkpoint_manager.partition_id + partition_context.partition_id )) - async def process_events(self, events): + async def process_events(self, events, partition_context): if events: event = events[-1] print("Processing id {}, offset {}, sq_number {})".format( - self._checkpoint_manager.partition_id, + partition_context.partition_id, event.offset, event.sequence_number)) - await self._checkpoint_manager.update_checkpoint(event.offset, event.sequence_number) + await partition_context.update_checkpoint(event.offset, event.sequence_number) - async def process_error(self, error): - logger.info("Event Processor Error for partition {}, {!r}".format(self._checkpoint_manager.partition_id, error)) + async def process_error(self, error, partition_context): + logger.info("Event Processor Error for partition {}, {!r}".format(partition_context.partition_id, error)) async def wait_and_close(host, duration): @@ -133,7 +133,7 @@ async def test_long_running_eph(live_eventhub): client, live_eventhub['consumer_group'], MyEventProcessor, - Sqlite3PartitionManager() + SamplePartitionManager() ) tasks = asyncio.gather( @@ -153,4 +153,4 @@ async def test_long_running_eph(live_eventhub): config['consumer_group'] = "$Default" config['partition'] = "0" loop = asyncio.get_event_loop() - loop.run_until_complete(test_long_running_eph(config)) \ No newline at end of file + loop.run_until_complete(test_long_running_eph(config)) diff --git a/sdk/eventhub/azure-eventhubs/tests/eventprocessor_tests/test_eventprocessor.py b/sdk/eventhub/azure-eventhubs/tests/eventprocessor_tests/test_eventprocessor.py new file mode 100644 index 000000000000..93cf137e1af5 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/tests/eventprocessor_tests/test_eventprocessor.py @@ -0,0 +1,311 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. 
+#-------------------------------------------------------------------------- + +import pytest +import asyncio + +from azure.eventhub import EventData, EventHubError +from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.eventprocessor import EventProcessor, SamplePartitionManager, PartitionProcessor, \ + CloseReason, OwnershipLostError + + +class LoadBalancerPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + pass + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_loadbalancer_balance(connstr_senders): + + connection_str, senders = connstr_senders + for sender in senders: + sender.send(EventData("EventProcessor Test")) + eventhub_client = EventHubClient.from_connection_string(connection_str, receive_timeout=3) + partition_manager = SamplePartitionManager() + + event_processor1 = EventProcessor(eventhub_client, "$default", LoadBalancerPartitionProcessor, + partition_manager, polling_interval=1) + asyncio.ensure_future(event_processor1.start()) + await asyncio.sleep(5) + assert len(event_processor1._tasks) == 2 # event_processor1 claims two partitions + + event_processor2 = EventProcessor(eventhub_client, "$default", LoadBalancerPartitionProcessor, + partition_manager, polling_interval=1) + + asyncio.ensure_future(event_processor2.start()) + await asyncio.sleep(5) + assert len(event_processor1._tasks) == 1 # two event processors balance. So each has 1 task + assert len(event_processor2._tasks) == 1 + + event_processor3 = EventProcessor(eventhub_client, "$default", LoadBalancerPartitionProcessor, + partition_manager, polling_interval=1) + asyncio.ensure_future(event_processor3.start()) + await asyncio.sleep(5) + assert len(event_processor3._tasks) == 0 + await event_processor3.stop() + + await event_processor1.stop() + await asyncio.sleep(5) + assert len(event_processor2._tasks) == 2 # event_procesor2 takes another one after event_processor1 stops + await event_processor2.stop() + + +@pytest.mark.asyncio +async def test_load_balancer_abandon(): + class TestPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + await asyncio.sleep(0.1) + + class MockEventHubClient(object): + eh_name = "test_eh_name" + + def create_consumer(self, consumer_group_name, partition_id, event_position): + return MockEventhubConsumer() + + async def get_partition_ids(self): + return [str(pid) for pid in range(6)] + + class MockEventhubConsumer(object): + async def receive(self): + return [] + + partition_manager = SamplePartitionManager() + + event_processor = EventProcessor(MockEventHubClient(), "$default", TestPartitionProcessor, + partition_manager, polling_interval=0.5) + asyncio.get_running_loop().create_task(event_processor.start()) + await asyncio.sleep(5) + + ep_list = [] + for _ in range(2): + ep = EventProcessor(MockEventHubClient(), "$default", TestPartitionProcessor, + partition_manager, polling_interval=0.5) + asyncio.get_running_loop().create_task(ep.start()) + ep_list.append(ep) + await asyncio.sleep(5) + assert len(event_processor._tasks) == 2 + for ep in ep_list: + await ep.stop() + await event_processor.stop() + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_loadbalancer_list_ownership_error(connstr_senders): + class ErrorPartitionManager(SamplePartitionManager): + async def list_ownership(self, eventhub_name, consumer_group_name): + raise RuntimeError("Test runtime error") + + connection_str, senders = connstr_senders + for sender in senders: + 
+@pytest.mark.liveTest
+@pytest.mark.asyncio
+async def test_loadbalancer_list_ownership_error(connstr_senders):
+    class ErrorPartitionManager(SamplePartitionManager):
+        async def list_ownership(self, eventhub_name, consumer_group_name):
+            raise RuntimeError("Test runtime error")
+
+    connection_str, senders = connstr_senders
+    for sender in senders:
+        sender.send(EventData("EventProcessor Test"))
+    eventhub_client = EventHubClient.from_connection_string(connection_str, receive_timeout=3)
+    partition_manager = ErrorPartitionManager()
+
+    event_processor = EventProcessor(eventhub_client, "$default", LoadBalancerPartitionProcessor,
+                                     partition_manager, polling_interval=1)
+    asyncio.ensure_future(event_processor.start())
+    await asyncio.sleep(5)
+    assert event_processor._running is True
+    assert len(event_processor._tasks) == 0
+    await event_processor.stop()
+
+
+@pytest.mark.liveTest
+@pytest.mark.asyncio
+async def test_partition_processor(connstr_senders):
+    partition_processor1 = None
+    partition_processor2 = None
+
+    class TestPartitionProcessor(PartitionProcessor):
+        def __init__(self):
+            self.initialize_called = False
+            self.error = None
+            self.close_reason = None
+            self.received_events = []
+            self.checkpoint = None
+
+        async def initialize(self, partition_context):
+            nonlocal partition_processor1, partition_processor2
+            if partition_context.partition_id == "1":
+                partition_processor1 = self
+            else:
+                partition_processor2 = self
+
+        async def process_events(self, events, partition_context):
+            self.received_events.extend(events)
+            if events:
+                offset, sn = events[-1].offset, events[-1].sequence_number
+                await partition_context.update_checkpoint(offset, sn)
+                self.checkpoint = (offset, sn)
+
+        async def process_error(self, error, partition_context):
+            self.error = error
+            assert partition_context is not None
+
+        async def close(self, reason, partition_context):
+            self.close_reason = reason
+            assert partition_context is not None
+
+    connection_str, senders = connstr_senders
+    for sender in senders:
+        sender.send(EventData("EventProcessor Test"))
+    eventhub_client = EventHubClient.from_connection_string(connection_str, receive_timeout=3)
+    partition_manager = SamplePartitionManager()
+
+    event_processor = EventProcessor(eventhub_client, "$default", TestPartitionProcessor,
+                                     partition_manager, polling_interval=1)
+    asyncio.ensure_future(event_processor.start())
+    await asyncio.sleep(10)
+    await event_processor.stop()
+    assert partition_processor1 is not None and partition_processor2 is not None
+    assert len(partition_processor1.received_events) == 1 and len(partition_processor2.received_events) == 1
+    assert partition_processor1.checkpoint is not None
+    assert partition_processor1.close_reason == CloseReason.SHUTDOWN
+    assert partition_processor1.error is None
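
test_partition_processor above relies on the initialize(partition_context) hook: the nonlocal capture shows that each claimed partition gets its own processor instance, so instance attributes are naturally per-partition. The same hook is the place for per-partition setup in real processors; a sketch (illustrative, not part of the diff):

class CountingPartitionProcessor(PartitionProcessor):
    """Sketch: initialize() runs once for the partition this instance
    was created for, so per-partition state can live on the instance."""

    async def initialize(self, partition_context):
        self.partition_id = partition_context.partition_id
        self.event_count = 0

    async def process_events(self, events, partition_context):
        self.event_count += len(events)
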
"$default", ErrorPartitionProcessor, + partition_manager, polling_interval=1) + asyncio.ensure_future(event_processor.start()) + await asyncio.sleep(10) + await event_processor.stop() + + +@pytest.mark.asyncio +async def test_partition_processor_process_eventhub_consumer_error(): + class TestPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + pass + + async def process_error(self, error, partition_context): + assert isinstance(error, EventHubError) + + async def close(self, reason, partition_context): + assert reason == CloseReason.EVENTHUB_EXCEPTION + + class MockEventHubClient(object): + eh_name = "test_eh_name" + + def create_consumer(self, consumer_group_name, partition_id, event_position): + return MockEventhubConsumer() + + async def get_partition_ids(self): + return ["0", "1"] + + class MockEventhubConsumer(object): + async def receive(self): + raise EventHubError("Mock EventHubConsumer EventHubError") + + eventhub_client = MockEventHubClient() + partition_manager = SamplePartitionManager() + + event_processor = EventProcessor(eventhub_client, "$default", TestPartitionProcessor, + partition_manager, polling_interval=1) + asyncio.ensure_future(event_processor.start()) + await asyncio.sleep(5) + await event_processor.stop() + + +@pytest.mark.asyncio +async def test_partition_processor_process_error_close_error(): + class TestPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + raise RuntimeError("process_error") + + async def process_error(self, error, partition_context): + assert isinstance(error, RuntimeError) + raise RuntimeError("error from process_error") + + async def close(self, reason, partition_context): + assert reason == CloseReason.PROCESS_EVENTS_ERROR + raise RuntimeError("close error") + + class MockEventHubClient(object): + eh_name = "test_eh_name" + + def create_consumer(self, consumer_group_name, partition_id, event_position): + return MockEventhubConsumer() + + async def get_partition_ids(self): + return ["0", "1"] + + class MockEventhubConsumer(object): + async def receive(self): + return [EventData("mock events")] + + eventhub_client = MockEventHubClient() #EventHubClient.from_connection_string(connection_str, receive_timeout=3) + partition_manager = SamplePartitionManager() + + event_processor = EventProcessor(eventhub_client, "$default", TestPartitionProcessor, + partition_manager, polling_interval=1) + asyncio.ensure_future(event_processor.start()) + await asyncio.sleep(5) + await event_processor.stop() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_partition_processor_process_update_checkpoint_error(connstr_senders): + class ErrorPartitionManager(SamplePartitionManager): + async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, + offset, sequence_number): + if partition_id == "1": + raise OwnershipLostError("Mocked ownership lost") + + class TestPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + if events: + await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) + + async def process_error(self, error, partition_context): + assert isinstance(error, OwnershipLostError) + + async def close(self, reason, partition_context): + if partition_context.partition_id == "1": + assert reason == CloseReason.OWNERSHIP_LOST + else: + assert reason == CloseReason.SHUTDOWN + + connection_str, senders = connstr_senders + for sender in 
+@pytest.mark.liveTest
+@pytest.mark.asyncio
+async def test_partition_processor_process_update_checkpoint_error(connstr_senders):
+    class ErrorPartitionManager(SamplePartitionManager):
+        async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
+                                    offset, sequence_number):
+            if partition_id == "1":
+                raise OwnershipLostError("Mocked ownership lost")
+
+    class TestPartitionProcessor(PartitionProcessor):
+        async def process_events(self, events, partition_context):
+            if events:
+                await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)
+
+        async def process_error(self, error, partition_context):
+            assert isinstance(error, OwnershipLostError)
+
+        async def close(self, reason, partition_context):
+            if partition_context.partition_id == "1":
+                assert reason == CloseReason.OWNERSHIP_LOST
+            else:
+                assert reason == CloseReason.SHUTDOWN
+
+    connection_str, senders = connstr_senders
+    for sender in senders:
+        sender.send(EventData("EventProcessor Test"))
+    eventhub_client = EventHubClient.from_connection_string(connection_str, receive_timeout=3)
+    partition_manager = ErrorPartitionManager()
+
+    event_processor = EventProcessor(eventhub_client, "$default", TestPartitionProcessor,
+                                     partition_manager, polling_interval=1)
+    asyncio.ensure_future(event_processor.start())
+    await asyncio.sleep(10)
+    await event_processor.stop()
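
This last test also documents the PartitionManager side of the contract: update_checkpoint raises OwnershipLostError once the caller no longer owns the partition, and the EventProcessor then closes that partition with CloseReason.OWNERSHIP_LOST. A custom manager might enforce that with an ownership check; a sketch against an in-memory table (the table and class name are illustrative; it assumes SamplePartitionManager() needs no constructor arguments, as in the tests above):

from azure.eventhub.aio.eventprocessor import SamplePartitionManager, OwnershipLostError


class CheckedPartitionManager(SamplePartitionManager):
    """Sketch: reject checkpoints from owners that have lost the partition."""

    def __init__(self):
        super().__init__()
        # (eventhub, consumer group, partition) -> owner_id; illustrative store.
        self.current_owners = {}

    async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id,
                                owner_id, offset, sequence_number):
        key = (eventhub_name, consumer_group_name, partition_id)
        if self.current_owners.get(key, owner_id) != owner_id:
            raise OwnershipLostError("partition {} is owned by another processor".format(partition_id))
        await super().update_checkpoint(eventhub_name, consumer_group_name, partition_id,
                                        owner_id, offset, sequence_number)
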