diff --git a/sdk/eventhub/azure-eventhub/migration_guide.md b/sdk/eventhub/azure-eventhub/migration_guide.md index a4fb99d8a07f..276ecbb9a463 100644 --- a/sdk/eventhub/azure-eventhub/migration_guide.md +++ b/sdk/eventhub/azure-eventhub/migration_guide.md @@ -1,266 +1,178 @@ -# Guide to migrate from azure-eventhub v1 to v5 +# Guide for migrating azure-eventhub to v5 from v1 -This document is intended for users that are familiar with V1 of the Python SDK for Event Hubs library (`azure-eventhub 1.x.x`) and wish -to migrate their application to V5 of the same library. +This guide is intended to assist in the migration to `azure-eventhub` v5 from v1. It will focus on side-by-side comparisons for similar operations between the two packages. -For users new to the Python SDK for Event Hubs, please see the [readme file for the azure-eventhub](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/README.md). +Familiarity with the `azure-eventhub` v1 package is assumed. For those new to the Event Hubs client library for Python, please refer to the [README for `azure-eventhub`](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/README.md) rather than this guide. -## General changes -Version 5 of the azure-eventhub package is the result of our efforts to create a client library that is user-friendly and idiomatic to the Python ecosystem. -Alongside an API redesign driven by the new [Azure SDK Design Guidelines for Python](https://azure.github.io/azure-sdk/python_introduction.html#design-principles), -the latest version improves on several areas from V1. +## Table of contents -### Specific clients for sending and receiving -In V5 we've simplified the API surface, making two distinct clients, rather than having a single `EventHubClient`: -* `EventHubProducerClient` for sending messages. 
[Sync API](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html#azure.eventhub.EventHubProducerClient) -and [Async API](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.aio.html#azure.eventhub.aio.EventHubProducerClient) -* `EventHubConsumerClient` for receiving messages. [Sync API](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html#azure.eventhub.EventHubConsumerClient) -and [Async API](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.aio.html#azure.eventhub.aio.EventHubConsumerClient) +* [Migration benefits](#migration-benefits) + - [Cross Service SDK improvements](#cross-service-sdk-improvements) +* [Important changes](#important-changes) + - [Client hierarchy](#client-hierarchy) + - [Client constructors](#client-constructors) + - [Sending](#sending-events) + - [Receiving](#receiving-events) + - [Receiving with checkpoints](#receiving-with-checkpoints) +* [Additional samples](#additional-samples) -We've also merged the functionality from `EventProcessorHost` into -`EventHubConsumerClient`, allowing `EventHubConsumerClient` to be the single -point of entry for receiving of any type (from single partition, all partitions, or with load balancing and checkpointing features) within Event Hubs. +## Migration benefits -V5 has both sync and async APIs. Sync API is under package `azure.eventhub` whereas async API is under package `azure.eventhub.aio`. -They have the same class names under the two packages. For instance, class `EventHubConsumerClient` with sync API under package `azure.eventhub` has its -async counterpart under package `auzre.eventhub.aio`. -The code samples in this migration guide use async APIs. +A natural question to ask when considering whether or not to adopt a new version or library is what the benefits of doing so would be. 
As Azure has matured and been embraced by a more diverse group of developers, we have been focused on learning the patterns and practices to best support developer productivity and to understand the gaps that the Python client libraries have. -### Client constructors +There were several areas of consistent feedback expressed across the Azure client library ecosystem. One of the most important is that the client libraries for different Azure services have not had a consistent approach to organization, naming, and API structure. Additionally, many developers have felt that the learning curve was difficult, and the APIs did not offer a good, approachable, and consistent onboarding story for those learning Azure or exploring a specific Azure service. -| In v1 | Equivalent in v5 | Sample | -|---|---|---| -| `EventHubClientAsync()` | `EventHubProducerClient()` or `EventHubConsumerClient()` | [using credential](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/client_identity_authentication_async.py) | -| `EventHubClientAsync.from_connection_string()` | `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string` |[client creation](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/client_creation_async.py) | -| `EventProcessorHost()`| `EventHubConsumerClient(..., checkpoint_store)`| [receive events using checkpoint store](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py) | +To try and improve the development experience across Azure services, a set of uniform [design guidelines](https://azure.github.io/azure-sdk/general_introduction.html) was created for all languages to drive a consistent experience with established API patterns for all services. 
A set of [Python-specific guidelines](https://azure.github.io/azure-sdk/python_introduction.html) was also introduced to ensure that Python clients have a natural and idiomatic feel with respect to the Python ecosystem. Further details are available in the guidelines for those interested. -In V5, the SDK provides `BlobCheckpointStore` in extension packages azure-eventhub-checkpointstoreblob (for sync) and azure-eventhub-checkpointstoreblob-aio (for async). -You can define your own `CheckpointStore` class to persist checkpoint data. +### Cross Service SDK improvements -### Receiving events +The modern Event Hubs client library also provides the ability to share in some of the cross-service improvements made to the Azure development experience, such as +- using the new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/identity/azure-identity/README.md) library to share a single authentication approach between clients +- a unified logging and diagnostics pipeline offering a common view of the activities across each of the client libraries -| In v1 | Equivalent in v5 | Sample | -|---|---|---| -| `EventHubClientAsync.add_async_receiver()` and `AsyncReceiver.receive()`| `EventHubConsumerClient.receive()`| [receive events](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/recv_async.py) | +## Important changes -### Sending events -The process of building event batches is more transparent with `send_batch` of V5. +### Client hierarchy +In the interest of simplifying the API surface, we've made two distinct clients, rather than having a single `EventHubClient`: +* `EventHubProducerClient` for sending events. +* `EventHubConsumerClient` for receiving events. 
-| In v1 | Equivalent in v5 | Sample | -|---|---|---| -| `EventHubClientAsync.add_async_sender()` and `AsyncSender.send()`| `EventHubProducerClient.send_batch()`| [send events](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py) | +We've also merged the functionality from `EventProcessorHost` into `EventHubConsumerClient`. -## Migration samples +#### Approachability +By having a single entry point for sending, the `EventHubProducerClient` helps with the discoverability of the API +as you can explore all available features for sending events through methods from a single client, as opposed to searching +through documentation or exploring namespace for the types that you can instantiate. -* [Receiving events](#migrating-code-from-eventhubclient-and-asyncreceiver-to-eventhubconsumerclient-for-receiving-events) -* [Sending events](#migrating-code-from-eventhubclient-and-asyncsender-to-eventhubproducerclient-for-sending-events) -* [Receiving events with checkpointing](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) +Similarly, by having a single entry point for receiving of any type (from single partition, all partitions, or with load balancing and checkpointing features) within Event Hubs, the `EventHubConsumerClient` helps with the discoverability of the API as you can explore all available features for receiving events through methods from a single client, as opposed to searching +through documentation or exploring namespace for the types that you can instantiate. -### Migrating code from `EventHubClient` and `AsyncReceiver` to `EventHubConsumerClient` for receiving events +#### Consistency +We now have methods with similar names, signature and location for sending and receiving. +This provides consistency and predictability on the various features of the library. -In V1, `AsyncReceiver.receive()` returns a list of EventData. 
+### Client constructors -In V5, EventHubConsumerClient.receive() calls user callback on_event to process events. +- While we continue to support connection strings when constructing a client, the main difference is when using Azure Active Directory. +We now use the new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/identity/azure-identity/README.md) library +to share a single authentication solution between clients of different Azure services. -For example, this code which keeps receiving from a partition in V1: +In v1: +```python + # Authenticate with address + eventhub_client = EventHubClient(address) + # Authenticate with connection string + eventhub_client = EventHubClient.from_connection_string(conn_str) +``` +In v5: ```python -client = EventHubClientAsync.from_connection_string(connection_str, eventhub=EVENTHUB_NAME) -receiver = client.add_async_receiver(consumer_group="$Default", partition="0", offset=Offset('@latest')) -try: - await client.run_async() - logger = logging.getLogger("azure.eventhub") - while True: - received = await receiver.receive(timeout=5) - for event_data in received: - logger.info("Message received:{}".format(event_data.body_as_str())) -finally: - await client.stop_async() + # Authenticate with connection string (the consumer also requires a consumer group) + producer_client = EventHubProducerClient.from_connection_string(conn_str) + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default') + + # Authenticate with Active Directory + from azure.identity import EnvironmentCredential + producer_client = EventHubProducerClient(fully_qualified_namespace, eventhub_name, credential=EnvironmentCredential()) + consumer_client = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group='$Default', credential=EnvironmentCredential()) + + # Authenticate consumer with connection string and checkpoint + from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore + checkpoint_store = 
BlobCheckpointStore.from_connection_string(storage_conn_str, container_name) + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default', checkpoint_store=checkpoint_store) ``` +### Sending events +* `add_sender`, `run`, and `stop` methods are replaced by `from_connection_string` method on `EventHubProducerClient` to more approachably open a connection ready for sending. +* `send` method is replaced by `send_batch` method on `EventHubProducerClient` to clarify that, instead of a single `EventData`, either an `EventDataBatch` or a list of `EventData` is sent in one call, not exceeding the event hub frame size limit. +* `EventDataBatch` is created using the `create_batch` method and `EventData` messages are added to the batch using the `add` method, until the size limit is reached. -Becomes this in V5: - +In v1: ```python -logger = logging.getLogger("azure.eventhub") -async def on_event(partition_context, event): - logger.info("Message received:{}".format(event.body_as_str())) - await partition_context.update_checkpoint(event) - -client = EventHubConsumerClient.from_connection_string( - conn_str=CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME -) -async with client: - await client.receive(on_event=on_event, partition_id="0", starting_position="@latest") + client = EventHubClient(address) + sender = client.add_sender() + client.run() + sender.send(EventData('Single message')) + client.stop() ``` -### Migrating code from `EventHubClient` and `AsyncSender` to `EventHubProducerClient` for sending events +In v5: +```python + producer_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name) -In V1, you create an `EventHubClient`, then create a `AsyncSender`, and call `AsyncSender.send` to send an event that may have -a list/generator of messages. 
+ # Send EventDataBatch + event_data_batch = producer_client.create_batch() + event_data_batch.add(EventData('Single message')) + producer_client.send_batch(event_data_batch) -In V5, this has been consolidated into a one method - `EventHubProducerClient.send_batch`. -Batching merges information from multiple events into a single send, reducing -the amount of network communication needed vs sending events one at a time. -This method deterministically tells you whether the batch of events are sent to the event hub. + # Send list of EventData + event_data_batch = [EventData('Single message')] + producer_client.send_batch(event_data_batch) +``` -So in V1: +### Receiving events +* `add_receiver`, `run`, and `stop` methods are replaced by `from_connection_string` method on `EventHubConsumerClient` to more approachably open a connection ready for receiving. +* `receive` method is renamed `receive_batch` on `EventHubConsumerClient` to be more consistent in the usage of `batch` suffix in other methods on the producer and consumer when receiving or sending batches. +* `receive` method on `EventHubConsumerClient` now receives only a single event as opposed to previously receiving a batch of events to more clearly reflect the naming, in which `batch` is not used as a suffix. 
+ +In v1: ```python -client = EventHubClientAsync.from_connection_string(connection_str, eventhub=EVENTHUB_NAME) -sender = client.add_async_sender(partition="0") -try: - await client.run_async() - event_data = EventData(b"A single event") - await sender.send(event_data) -finally: - await client.stop_async() + client = EventHubClient(address) + receiver = client.add_receiver(consumer_group, partition) + client.run() + batch = receiver.receive() + client.stop() ``` -In V5: +In v5: ```python -producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENTHUB_NAME) -async with producer: - event_data_batch = await producer.create_batch(partition_id="0") - event_data_batch.add(EventData(b"A single event")) - await producer.send_batch(event_data_batch) + # Receive + def on_event(partition_context, event): + print("Received event from partition: {}.".format(partition_context.partition_id)) + + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name) + with consumer_client: + consumer_client.receive(on_event=on_event) + + # Receive batch + def on_event_batch(partition_context, event_batch): + print("Partition {}, Received count: {}".format(partition_context.partition_id, len(event_batch))) + + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name) + with consumer_client: + consumer_client.receive_batch(on_event_batch=on_event_batch) ``` -### Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events - -In V1, `EventProcessorHost` allowed you to balance the load between multiple instances of -your program when receiving events. - -In V5, `EventHubConsumerClient` allows you to do the same with the `receive()` method if you -pass a `CheckpointStore` to the constructor. - -> **Note:** V1 checkpoints are not compatible with V5 checkpoints. 
-If pointed at the same blob, consumption will begin at the first message. -V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed. -In V1 checkpoints (sequence_number and offset) are stored in the format of json along with ownership information -as the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs. -Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation detail. +### Receiving with checkpoints +Consuming events and saving checkpoints using a checkpoint store was not available in v1. -So in V1: +In v5: ```python -import logging -import asyncio -import os - -from azure.eventprocessorhost import ( - AbstractEventProcessor, - AzureStorageCheckpointLeaseManager, - EventHubConfig, - EventProcessorHost, - EPHOptions) - -logger = logging.getLogger("azure.eventhub") - - -class EventProcessor(AbstractEventProcessor): - def __init__(self, params=None): - super().__init__(params) - self._msg_counter = 0 - - async def open_async(self, context): - logger.info("Connection established {}".format(context.partition_id)) - - async def close_async(self, context, reason): - logger.info("Connection closed (reason {}, id {})".format( - reason, - context.partition_id)) - - async def process_events_async(self, context, messages): - self._msg_counter += len(messages) - logger.info("Partition id {}, Events processed {}".format(context.partition_id, self._msg_counter)) - await context.checkpoint_async() - - async def process_error_async(self, context, error): - logger.error("Event Processor Error {!r}".format(error)) - -# Storage Account Credentials -STORAGE_ACCOUNT_NAME = os.environ.get('AZURE_STORAGE_ACCOUNT') -STORAGE_KEY = os.environ.get('AZURE_STORAGE_ACCESS_KEY') -LEASE_CONTAINER_NAME 
= "leases" - -NAMESPACE = os.environ.get('EVENT_HUB_NAMESPACE') -EVENTHUB = os.environ.get('EVENT_HUB_NAME') -USER = os.environ.get('EVENT_HUB_SAS_POLICY') -KEY = os.environ.get('EVENT_HUB_SAS_KEY') - -# Eventhub config and storage manager -eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$Default") -eh_options = EPHOptions() -eh_options.debug_trace = False -storage_manager = AzureStorageCheckpointLeaseManager( - STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME) - -# Event loop and host -loop = asyncio.get_event_loop() -host = EventProcessorHost( - EventProcessor, - eh_config, - storage_manager, - ep_params=["param1","param2"], - eph_options=eh_options, - loop=loop) -try: - loop.run_until_complete(host.open_async()) -finally: - await host.close_async() - loop.stop() - + # Receive with checkpoint + from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore + + def on_event(partition_context, event): + print("Received event from partition: {}.".format(partition_context.partition_id)) + partition_context.update_checkpoint(event) + + checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name) + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, checkpoint_store=checkpoint_store) + with consumer_client: + consumer_client.receive(on_event=on_event) + + # Receive batch with checkpoint + from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore + + def on_event_batch(partition_context, event_batch): + print("Partition {}, Received count: {}".format(partition_context.partition_id, len(event_batch))) + # TODO: find out whether anything should be passed in, and if so, pass it in + partition_context.update_checkpoint() + + checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name) + consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, 
checkpoint_store=checkpoint_store) + with consumer_client: + consumer_client.receive_batch(on_event_batch=on_event_batch) ``` -And in V5: -```python -import asyncio -import os -import logging -from collections import defaultdict -from azure.eventhub.aio import EventHubConsumerClient -from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore - -logging.basicConfig(level=logging.INFO) -CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] -STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] -BLOB_CONTAINER_NAME = "your-blob-container-name" -logger = logging.getLogger("azure.eventhub") - -events_processed = defaultdict(int) -async def on_event(partition_context, event): - partition_id = partition_context.partition_id - events_processed[partition_id] += 1 - logger.info("Partition id {}, Events processed {}".format(partition_id, events_processed[partition_id])) - await partition_context.update_checkpoint(event) - -async def on_partition_initialize(context): - logger.info("Partition {} initialized".format(context.partition_id)) - -async def on_partition_close(context, reason): - logger.info("Partition {} has closed, reason {})".format(context.partition_id, reason)) - -async def on_error(context, error): - if context: - logger.error("Partition {} has a partition related error {!r}.".format(context.partition_id, error)) - else: - logger.error("Receiving event has a non-partition error {!r}".format(error)) - -async def main(): - checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME) - client = EventHubConsumerClient.from_connection_string( - CONNECTION_STR, - consumer_group="$Default", - checkpoint_store=checkpoint_store, - ) - async with client: - await client.receive( - on_event, - on_error=on_error, # optional - on_partition_initialize=on_partition_initialize, # optional - on_partition_close=on_partition_close, # optional - starting_position="-1", # "-1" is from the beginning of the 
partition. - ) - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) -``` +## Additional samples + +More examples can be found at [Samples for azure-eventhub](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub/samples) \ No newline at end of file
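
The "Sending events" section added in this diff says that `EventData` is added to an `EventDataBatch` "until the size limit is reached". A minimal sketch of that loop follows. It assumes that `add` raises `ValueError` when the batch is full, which should be confirmed against the v5 API reference. The helper name `send_in_batches` is hypothetical and is not part of `azure-eventhub`. It only relies on `create_batch`, `add`, and `send_batch`, so any object shaped like `EventHubProducerClient` can be passed in.

```python
def send_in_batches(producer, events):
    """Send `events` in as few batches as possible; return how many batches were sent.

    Hypothetical helper. `producer` is assumed to behave like the v5
    EventHubProducerClient: create_batch() returns a batch object whose
    add() raises ValueError once the batch cannot hold another event.
    """
    batch = producer.create_batch()
    pending = 0  # events added to the current, not yet sent, batch
    batches_sent = 0

    for event in events:
        try:
            batch.add(event)
        except ValueError:
            # Current batch is full: send it, then start a new batch with
            # this event. If the event does not fit in an empty batch, the
            # ValueError from the second add() propagates to the caller.
            producer.send_batch(batch)
            batches_sent += 1
            batch = producer.create_batch()
            batch.add(event)
            pending = 0
        pending += 1

    # Flush whatever is left in the final, partially filled batch.
    if pending > 0:
        producer.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

With a real client this would be called as `send_in_batches(producer_client, [EventData('a'), EventData('b')])` inside a `with producer_client:` block.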