Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 1.0.0 (2020-01-06)
## 1.0.0 (2020-01-13)
Stable release. No new features or API changes.

## 1.0.0b6 (2019-12-04)
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub-checkpointstoreblob/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 1.0.0 (2020-01-06)
## 1.0.0 (2020-01-13)
Stable release. No new features or API changes.

## 1.0.0b6 (2019-12-04)
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 5.0.0 (2020-01-06)
## 5.0.0 (2020-01-13)

**Breaking changes**

Expand Down
17 changes: 14 additions & 3 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,13 @@ logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint(event)

with client:
client.receive(on_event=on_event)
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
```
Expand Down Expand Up @@ -236,11 +240,15 @@ logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)

async def receive():
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
async with client:
await client.receive(on_event=on_event)
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive(on_event=on_event, partition_id='0')

Expand Down Expand Up @@ -290,7 +298,10 @@ async def on_event(partition_context, event):

async def receive(client):
try:
await client.receive(on_event=on_event)
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
await client.close()

Expand Down
24 changes: 14 additions & 10 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ class EventHubConsumerClient(ClientBase):
The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.

When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
When multiple `EventHubConsumerClient`s are running against the same event hub, consumer group and checkpointing
location, the partitions will be evenly distributed among them.

To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.

An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.

:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
The namespace format is: `<yournamespace>.servicebus.windows.net`.
Expand All @@ -65,9 +68,9 @@ class EventHubConsumerClient(ClientBase):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword float idle_timeout: Timeout in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -177,9 +180,9 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs):
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
provided) to resume receiving.
:keyword float idle_timeout: Timeout in seconds after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no furthur activity. By default the value is None, meaning that the client will not shutdown due
to inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -235,8 +238,9 @@ def receive(self, on_event, **kwargs):
:keyword starting_position: Start receiving from this event position
if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a
a dict with partition ID as the key and position as the value for individual partitions, or a single
value for all partitions. The value type can be str, int, datetime.datetime. Also supported are the
value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the
values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events.
Default value is "@latest".
:paramtype starting_position: str, int, datetime.datetime or dict[str,Any]
:keyword starting_position_inclusive: Determine whether the given starting_position is inclusive(>=) or
not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and
Expand Down
12 changes: 6 additions & 6 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class EventHubProducerClient(ClientBase):
:keyword str user_agent: The user agent that should be appended to the built-in user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -162,9 +162,9 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword str user_agent: The user agent that should be appended to the built-in user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ class EventHubConsumerClient(ClientBaseAsync):
The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with
load-balancing and checkpointing.

When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same
checkpointing location, they will balance automatically.
When multiple `EventHubConsumerClients are running against the same event hub, consumer group and checkpointing
location, the partitions will be evenly distributed among them.

To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the
`EventHubConsumerClient`.
If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.

An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()`
and specify the partition_id. Load-balancing won't work in single-partition receiving mode.
and specify the partition_id.
Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store
is set.

:param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace.
The namespace format is: `<yournamespace>.servicebus.windows.net`.
Expand All @@ -71,9 +74,9 @@ class EventHubConsumerClient(ClientBaseAsync):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword float idle_timeout: Timeout in seconds after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -196,9 +199,9 @@ def from_connection_string(
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
provided) to resume receiving.
:keyword float idle_timeout: Timeout in seconds after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due
to inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -285,7 +288,7 @@ async def receive(
:keyword starting_position: Start receiving from this event position
if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a
a dict with partition ID as the key and position as the value for individual partitions, or a single
value for all partitions. The value type can be str, int, datetime.datetime. Also supported are the
value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the
values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events.
:paramtype starting_position: str, int, datetime.datetime or dict[str,Any]
:keyword starting_position_inclusive: Determine whether the given starting_position is inclusive(>=) or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class EventHubProducerClient(ClientBaseAsync):
:keyword str user_agent: The user agent that should be appended to the built-in user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down Expand Up @@ -179,9 +179,9 @@ def from_connection_string(
:keyword str user_agent: The user agent that should be appended to the built-in user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close
if there is no further activity. By default the value is None, meaning that the service determines when to
close an idle connection.
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType
Expand Down
5 changes: 3 additions & 2 deletions sdk/eventhub/azure-eventhub/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Becomes this in V5:
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
logger.info("Message received:{}".format(event.body_as_str()))
await partition_context.update_checkpoint(event)

client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME
Expand Down Expand Up @@ -215,8 +216,7 @@ async def on_event(partition_context, event):
partition_id = partition_context.partition_id
events_processed[partition_id] += 1
logger.info("Partition id {}, Events processed {}".format(partition_id, events_processed[partition_id]))
if events_processed[partition_id] % 10 == 0:
await partition_context.update_checkpoint(event)
await partition_context.update_checkpoint(event)

async def on_partition_initialize(context):
logger.info("Partition {} initialized".format(context.partition_id))
Expand All @@ -243,6 +243,7 @@ async def main():
on_error=on_error, # optional
on_partition_initialize=on_partition_initialize, # optional
on_partition_close=on_partition_close, # optional
starting_position="-1", # "-1" is from the beginning of the partition.
)

if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
print("Received event from partition: {}.".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)


async def on_partition_initialize(partition_context):
Expand Down Expand Up @@ -58,7 +59,8 @@ async def main():
on_event=on_event,
on_error=on_error,
on_partition_close=on_partition_close,
on_partition_initialize=on_partition_initialize
on_partition_initialize=on_partition_initialize,
starting_position="-1", # "-1" is from the beginning of the partition.
)

if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
async def on_event(partition_context, event):
# Put your code here.
print("Received event from partition: {}.".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)


async def on_partition_initialize(partition_context):
Expand Down Expand Up @@ -57,16 +58,17 @@ async def main():

print('Consumer will keep receiving for {} seconds, start time is {}.'.format(RECEIVE_DURATION, time.time()))

task = asyncio.ensure_future(
client.receive(
on_event=on_event,
on_error=on_error,
on_partition_close=on_partition_close,
on_partition_initialize=on_partition_initialize
async with client:
task = asyncio.ensure_future(
client.receive(
on_event=on_event,
on_error=on_error,
on_partition_close=on_partition_close,
on_partition_initialize=on_partition_initialize,
starting_position="-1", # "-1" is from the beginning of the partition.
)
)
)
await asyncio.sleep(RECEIVE_DURATION)
await client.close()
await asyncio.sleep(RECEIVE_DURATION)
await task

print('Consumer has stopped receiving, end time is {}.'.format(time.time()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def on_event(partition_context, event):
partition_context.partition_id,
partition_context.last_enqueued_event_properties)
)
await partition_context.update_checkpoint(event)


async def main():
Expand All @@ -39,7 +40,8 @@ async def main():
await client.receive(
on_event=on_event,
partition_id='0',
track_last_enqueued_event_properties=True
track_last_enqueued_event_properties=True,
starting_position="-1", # "-1" is from the beginning of the partition.
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ async def receive(client):
a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances
which also try to receive events from all partitions and use the same storage resource.
"""
await client.receive(on_event=on_event)
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# With specified partition_id, load-balance will be disabled, for example:
# await client.receive(on_event=on_event, partition_id = '0'))
# await client.receive(on_event=on_event, partition_id='0'))


async def main():
Expand Down
Loading