124 changes: 56 additions & 68 deletions sdk/eventhub/azure-eventhubs/README.md
@@ -145,16 +145,11 @@ connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

-try:
-    event_list = []
-    for i in range(10):
-        event_list.append(EventData(b"A single event"))
-    with client:
-        client.send(event_list)
-except:
-    raise
-finally:
-    pass
+event_list = []
+for i in range(10):
+    event_list.append(EventData(b"A single event"))
+with client:
+    client.send(event_list)
```

#### Send a batch of events
@@ -164,25 +159,20 @@ Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
from azure.eventhub import EventHubProducerClient, EventData

-try:
-    connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
-    event_hub_path = '<< NAME OF THE EVENT HUB >>'
-    client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
+connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
+event_hub_path = '<< NAME OF THE EVENT HUB >>'
+client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

-    event_data_batch = client.create_batch(max_size=10000)
-    can_add = True
-    while can_add:
-        try:
-            event_data_batch.try_add(EventData('Message inside EventBatchData'))
-        except ValueError:
-            can_add = False # EventDataBatch object reaches max_size.
+event_data_batch = client.create_batch(max_size=10000)
+can_add = True
+while can_add:
+    try:
+        event_data_batch.try_add(EventData('Message inside EventBatchData'))
+    except ValueError:
+        can_add = False # EventDataBatch object reaches max_size.

-    with client:
-        client.send(event_data_batch)
-except:
-    raise
-finally:
-    pass
+with client:
+    client.send(event_data_batch)
```
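
The loop above stops filling the batch as soon as `try_add` raises `ValueError`. When there are more events than fit into a single `EventDataBatch`, a common pattern is to send the full batch and start a new one. The sketch below only reuses the `create_batch`, `try_add` and `send` calls shown in this README; the `send_events_in_batches` helper and the sample event list are illustrative, not part of the library.

```python
from azure.eventhub import EventHubProducerClient, EventData

def send_events_in_batches(client, events):
    # Pack events into successive batches; send each batch once it is full.
    batch = client.create_batch(max_size=10000)
    for event in events:
        try:
            batch.try_add(event)
        except ValueError:
            # The current batch reached max_size: send it and start a new one.
            client.send(batch)
            batch = client.create_batch(max_size=10000)
            batch.try_add(event)
    client.send(batch)  # Send the final, partially filled batch.

# Usage sketch, reusing the client created above:
# with client:
#     send_events_in_batches(client, [EventData(b"event") for _ in range(1000)])
```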

### Consume events from an Event Hub
@@ -202,15 +192,10 @@ logger = logging.getLogger("azure.eventhub")
def on_events(partition_context, events):
    logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))

-try:
-    with client:
-        client.receive(on_events=on_events, consumer_group="$Default")
-        # receive events from specified partition:
-        # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0')
-except:
-    raise
-finally:
-    pass
+with client:
+    client.receive(on_events=on_events, consumer_group="$Default")
+    # receive events from specified partition:
+    # client.receive(on_events=on_events, consumer_group="$Default", partition_id='0')
```
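
The `receive` call in the sample above runs until the client is closed. If the program needs to do other work in the meantime, one option is to run the consumer on a worker thread and close the client when you are done. This is a sketch under the assumption that `close()` (which the `with` block above calls implicitly) ends the pending `receive`; it reuses the `client` and `on_events` defined in the sample.

```python
import threading
import time

# Run the blocking receive call on a worker thread.
worker = threading.Thread(
    target=client.receive,
    kwargs={"on_events": on_events, "consumer_group": "$Default"},
)
worker.start()

time.sleep(60)   # Consume events for about a minute...
client.close()   # ...then close the client, which stops receiving.
worker.join()
```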

### Async publish events to an Event Hub
@@ -219,54 +204,59 @@ Publish events to an Event Hub asynchronously.

#### Send a single event or an array of events
```python
+import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
-client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

-try:
-    event_list = []
-    for i in range(10):
-        event_list.append(EventData(b"A single event"))

-    async with client:
-        await client.send(event_list) # Send a list of events
-        await client.send(EventData(b"A single event")) # Send a single event
-except:
-    raise
-finally:
-    pass
+event_list = []
+for i in range(10):
+    event_list.append(EventData(b"A single event"))

+async def send():
+    client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
+    async with client:
+        await client.send(event_list) # Send a list of events
+        await client.send(EventData(b"A single event")) # Send a single event

+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(send())
```
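
On Python 3.7 or newer, the event-loop boilerplate at the end of this sample can be replaced by `asyncio.run`, which creates and closes the loop for you; a small sketch reusing the `send` coroutine defined above:

```python
import asyncio

if __name__ == '__main__':
    asyncio.run(send())  # Python 3.7+
```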

#### Send a batch of events

Use the `create_batch` method on `EventHubProducerClient` to create an `EventDataBatch` object which can then be sent using the `send` method.
Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
+import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

-try:
-    connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
-    event_hub_path = '<< NAME OF THE EVENT HUB >>'
-    client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
+connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
+event_hub_path = '<< NAME OF THE EVENT HUB >>'

+async def create_batch(client):
    event_data_batch = await client.create_batch(max_size=10000)
    can_add = True
    while can_add:
        try:
            event_data_batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False # EventDataBatch object reaches max_size.
+    return event_data_batch

+async def send():
+    client = EventHubProducerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
+    batch_data = await create_batch(client)
    async with client:
-        await client.send(event_data_batch)
-except:
-    raise
-finally:
-    pass
+        await client.send(batch_data)

+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(send())
```

### Async consume events from an Event Hub
@@ -275,26 +265,27 @@ Consume events asynchronously from an EventHub.

```python
import logging
+import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
-client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)

logger = logging.getLogger("azure.eventhub")

async def on_events(partition_context, events):
    logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))

-try:
+async def receive():
+    client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
    async with client:
        received = await client.receive(on_events=on_events, consumer_group='$Default')
        # receive events from specified partition:
        # received = await client.receive(on_events=on_events, consumer_group='$Default', partition_id='0')
-except:
-    raise
-finally:
-    pass

+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(receive())
```
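
As in the synchronous sample, the async `receive` runs until the client is closed. Below is a sketch of stopping after a fixed period, under the assumption that `close()` ends the pending `receive`; the `receive_for` coroutine is illustrative only and reuses `connection_str`, `event_hub_path` and `on_events` from the sample above.

```python
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

async def receive_for(seconds):
    client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
    receive_task = asyncio.ensure_future(
        client.receive(on_events=on_events, consumer_group='$Default'))
    await asyncio.sleep(seconds)  # Let the consumer run for a while...
    await client.close()          # ...then close the client to stop receiving.
    await asyncio.gather(receive_task, return_exceptions=True)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(receive_for(60))
```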

### Consume events using a partition manager
@@ -343,14 +334,12 @@ async def process_events(partition_context, events):
    await asyncio.gather(*[do_operation(event) for event in events])
    await partition_context.update_checkpoint(events[-1])


async def receive(client):
    try:
        await client.receive(on_events=process_events, consumer_group="$Default")
    except KeyboardInterrupt:
        await client.close()


async def main():
    container_client = ContainerClient.from_connection_string(storage_connection_str, blob_name_str)
    partition_manager = BlobPartitionManager(container_client)
@@ -362,7 +351,6 @@ async def main():
    async with client:
        await receive(client)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
@@ -414,7 +402,7 @@ These are [more samples](https://github.com/Azure/azure-sdk-for-python/blob/mast

### Documentation

-Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html.
+Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html).

### Logging
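
Like the samples earlier in this README, the library logs through the standard `logging` module under the `azure.eventhub` logger. The sketch below shows one way to surface that output; the `logging_enable` keyword is an assumption for this preview and may not exist in every version, so check the reference documentation before relying on it.

```python
import logging
import sys

from azure.eventhub import EventHubProducerClient

# Send azure.eventhub diagnostics to stderr at DEBUG level.
logger = logging.getLogger("azure.eventhub")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(stream=sys.stderr))

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'

# logging_enable=True is assumed to also capture low-level (AMQP) traffic;
# drop the keyword if your installed version does not accept it.
client = EventHubProducerClient.from_connection_string(
    connection_str, event_hub_path=event_hub_path, logging_enable=True)
```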
