Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhub-checkpointstoreblob](./).
Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhub-checkpointstoreblob](../azure-eventhub-checkpointstoreblob).

[Source code](./) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name" # Please make sure the blob container resource exists.


async def on_event(partition_context, event):
# Put your code here.
# Do some sync or async operations. If the operation is i/o intensive, async will have better performance
# Do some sync or async operations. If the operation is i/o intensive, async will have better performance.
print(event)
await partition_context.update_checkpoint(event)

Expand All @@ -22,7 +23,7 @@ async def main(client):
loop = asyncio.get_event_loop()
checkpoint_store = BlobCheckpointStore.from_connection_string(
STORAGE_CONNECTION_STR,
container_name="eventprocessor"
container_name=BLOB_CONTAINER_NAME
)
client = EventHubConsumerClient.from_connection_string(
CONNECTION_STR,
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def main():
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name = eventhub_name,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,26 @@

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name" # Please make sure the blob container resource exists.


def on_event(partition_context, event):
# do something with event
# Put your code here.
# Avoid time-consuming operations.
print(event)
partition_context.update_checkpoint(event)


if __name__ == '__main__':
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "eventprocessor")
checkpoint_store = BlobCheckpointStore.from_connection_string(
STORAGE_CONNECTION_STR,
container_name=BLOB_CONTAINER_NAME
)
client = EventHubConsumerClient.from_connection_string(
CONNECTION_STR, "$default", checkpoint_store=checkpoint_store)
CONNECTION_STR,
"$Default",
checkpoint_store=checkpoint_store
)

try:
client.receive(on_event)
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhub/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
**Breaking changes**

- `EventData`
- Removed deprecated property `application_properties` and deprecated method `encode_message()`
- Removed deprecated property `application_properties` and deprecated method `encode_message()`.
- `EventHubConsumerClient`
- `on_error` would be called when `EventHubConsumerClient` failed to claim ownership of partitions.
- `on_partition_close` and `on_partition_initialize` would be called in the case of exceptions raised by `on_event` callback.
Expand Down Expand Up @@ -54,7 +54,7 @@
- `PartitionContext` now has attribute `last_enqueued_event_properties` which is populated if `track_last_enqueued_event_properties` is set to `True` in the `receive` method.


** New features **
**New features**

- Added new parameter `idle_timeout` in construct and `from_connection_string` to `EventHubConsumerClient` and `EventHubProducerClient`
after which the underlying connection will close if there is no further activity.
Expand Down
6 changes: 3 additions & 3 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

event_data_batch = client.create_batch(max_size_in_bytes=10000)
event_data_batch = client.create_batch()
can_add = True
while can_add:
try:
Expand Down Expand Up @@ -200,7 +200,7 @@ consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
event_data_batch = await client.create_batch(max_size_in_bytes=10000)
event_data_batch = await client.create_batch()
can_add = True
while can_add:
try:
Expand Down Expand Up @@ -284,7 +284,7 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<STRING FOR THE BLOB NAME>>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
# do something
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class EventData(object):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_event_data]
:end-before: [END create_event_data]
:language: python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class EventHubConsumerClient(ClientBase):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_consumer_client_sync]
:end-before: [END create_eventhub_consumer_client_sync]
:language: python
Expand Down Expand Up @@ -196,7 +196,7 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_consumer_client_from_conn_str_sync]
:end-before: [END create_eventhub_consumer_client_from_conn_str_sync]
:language: python
Expand Down Expand Up @@ -270,7 +270,7 @@ def receive(self, on_event, **kwargs):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START eventhub_consumer_client_receive_sync]
:end-before: [END eventhub_consumer_client_receive_sync]
:language: python
Expand Down Expand Up @@ -389,7 +389,7 @@ def close(self):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START eventhub_consumer_client_close_sync]
:end-before: [END eventhub_consumer_client_close_sync]
:language: python
Expand Down
13 changes: 7 additions & 6 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class EventHubProducerClient(ClientBase):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_producer_client_sync]
:end-before: [END create_eventhub_producer_client_sync]
:language: python
Expand Down Expand Up @@ -172,7 +172,7 @@ def from_connection_string(cls, conn_str, **kwargs):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_producer_client_from_conn_str_sync]
:end-before: [END create_eventhub_producer_client_from_conn_str_sync]
:language: python
Expand Down Expand Up @@ -200,7 +200,7 @@ def send_batch(self, event_data_batch, **kwargs):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START eventhub_producer_client_send_sync]
:end-before: [END eventhub_producer_client_send_sync]
:language: python
Expand Down Expand Up @@ -232,12 +232,13 @@ def create_batch(self, **kwargs):
will assign to all partitions using round-robin.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
:keyword int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold.
:keyword int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. By
default, the value is determined by your Event Hubs tier.
:rtype: ~azure.eventhub.EventDataBatch

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START eventhub_producer_client_create_batch_sync]
:end-before: [END eventhub_producer_client_create_batch_sync]
:language: python
Expand Down Expand Up @@ -322,7 +323,7 @@ def close(self):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub.py
.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START eventhub_producer_client_close_sync]
:end-before: [END eventhub_producer_client_close_sync]
:language: python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class EventHubConsumerClient(ClientBaseAsync):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_consumer_client_async]
:end-before: [END create_eventhub_consumer_client_async]
:language: python
Expand Down Expand Up @@ -215,7 +215,7 @@ def from_connection_string(

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_consumer_client_from_conn_str_async]
:end-before: [END create_eventhub_consumer_client_from_conn_str_async]
:language: python
Expand Down Expand Up @@ -320,7 +320,7 @@ async def receive(

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_consumer_client_receive_async]
:end-before: [END eventhub_consumer_client_receive_async]
:language: python
Expand Down Expand Up @@ -439,7 +439,7 @@ async def close(self) -> None:

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_consumer_client_close_async]
:end-before: [END eventhub_consumer_client_close_async]
:language: python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class EventHubProducerClient(ClientBaseAsync):

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_producer_client_async]
:end-before: [END create_eventhub_producer_client_async]
:language: python
Expand Down Expand Up @@ -189,7 +189,7 @@ def from_connection_string(

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_producer_client_from_conn_str_async]
:end-before: [END create_eventhub_producer_client_from_conn_str_async]
:language: python
Expand Down Expand Up @@ -231,7 +231,7 @@ async def send_batch(

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_producer_client_send_async]
:end-before: [END eventhub_producer_client_send_async]
:language: python
Expand Down Expand Up @@ -267,12 +267,13 @@ async def create_batch(
will assign to all partitions using round-robin.
:param str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
:param int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold.
:param int max_size_in_bytes: The maximum size of bytes data that an EventDataBatch object can hold. By
default, the value is determined by your Event Hubs tier.
:rtype: ~azure.eventhub.EventDataBatch

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_producer_client_create_batch_async]
:end-before: [END eventhub_producer_client_create_batch_async]
:language: python
Expand Down Expand Up @@ -351,7 +352,7 @@ async def close(self) -> None:

.. admonition:: Example:

.. literalinclude:: ../samples/docstring_samples/sample_code_eventhub_async.py
.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START eventhub_producer_client_close_async]
:end-before: [END eventhub_producer_client_close_async]
:language: python
Expand Down
13 changes: 7 additions & 6 deletions sdk/eventhub/azure-eventhub/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ The code samples in this migration guide use async APIs.

| In v1 | Equivalent in v5 | Sample |
|---|---|---|
| `EventHubClientAsync()` | `EventHubProducerClient()` or `EventHubConsumerClient()` | [using credential](./samples/async_samples/client_secret_auth_async.py ) |
| `EventHubClientAsync.from_connection_string()` | `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string` |[receive events](./samples/async_samples/recv_async.py), [send events](./samples/async_samples/send_async.py) |
| `EventHubClientAsync()` | `EventHubProducerClient()` or `EventHubConsumerClient()` | [using credential](./samples/async_samples/client_identity_authentication_async.py ) |
| `EventHubClientAsync.from_connection_string()` | `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string` |[client creation](./samples/async_samples/client_creation_async.py) |
| `EventProcessorHost()`| `EventHubConsumerClient(..., checkpoint_store)`| [receive events using checkpoint store](./samples/async_samples/recv_with_checkpoint_store_async.py) |

### Receiving events
Expand Down Expand Up @@ -59,7 +59,7 @@ For example, this code which keeps receiving from a partition in V1:

```python
client = EventHubClientAsync.from_connection_string(connection_str, eventhub=EVENTHUB_NAME)
receiver = client.add_async_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
receiver = client.add_async_receiver(consumer_group="$Default", partition="0", offset=Offset('@latest'))
try:
await client.run_async()
logger = logging.getLogger("azure.eventhub")
Expand All @@ -79,7 +79,7 @@ async def on_event(partition_context, event):
logger.info("Message received:{}".format(event.body_as_str()))

client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR, consumer_group="$default", eventhub_name=EVENTHUB_NAME
conn_str=CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME
)
async with client:
await client.receive(on_event=on_event, partition_id="0", starting_position="@latest")
Expand Down Expand Up @@ -172,7 +172,7 @@ USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')

# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$Default")
eh_options = EPHOptions()
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
Expand Down Expand Up @@ -207,6 +207,7 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
logging.basicConfig(level=logging.INFO)
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"
logger = logging.getLogger("azure.eventhub")

events_processed = defaultdict(int)
Expand All @@ -230,7 +231,7 @@ async def on_error(context, error):
logger.error("Receiving event has a non-partition error {!r}".format(error))

async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "aStorageBlobContainerName")
checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
client = EventHubConsumerClient.from_connection_string(
CONNECTION_STR,
consumer_group="$Default",
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp
- Send event data batch to a specific partition by partition id
- Send event data batch with customized properties

- [send.py](./sync_samples/send_stream.py) ([async version](./async_samples/send_stream_async.py)) - Examples to do streaming sending:
- [send_stream.py](./sync_samples/send_stream.py) ([async version](./async_samples/send_stream_async.py)) - Examples to do streaming sending:
- Send in a stream

- [recv.py](./sync_samples/recv.py) ([async version](./async_samples/recv_async.py)) - Examples to receive events:
Expand Down
Loading