Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
# do something
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.

async def main():
Expand All @@ -89,10 +89,8 @@ async def main():
checkpoint_store=checkpoint_store,
)

try:
async with client:
await client.receive(on_event)
except KeyboardInterrupt:
await client.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
Expand Down
10 changes: 4 additions & 6 deletions sdk/eventhub/azure-eventhub-checkpointstoreblob/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def process_events(partition_context, event):
# do something
def on_event(partition_context, event):
# Put your code here.
partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.

def main():
Expand All @@ -88,10 +88,8 @@ def main():
checkpoint_store=checkpoint_store,
)

try:
client.receive(process_events)
except KeyboardInterrupt:
client.close()
with client:
client.receive(on_event)

if __name__ == '__main__':
main()
Expand Down
15 changes: 6 additions & 9 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()

host = '<< HOSTNAME OF THE EVENT HUB >>'
fully_qualified_namespace = '<< HOSTNAME OF THE EVENT HUB >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
consumer_group = '<< CONSUMER GROUP >>'
consumer_client = EventHubConsumerClient(host, eventhub_name, consumer_group, credential)
consumer_client = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group, credential)

```

Expand Down Expand Up @@ -297,13 +297,10 @@ async def on_event(partition_context, event):
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.

async def receive(client):
try:
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
await client.close()
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)

async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
Expand Down