diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/HISTORY.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/HISTORY.md index 2060fa27d6d2..8e929a30f01f 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/HISTORY.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/HISTORY.md @@ -1,6 +1,6 @@ # Release History -## 1.0.0 (2020-01-06) +## 1.0.0 (2020-01-13) Stable release. No new features or API changes. ## 1.0.0b6 (2019-12-04) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/HISTORY.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob/HISTORY.md index c30c2fba7f39..2498f3200d83 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/HISTORY.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/HISTORY.md @@ -1,6 +1,6 @@ # Release History -## 1.0.0 (2020-01-06) +## 1.0.0 (2020-01-13) Stable release. No new features or API changes. ## 1.0.0b6 (2019-12-04) diff --git a/sdk/eventhub/azure-eventhub/HISTORY.md b/sdk/eventhub/azure-eventhub/HISTORY.md index 24a57beb3878..6e48b02ca198 100644 --- a/sdk/eventhub/azure-eventhub/HISTORY.md +++ b/sdk/eventhub/azure-eventhub/HISTORY.md @@ -1,6 +1,6 @@ # Release History -## 5.0.0 (2020-01-06) +## 5.0.0 (2020-01-13) **Breaking changes** diff --git a/sdk/eventhub/azure-eventhub/README.md b/sdk/eventhub/azure-eventhub/README.md index ccc29e28f113..0cd12f1f2eaa 100644 --- a/sdk/eventhub/azure-eventhub/README.md +++ b/sdk/eventhub/azure-eventhub/README.md @@ -176,9 +176,13 @@ logging.basicConfig(level=logging.INFO) def on_event(partition_context, event): logger.info("Received event from partition {}".format(partition_context.partition_id)) + partition_context.update_checkpoint(event) with client: - client.receive(on_event=on_event) + client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # receive events from specified partition: # client.receive(on_event=on_event, partition_id='0') ``` @@ -236,11 +240,15 @@ logging.basicConfig(level=logging.INFO) async def on_event(partition_context, event): logger.info("Received event from partition {}".format(partition_context.partition_id)) + await partition_context.update_checkpoint(event) async def receive(): client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name) async with client: - await client.receive(on_event=on_event) + await client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # receive events from specified partition: # await client.receive(on_event=on_event, partition_id='0') @@ -290,7 +298,10 @@ async def on_event(partition_context, event): async def receive(client): try: - await client.receive(on_event=on_event) + await client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) except KeyboardInterrupt: await client.close() diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index ec850ce1589f..c350ca81703f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -36,14 +36,17 @@ class EventHubConsumerClient(ClientBase): The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with load-balancing and checkpointing. - When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same - checkpointing location, they will balance automatically. + When multiple `EventHubConsumerClient`s are running against the same event hub, consumer group and checkpointing + location, the partitions will be evenly distributed among them. + To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the `EventHubConsumerClient`. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory. An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()` - and specify the partition_id. Load-balancing won't work in single-partition receiving mode. + and specify the partition_id. + Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store + is set. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. The namespace format is: `.servicebus.windows.net`. @@ -65,9 +68,9 @@ class EventHubConsumerClient(ClientBase): The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume receiving. - :keyword float idle_timeout: Timeout in seconds, after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no further activity. By default the value is None, meaning that the client will not shutdown due to + inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -177,9 +180,9 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs): information. The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume receiving. - :keyword float idle_timeout: Timeout in seconds after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no furthur activity. By default the value is None, meaning that the client will not shutdown due + to inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -235,8 +238,9 @@ def receive(self, on_event, **kwargs): :keyword starting_position: Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a a dict with partition ID as the key and position as the value for individual partitions, or a single - value for all partitions. The value type can be str, int, datetime.datetime. Also supported are the + value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events. + Default value is "@latest". :paramtype starting_position: str, int, datetime.datetime or dict[str,Any] :keyword starting_position_inclusive: Determine whether the given starting_position is inclusive(>=) or not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 6343b85aef25..0a30fba39a18 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -37,9 +37,9 @@ class EventHubProducerClient(ClientBase): :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. - :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity + unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -162,9 +162,9 @@ def from_connection_string(cls, conn_str, **kwargs): :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. - :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no activity. By default the value is None, meaning that the client will not shutdown due to + inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 80fb544d578e..f433c54f4a4d 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -42,14 +42,17 @@ class EventHubConsumerClient(ClientBaseAsync): The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with load-balancing and checkpointing. - When multiple `EventHubConsumerClient` operate within one or more processes or machines targeting the same - checkpointing location, they will balance automatically. + When multiple `EventHubConsumerClients are running against the same event hub, consumer group and checkpointing + location, the partitions will be evenly distributed among them. + To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the `EventHubConsumerClient`. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory. An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()` - and specify the partition_id. Load-balancing won't work in single-partition receiving mode. + and specify the partition_id. + Load-balancing won't work in single-partition mode. But users can still save checkpoints if the checkpoint_store + is set. :param str fully_qualified_namespace: The fully qualified host name for the Event Hubs namespace. The namespace format is: `.servicebus.windows.net`. @@ -71,9 +74,9 @@ class EventHubConsumerClient(ClientBaseAsync): The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume receiving. - :keyword float idle_timeout: Timeout in seconds after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no further activity. By default the value is None, meaning that the client will not shutdown due to + inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -196,9 +199,9 @@ def from_connection_string( information. The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume receiving. - :keyword float idle_timeout: Timeout in seconds after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no further activity. By default the value is None, meaning that the client will not shutdown due + to inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -285,7 +288,7 @@ async def receive( :keyword starting_position: Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a a dict with partition ID as the key and position as the value for individual partitions, or a single - value for all partitions. The value type can be str, int, datetime.datetime. Also supported are the + value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events. :paramtype starting_position: str, int, datetime.datetime or dict[str,Any] :keyword starting_position_inclusive: Determine whether the given starting_position is inclusive(>=) or diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index ffe60685bef0..9d69d52ea18f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -39,9 +39,9 @@ class EventHubProducerClient(ClientBaseAsync): :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. - :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity + unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType @@ -179,9 +179,9 @@ def from_connection_string( :keyword str user_agent: The user agent that should be appended to the built-in user agent string. :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default value is 3. - :keyword float idle_timeout: Timeout, in seconds, after which the underlying connection will close - if there is no further activity. By default the value is None, meaning that the service determines when to - close an idle connection. + :keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection + if there is no activity. By default the value is None, meaning that the client will not shutdown due to + inactivity unless initiated by the service. :keyword transport_type: The type of transport protocol that will be used for communicating with the Event Hubs service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.eventhub.TransportType diff --git a/sdk/eventhub/azure-eventhub/migration_guide.md b/sdk/eventhub/azure-eventhub/migration_guide.md index a6d10e0c1b47..d8b9c379999c 100644 --- a/sdk/eventhub/azure-eventhub/migration_guide.md +++ b/sdk/eventhub/azure-eventhub/migration_guide.md @@ -77,6 +77,7 @@ Becomes this in V5: logger = logging.getLogger("azure.eventhub") async def on_event(partition_context, event): logger.info("Message received:{}".format(event.body_as_str())) + await partition_context.update_checkpoint(event) client = EventHubConsumerClient.from_connection_string( conn_str=CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME @@ -215,8 +216,7 @@ async def on_event(partition_context, event): partition_id = partition_context.partition_id events_processed[partition_id] += 1 logger.info("Partition id {}, Events processed {}".format(partition_id, events_processed[partition_id])) - if events_processed[partition_id] % 10 == 0: - await partition_context.update_checkpoint(event) + await partition_context.update_checkpoint(event) async def on_partition_initialize(context): logger.info("Partition {} initialized".format(context.partition_id)) @@ -243,6 +243,7 @@ async def main(): on_error=on_error, # optional on_partition_initialize=on_partition_initialize, # optional on_partition_close=on_partition_close, # optional + starting_position="-1", # "-1" is from the beginning of the partition. ) if __name__ == '__main__': diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_async.py index 99db62ce0278..bfb850f91c77 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_async.py @@ -21,6 +21,7 @@ async def on_event(partition_context, event): # Put your code here. # If the operation is i/o intensive, async will have better performance. print("Received event from partition: {}.".format(partition_context.partition_id)) + await partition_context.update_checkpoint(event) async def on_partition_initialize(partition_context): @@ -58,7 +59,8 @@ async def main(): on_event=on_event, on_error=on_error, on_partition_close=on_partition_close, - on_partition_initialize=on_partition_initialize + on_partition_initialize=on_partition_initialize, + starting_position="-1", # "-1" is from the beginning of the partition. ) if __name__ == '__main__': diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_for_period_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_for_period_async.py index 2e7060a4062d..63e0f8de3eff 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_for_period_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_for_period_async.py @@ -22,6 +22,7 @@ async def on_event(partition_context, event): # Put your code here. print("Received event from partition: {}.".format(partition_context.partition_id)) + await partition_context.update_checkpoint(event) async def on_partition_initialize(partition_context): @@ -57,16 +58,17 @@ async def main(): print('Consumer will keep receiving for {} seconds, start time is {}.'.format(RECEIVE_DURATION, time.time())) - task = asyncio.ensure_future( - client.receive( - on_event=on_event, - on_error=on_error, - on_partition_close=on_partition_close, - on_partition_initialize=on_partition_initialize + async with client: + task = asyncio.ensure_future( + client.receive( + on_event=on_event, + on_error=on_error, + on_partition_close=on_partition_close, + on_partition_initialize=on_partition_initialize, + starting_position="-1", # "-1" is from the beginning of the partition. + ) ) - ) - await asyncio.sleep(RECEIVE_DURATION) - await client.close() + await asyncio.sleep(RECEIVE_DURATION) await task print('Consumer has stopped receiving, end time is {}.'.format(time.time())) diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_track_last_enqueued_event_prop_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_track_last_enqueued_event_prop_async.py index c84d89879d65..2d66895f6b7a 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_track_last_enqueued_event_prop_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_track_last_enqueued_event_prop_async.py @@ -27,6 +27,7 @@ async def on_event(partition_context, event): partition_context.partition_id, partition_context.last_enqueued_event_properties) ) + await partition_context.update_checkpoint(event) async def main(): @@ -39,7 +40,8 @@ async def main(): await client.receive( on_event=on_event, partition_id='0', - track_last_enqueued_event_properties=True + track_last_enqueued_event_properties=True, + starting_position="-1", # "-1" is from the beginning of the partition. ) diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_batch_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_batch_async.py index 68674c306b7d..b8590fa929b4 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_batch_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_batch_async.py @@ -42,9 +42,12 @@ async def receive(client): a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - await client.receive(on_event=on_event) + await client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: - # await client.receive(on_event=on_event, partition_id = '0')) + # await client.receive(on_event=on_event, partition_id='0')) async def main(): diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_time_interval_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_time_interval_async.py index a44e6fc7b522..e674c5d4b07a 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_time_interval_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_by_time_interval_async.py @@ -46,9 +46,12 @@ async def receive(client): a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - await client.receive(on_event=on_event) + await client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: - # await client.receive(on_event=on_event, partition_id = '0')) + # await client.receive(on_event=on_event, partition_id='0')) async def main(): diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py index 07e80731cc1a..37be2bf4e14e 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_checkpoint_store_async.py @@ -34,9 +34,12 @@ async def receive(client): a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - await client.receive(on_event=on_event) + await client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: - # await client.receive(on_event=on_event, partition_id = '0')) + # await client.receive(on_event=on_event, partition_id='0')) async def main(): diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_custom_starting_position_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_custom_starting_position_async.py index 6f430038c0d1..1965fbdd3d64 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_custom_starting_position_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/recv_with_custom_starting_position_async.py @@ -44,6 +44,7 @@ async def on_error(partition_context, error): async def on_event(partition_context, event): # Put your code here. print("Received event: {} from partition: {}.".format(event.body_as_str(), partition_context.partition_id)) + await partition_context.update_checkpoint(event) async def main(): diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py index 83b29fa96476..3a083e686fd3 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/sample_code_eventhub_async.py @@ -113,10 +113,13 @@ async def example_eventhub_async_send_and_receive(): async def on_event(partition_context, event): logger.info("Received event from partition: {}".format(partition_context.partition_id)) - # Do asnchronous ops on received events + # Do some asynchronous ops on received event async with consumer: - await consumer.receive(on_event=on_event) + await consumer.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # [END eventhub_consumer_client_receive_async] finally: pass @@ -169,7 +172,7 @@ async def example_eventhub_async_consumer_receive_and_close(): async def on_event(partition_context, event): logger.info("Received event from partition: {}".format(partition_context.partition_id)) - # Do asynchronous ops on the received event + # Do some asynchronous ops on the received event # The receive method is a coroutine which will be blocking when awaited. # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method. diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv.py index 75ba2e2ff85e..c7c1adb430cd 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv.py @@ -58,7 +58,8 @@ def on_error(partition_context, error): on_event=on_event, on_partition_initialize=on_partition_initialize, on_partition_close=on_partition_close, - on_error=on_error + on_error=on_error, + starting_position="-1", # "-1" is from the beginning of the partition. ) except KeyboardInterrupt: print('Stopped receiving.') diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_for_period.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_for_period.py index 842604c6d68e..720d6bb17b3e 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_for_period.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_for_period.py @@ -63,7 +63,8 @@ def on_error(partition_context, error): "on_event": on_event, "on_partition_initialize": on_partition_initialize, "on_partition_close": on_partition_close, - "on_error": on_error + "on_error": on_error, + "starting_position": "-1", # "-1" is from the beginning of the partition. }, daemon=True ) diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_track_last_enqueued_event_prop.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_track_last_enqueued_event_prop.py index 0d7baf502bb9..01822dad5bb2 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_track_last_enqueued_event_prop.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_track_last_enqueued_event_prop.py @@ -42,7 +42,8 @@ def on_event(partition_context, event): consumer_client.receive( on_event=on_event, partition_id='0', - track_last_enqueued_event_properties=True + track_last_enqueued_event_properties=True, + starting_position="-1", # "-1" is from the beginning of the partition. ) except KeyboardInterrupt: print('Stopped receiving.') diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_batch.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_batch.py index 67f161a252c6..db6f1bb91cf1 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_batch.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_batch.py @@ -51,7 +51,10 @@ def on_event(partition_context, event): with a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - consumer_client.receive(on_event=on_event) + consumer_client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: # client.receive(on_event=on_event, partition_id='0') except KeyboardInterrupt: diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py index e6850b2778e4..ade41b18317b 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_by_time_interval.py @@ -54,7 +54,10 @@ def on_event(partition_context, event): with a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - consumer_client.receive(on_event=on_event) + consumer_client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: # client.receive(on_event=on_event, partition_id='0') except KeyboardInterrupt: diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py index 92fd671b3288..fefca29cd558 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/recv_with_checkpoint_store.py @@ -43,7 +43,10 @@ def on_event(partition_context, event): with a checkpoint store, the client will load-balance partition assignment with other EventHubConsumerClient instances which also try to receive events from all partitions and use the same storage resource. """ - consumer_client.receive(on_event=on_event) + consumer_client.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) # With specified partition_id, load-balance will be disabled, for example: # client.receive(on_event=on_event, partition_id='0') except KeyboardInterrupt: diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py index 0558ca769d7f..e0a273553658 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/sample_code_eventhub.py @@ -129,7 +129,7 @@ def example_eventhub_sync_send_and_receive(): def on_event(partition_context, event): logger.info("Received event from partition: {}".format(partition_context.partition_id)) - # Do ops on received events + # Do ops on the received event with consumer: consumer.receive(on_event=on_event) @@ -187,14 +187,17 @@ def example_eventhub_consumer_receive_and_close(): def on_event(partition_context, event): logger.info("Received event from partition: {}".format(partition_context.partition_id)) - # Do ops on the received event + # Do some ops on the received event # The 'receive' method is a blocking call, it can be executed in a thread for # non-blocking behavior, and combined with the 'close' method. worker = threading.Thread( target=consumer.receive, - kwargs={"on_event": on_event} + kwargs={ + "on_event": on_event, + "starting_position": "-1", # "-1" is from the beginning of the partition. + } ) worker.start() time.sleep(10) # Keep receiving for 10s then close.