Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
consumer_client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

```

Expand Down Expand Up @@ -128,7 +128,7 @@ from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
partition_ids = client.get_partition_ids()
```

Expand All @@ -143,7 +143,7 @@ from azure.eventhub import EventHubProducerClient, EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

try:
event_list = []
Expand All @@ -167,7 +167,7 @@ from azure.eventhub import EventHubProducerClient, EventData
try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

event_data_batch = client.create_batch(max_size=10000)
can_add = True
Expand Down Expand Up @@ -195,7 +195,7 @@ from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

logger = logging.getLogger("azure.eventhub")

Expand Down Expand Up @@ -224,7 +224,7 @@ from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

try:
event_list = []
Expand All @@ -251,7 +251,7 @@ from azure.eventhub import EventData
try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

event_data_batch = await client.create_batch(max_size=10000)
can_add = True
Comment thread
annatisch marked this conversation as resolved.
Expand Down Expand Up @@ -279,7 +279,7 @@ from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path)
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

logger = logging.getLogger("azure.eventhub")

Expand Down Expand Up @@ -330,28 +330,42 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManage
RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
blob_name_str = '<<STRING FOR THE BLOB NAME>>'

async def do_operation(event):
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
print(event)


async def process_events(partition_context, events):
await asyncio.gather(*[do_operation(event) for event in events])
await partition_context.update_checkpoint(events[-1])

if __name__ == '__main__':
loop = asyncio.get_event_loop()
container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str)
partition_manager = BlobPartitionManager(container_client=container_client)
client = EventHubConsumerClient.from_connection_string(connection_str, partition_manager=partition_manager, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)

async def receive(client):
try:
loop.run_until_complete(client.receive(process_events, "$default"))
await client.receive(on_events=process_events, consumer_group="$Default")
except KeyboardInterrupt:
loop.run_until_complete(client.close())
finally:
loop.stop()
await client.close()


async def main():
container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str)
partition_manager = BlobPartitionManager(container_client)
client = EventHubConsumerClient.from_connection_string(
connection_str,
event_hub_path=event_hub_path,
partition_manager=partition_manager, # For load balancing and checkpoint. Leave None for no load balancing
)
async with client:
await receive(client)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

### Use EventHubConsumerClient to work with IoT Hub
Expand Down Expand Up @@ -393,10 +407,10 @@ For instance, this error is raised if you try to send an EventData that is alrea

These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples) in our repo demonstrating the usage of the library.

- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/send.py) - use EventHubProducerClient to publish events
- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use EventHubConsumerClient to consume events
- [./samples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a EventHubProducerClient
- [./samples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a EventHubConsumerClient
- [./samples/sync_samples/send.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/send.py) - use EventHubProducerClient to publish events
- [./samples/sync_samples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/sync_samples/recv.py) - use EventHubConsumerClient to consume events
- [./samples/async_samples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/send_async.py) - async/await support of a EventHubProducerClient
- [./samples/async_samples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/samples/async_samples/recv_async.py) - async/await support of a EventHubConsumerClient

### Documentation
Comment thread
yunhaoling marked this conversation as resolved.

Expand Down