@@ -1,7 +1,9 @@
# Azure EventHubs Checkpoint Store client library for Python using Storage Blobs

Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.
This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

Please note that this is an async library. For the sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
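
To make the idea of a checkpoint concrete, here is a minimal in-memory sketch. `InMemoryCheckpointStore`, its methods, and `resume_from` are hypothetical names used for illustration only; they are not part of this package's API, which persists this kind of information in Azure Storage Blobs instead of a dictionary.

```python
from typing import Dict, Optional, Tuple


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a checkpoint store; not the package's API."""

    def __init__(self) -> None:
        # (consumer_group, partition_id) -> last processed sequence number
        self._checkpoints: Dict[Tuple[str, str], int] = {}

    def update_checkpoint(self, consumer_group: str, partition_id: str,
                          sequence_number: int) -> None:
        # Record how far processing has progressed on this partition.
        self._checkpoints[(consumer_group, partition_id)] = sequence_number

    def get_checkpoint(self, consumer_group: str, partition_id: str) -> Optional[int]:
        # None means no checkpoint has been stored for this partition yet.
        return self._checkpoints.get((consumer_group, partition_id))


def resume_from(store: InMemoryCheckpointStore, consumer_group: str,
                partition_id: str) -> int:
    """Return the sequence number to resume from: one past the checkpoint, or 0."""
    last = store.get_checkpoint(consumer_group, partition_id)
    return 0 if last is None else last + 1
```

A processor that restarts would call something like `resume_from` for each partition it owns, so that events before the checkpoint are not processed again.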


This file was deleted.

@@ -0,0 +1,36 @@
import asyncio
import logging
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager
from azure.storage.blob.aio import ContainerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]

logging.basicConfig(level=logging.INFO)


async def do_operation(event):
    # put your code here
    # do some sync or async operations. If the operation is i/o intensive, async will have better performance
    print(event)


async def process_events(partition_context, events):
    # put your code here
    await asyncio.gather(*[do_operation(event) for event in events])
    await partition_context.update_checkpoint(events[-1])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor")
    partition_manager = BlobPartitionManager(container_client=container_client)
    client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager)
    try:
        loop.run_until_complete(client.receive(process_events, "$default"))
    except KeyboardInterrupt:
        loop.run_until_complete(client.close())
    finally:
        loop.stop()
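
Note on the callback above: `events[-1]` raises `IndexError` if the callback is ever invoked with an empty list. This diff does not say whether the client can deliver an empty batch, so the following is only a defensive sketch under that assumption. `FakePartitionContext` and `run_batch` are hypothetical test helpers, not SDK classes, and `do_operation` takes an extra `results` argument here purely so the behavior can be observed without a live Event Hub.

```python
import asyncio


class FakePartitionContext:
    """Hypothetical test double; only mimics the update_checkpoint call used in the sample."""

    def __init__(self):
        self.checkpointed = []

    async def update_checkpoint(self, event):
        self.checkpointed.append(event)


async def do_operation(event, results):
    # Placeholder for real per-event work; yields control once to the event loop.
    await asyncio.sleep(0)
    results.append(event)


async def process_events(partition_context, events, results):
    if not events:
        # Nothing was processed, so there is nothing to checkpoint.
        return
    # Run the per-event work concurrently, then checkpoint the last event of the batch.
    await asyncio.gather(*(do_operation(event, results) for event in events))
    await partition_context.update_checkpoint(events[-1])


def run_batch(events):
    # Drive the callback once with a fake context and return what it recorded.
    context = FakePartitionContext()
    results = []
    asyncio.run(process_events(context, events, results))
    return context.checkpointed, results
```

Checkpointing only after `asyncio.gather` completes means a failure in any `do_operation` call leaves the checkpoint untouched, so the batch would be seen again after a restart.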
2 changes: 2 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md
@@ -3,6 +3,8 @@
Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

Please note that this is a sync library. For the async version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob-aio](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started
@@ -4,33 +4,29 @@
from azure.eventhub.extensions.checkpointstoreblob import BlobPartitionManager
from azure.storage.blob import ContainerClient

RECEIVE_TIMEOUT = 5  # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3  # max number of retries for receive operations within the receive timeout. Actual number of retries could be less if RECEIVE_TIMEOUT is too small
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]

logging.basicConfig(level=logging.INFO)


def do_operation(event):
    # do some sync or async operations. If the operation is i/o intensive, async will have better performance
    # put your code here
    print(event)


def process_events(partition_context, events):
    if events:
        print("received events: {} from partition: {}".format(len(events), partition_context.partition_id))
        for event in events:
            do_operation(event)
        partition_context.update_checkpoint(events[-1])
    else:
        print("empty events received", "partition:", partition_context.partition_id)
    # put your code here
    print("received events: {} from partition: {}".format(len(events), partition_context.partition_id))
    for event in events:
        do_operation(event)
    partition_context.update_checkpoint(events[-1])


if __name__ == '__main__':
    container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor")
    partition_manager = BlobPartitionManager(container_client=container_client)
    client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
    client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, partition_manager=partition_manager)
    try:
        client.receive(process_events, "$default")
    except KeyboardInterrupt:
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/README.md
@@ -309,7 +309,7 @@ a consumer group in an Event Hub instance.The `EventHubConsumerClient` uses an i
and to store the relevant information required by the load balancing algorithm.

Search pypi with the prefix `azure-eventhub-checkpointstore` to
find packages that support this and use the PartitionManager implementation from one such package.
find packages that support this and use the PartitionManager implementation from one such package. Please note that both sync and async libraries are provided.
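
As a rough illustration of why the load balancer needs stored ownership information, the sketch below decides which unowned partitions one consumer may claim without exceeding an even share. It is not the SDK's actual algorithm; `partitions_to_claim` and its parameters are hypothetical, and the ownership map stands in for whatever a PartitionManager implementation persists.

```python
import math
from typing import Dict, List, Optional


def partitions_to_claim(ownership: Dict[str, Optional[str]],
                        consumers: List[str], me: str) -> List[str]:
    """Pick unowned partitions for `me` without exceeding an even share.

    `ownership` maps partition ID -> owner name, or None if unowned (assumed shape).
    """
    # An even share: total partitions divided by active consumers, rounded up.
    fair_share = math.ceil(len(ownership) / len(consumers))
    mine = [p for p, owner in ownership.items() if owner == me]
    unowned = sorted(p for p, owner in ownership.items() if owner is None)
    # Claim only as many as are needed to reach the fair share.
    room = max(0, fair_share - len(mine))
    return unowned[:room]
```

Because every consumer reads the same stored ownership map, each one can reach the same conclusion about its share independently, which is the role the shared store plays.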

In the example below, we create an instance of `EventHubConsumerClient` and use a `BlobPartitionManager`. You need
to [create an Azure Storage account](https://docs.microsoft.com/en-us/azure/storage/common/storage-quickstart-create-account?tabs=azure-portal)
@@ -76,7 +76,8 @@ async def send(self, event_data: Union[EventData, EventDataBatch, Iterable[Event
        """Sends event data and blocks until acknowledgement is received or operation times out.

        :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects.
        :type event_data: ~azure.eventhub.EventData or ~azure.eventhub.EventDataBatch or Iterator[~azure.eventhub.EventData]
        :type event_data: ~azure.eventhub.EventData or ~azure.eventhub.EventDataBatch or
         Iterator[~azure.eventhub.EventData]
        :keyword str partition_key: With the given partition_key, event data will land to
         a particular partition of the Event Hub decided by the service.
        :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
67 changes: 66 additions & 1 deletion sdk/eventhub/azure-eventhubs/doc/azure.eventhub.rst
@@ -6,7 +6,6 @@ azure.eventhub package
    :undoc-members:
    :inherited-members:


.. autoclass:: azure.eventhub.EventHubConsumerClient
    :members:
    :undoc-members:
@@ -22,6 +21,72 @@ azure.eventhub package
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventPosition
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventHubSharedKeyCredential
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventHubSASTokenCredential
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.PartitionManager
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.CloseReason
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.TransportType
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventHubError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.ConnectError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.ConnectionLostError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventDataError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.EventDataSendError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.AuthenticationError
    :members:
    :undoc-members:
    :inherited-members:

.. autoclass:: azure.eventhub.OwnershipLostError
    :members:
    :undoc-members:
    :inherited-members:


Subpackages
-----------

4 changes: 0 additions & 4 deletions sdk/eventhub/azure-eventhubs/examples/__init__.py

This file was deleted.