diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 88b693d157ec..578d0f26c059 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -251,6 +251,14 @@ def create_consumer( :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int + :type track_last_enqueued_event_properties: bool + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. :param loop: An event loop. If not specified the default event loop will be used. 
:rtype: ~azure.eventhub.aio.consumer_async.EventHubConsumer @@ -266,6 +274,7 @@ def create_consumer( owner_level = kwargs.get("owner_level") operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) loop = kwargs.get("loop") path = self._address.path + operation if operation else self._address.path @@ -273,7 +282,8 @@ def create_consumer( self._address.hostname, path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, - prefetch=prefetch, loop=loop) + prefetch=prefetch, + track_last_enqueued_event_properties=track_last_enqueued_event_properties, loop=loop) return handler def create_producer( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 015b11190212..4ae4a3b33b72 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -8,7 +8,7 @@ from typing import List import time -from uamqp import errors, types # type: ignore +from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore from azure.eventhub import EventData, EventPosition @@ -36,6 +36,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan _timeout = 0 _epoch_symbol = b'com.microsoft:epoch' _timeout_symbol = b'com.microsoft:timeout' + _receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric' def __init__( # pylint: disable=super-init-not-called self, client, source, **kwargs): @@ -55,6 +56,14 @@ def __init__( # pylint: disable=super-init-not-called :param owner_level: The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set. 
:type owner_level: int + :type track_last_enqueued_event_properties: bool + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. :param loop: An event loop. """ event_position = kwargs.get("event_position", None) @@ -62,6 +71,7 @@ def __init__( # pylint: disable=super-init-not-called owner_level = kwargs.get("owner_level", None) keep_alive = kwargs.get("keep_alive", None) auto_reconnect = kwargs.get("auto_reconnect", True) + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) loop = kwargs.get("loop", None) super(EventHubConsumer, self).__init__() @@ -88,6 +98,8 @@ def __init__( # pylint: disable=super-init-not-called link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None + self._track_last_enqueued_event_properties = track_last_enqueued_event_properties + self._last_enqueued_event_properties = {} def __aiter__(self): return self @@ -105,6 +117,8 @@ async def __anext__(self): event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 + if self._track_last_enqueued_event_properties: + self._last_enqueued_event_properties = event_data._runtime_info # 
pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) @@ -123,6 +137,12 @@ def _create_handler(self): source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access + + desired_capabilities = None + if self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + self._handler = ReceiveClientAsync( source, auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access @@ -135,6 +155,7 @@ def _create_handler(self): client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access + desired_capabilities=desired_capabilities, # pylint:disable=protected-access loop=self._loop) self._messages_iter = None @@ -182,12 +203,31 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) event_data._trace_link_message() # pylint:disable=protected-access + if self._track_last_enqueued_event_properties and len(data_batch): + self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access + return data_batch async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): return await self._do_retryable_operation(self._receive, timeout=timeout, max_batch_size=max_batch_size, **kwargs) + @property + def last_enqueued_event_properties(self): + """ + The latest enqueued event information. This property will be updated each time an event is received when + the receiver is created with `track_last_enqueued_event_properties` being `True`. 
+ The dict includes the following information about the partition: + + - `sequence_number` + - `offset` + - `enqueued_time` + - `retrieval_time` + + :rtype: dict or None + """ + return self._last_enqueued_event_properties if self._track_last_enqueued_event_properties else None + + @property def queue_size(self): # type: () -> int diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 06d264b5b9ac..b3ae901c3b55 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -249,6 +249,14 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs :type operation: str :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int + :type track_last_enqueued_event_properties: bool + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. 
:rtype: ~azure.eventhub.consumer.EventHubConsumer Example: @@ -263,13 +271,15 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs owner_level = kwargs.get("owner_level") operation = kwargs.get("operation") prefetch = kwargs.get("prefetch") or self._config.prefetch + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) path = self._address.path + operation if operation else self._address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self._address.hostname, path, consumer_group, partition_id) handler = EventHubConsumer( self, source_url, event_position=event_position, owner_level=owner_level, - prefetch=prefetch) + prefetch=prefetch, + track_last_enqueued_event_properties=track_last_enqueued_event_properties) return handler def create_producer(self, partition_id=None, operation=None, send_timeout=None): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 3f2545829748..6a4ce46e75b9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -59,6 +59,10 @@ class EventData(object): PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY) PROP_TIMESTAMP = b"x-opt-enqueued-time" PROP_DEVICE_ID = b"iothub-connection-device-id" + PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number" + PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset" + PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc" + PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc" def __init__(self, body=None, to_device=None): """ @@ -71,8 +75,10 @@ def __init__(self, body=None, to_device=None): """ self._annotations = {} + self._delivery_annotations = {} self._app_properties = {} self._msg_properties = MessageProperties() + self._runtime_info = {} if to_device: self._msg_properties.to = 
'/devices/{}/messages/devicebound'.format(to_device) if body and isinstance(body, list): @@ -148,11 +154,24 @@ def _trace_link_message(self, parent_span=None): @staticmethod def _from_message(message): + # pylint:disable=protected-access event_data = EventData(body='') event_data.message = message - event_data._msg_properties = message.properties # pylint:disable=protected-access - event_data._annotations = message.annotations # pylint:disable=protected-access - event_data._app_properties = message.application_properties # pylint:disable=protected-access + event_data._msg_properties = message.properties + event_data._annotations = message.annotations + event_data._app_properties = message.application_properties + event_data._delivery_annotations = message.delivery_annotations + if event_data._delivery_annotations: + event_data._runtime_info = { + "sequence_number": + event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None), + "offset": + event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None), + "enqueued_time": + event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None), + "retrieval_time": + event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None) + } return event_data @property diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index ff996a57747a..4c6f2eca58a9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -9,7 +9,7 @@ import time from typing import List -from uamqp import types, errors # type: ignore +from uamqp import types, errors, utils # type: ignore from uamqp import ReceiveClient, Source # type: ignore from azure.eventhub.common import EventData, EventPosition @@ -38,6 +38,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan _timeout = 0 _epoch_symbol 
= b'com.microsoft:epoch' _timeout_symbol = b'com.microsoft:timeout' + _receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric' def __init__(self, client, source, **kwargs): """ @@ -54,12 +55,21 @@ def __init__(self, client, source, **kwargs): :param owner_level: The priority of the exclusive consumer. An exclusive consumer will be created if owner_level is set. :type owner_level: int + :type track_last_enqueued_event_properties: bool + :param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. 
""" event_position = kwargs.get("event_position", None) prefetch = kwargs.get("prefetch", 300) owner_level = kwargs.get("owner_level", None) keep_alive = kwargs.get("keep_alive", None) auto_reconnect = kwargs.get("auto_reconnect", True) + track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False) super(EventHubConsumer, self).__init__() self._running = False @@ -71,7 +81,7 @@ def __init__(self, client, source, **kwargs): self._owner_level = owner_level self._keep_alive = keep_alive self._auto_reconnect = auto_reconnect - self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access + self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access self._reconnect_backoff = 1 self._link_properties = {} self._redirected = None @@ -84,6 +94,8 @@ def __init__(self, client, source, **kwargs): link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None + self._track_last_enqueued_event_properties = track_last_enqueued_event_properties + self._last_enqueued_event_properties = {} def __iter__(self): return self @@ -101,6 +113,8 @@ def __next__(self): event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 + if self._track_last_enqueued_event_properties: + self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = self._handle_exception(exception) @@ -119,6 +133,12 @@ def _create_handler(self): source = Source(self._source) if self._offset is not None: 
source.set_filter(self._offset._selector()) # pylint:disable=protected-access + + desired_capabilities = None + if self._track_last_enqueued_event_properties: + symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)] + desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + self._handler = ReceiveClient( source, auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access @@ -130,7 +150,8 @@ def _create_handler(self): keep_alive_interval=self._keep_alive, client_name=self._name, properties=self._client._create_properties( # pylint:disable=protected-access - self._client._config.user_agent)) # pylint:disable=protected-access + self._client._config.user_agent), # pylint:disable=protected-access + desired_capabilities=desired_capabilities) # pylint:disable=protected-access self._messages_iter = None def _redirect(self, redirect): @@ -176,12 +197,31 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) event_data._trace_link_message() # pylint:disable=protected-access + if self._track_last_enqueued_event_properties and len(data_batch): + self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access + return data_batch def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): return self._do_retryable_operation(self._receive, timeout=timeout, max_batch_size=max_batch_size, **kwargs) + @property + def last_enqueued_event_properties(self): + """ + The latest enqueued event information. This property will be updated each time an event is received when + the receiver is created with `track_last_enqueued_event_properties` being `True`. 
+ The dict includes the following information about the partition: + + - `sequence_number` + - `offset` + - `enqueued_time` + - `retrieval_time` + + :rtype: dict or None + """ + return self._last_enqueued_event_properties if self._track_last_enqueued_event_properties else None + + @property def queue_size(self): # type:() -> int diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py new file mode 100644 index 000000000000..53d2e626a7f5 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_track_last_enqueued_event_info_async.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition while tracking the partition's last enqueued event properties. 
+""" + +import os +import time +import asyncio + +from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventPosition, EventHubSharedKeyCredential + +HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net +EVENT_HUB = os.environ['EVENT_HUB_NAME'] +USER = os.environ['EVENT_HUB_SAS_POLICY'] +KEY = os.environ['EVENT_HUB_SAS_KEY'] + +EVENT_POSITION = EventPosition("-1") + + +async def pump(client, partition): + consumer = client.create_consumer(consumer_group="$default", partition_id=partition, event_position=EVENT_POSITION, + prefetch=5, track_last_enqueued_event_properties=True) + async with consumer: + total = 0 + start_time = time.time() + for event_data in await consumer.receive(timeout=10): + last_offset = event_data.offset + last_sn = event_data.sequence_number + print("Received: {}, {}".format(last_offset, last_sn)) + total += 1 + end_time = time.time() + run_time = end_time - start_time + print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) + print("Received {} messages in {} seconds".format(total, run_time)) + + +loop = asyncio.get_event_loop() +client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) +tasks = [asyncio.ensure_future(pump(client, "0"))] +loop.run_until_complete(asyncio.wait(tasks)) diff --git a/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py b/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py new file mode 100644 index 000000000000..576ef19089e6 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/recv_track_last_enqueued_event_info.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show receiving events from an Event Hub partition. +""" +import os +import time +from azure.eventhub import EventHubClient, EventPosition, EventHubSharedKeyCredential + +HOSTNAME = os.environ['EVENT_HUB_HOSTNAME'] # .servicebus.windows.net +EVENT_HUB = os.environ['EVENT_HUB_NAME'] + +USER = os.environ['EVENT_HUB_SAS_POLICY'] +KEY = os.environ['EVENT_HUB_SAS_KEY'] + +EVENT_POSITION = EventPosition("-1") +PARTITION = "0" + + +total = 0 +last_sn = -1 +last_offset = "-1" +client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) + +consumer = client.create_consumer(consumer_group="$default", partition_id=PARTITION, + event_position=EVENT_POSITION, prefetch=5000, + track_last_enqueued_event_properties=True) +with consumer: + start_time = time.time() + batch = consumer.receive(timeout=5) + for event_data in batch: + last_offset = event_data.offset + last_sn = event_data.sequence_number + print("Received: {}, {}".format(last_offset, last_sn)) + print(event_data.body_as_str()) + total += 1 + batch = consumer.receive(timeout=5) + print("Consumer last enqueued event properties: {}.".format(consumer.last_enqueued_event_properties)) + print("Received {} messages in {} seconds".format(total, time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index 2a2e4836c2d5..a9744f259c8c 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -314,3 +314,34 @@ async def test_receive_over_websocket_async(connstr_senders): received = await receiver.receive(max_batch_size=50, timeout=5) assert len(received) == 20 
+ + +@pytest.mark.asyncio +@pytest.mark.liveTest +async def test_receive_run_time_metric_async(connstr_senders): + connection_str, senders = connstr_senders + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, + network_tracing=False) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", + event_position=EventPosition('@latest'), prefetch=500, + track_last_enqueued_event_properties=True) + + event_list = [] + for i in range(20): + event_list.append(EventData("Event Number {}".format(i))) + + async with receiver: + received = await receiver.receive(timeout=5) + assert len(received) == 0 + + senders[0].send(event_list) + + await asyncio.sleep(1) + + received = await receiver.receive(max_batch_size=50, timeout=5) + assert len(received) == 20 + assert receiver.last_enqueued_event_properties + assert receiver.last_enqueued_event_properties.get('sequence_number', None) + assert receiver.last_enqueued_event_properties.get('offset', None) + assert receiver.last_enqueued_event_properties.get('enqueued_time', None) + assert receiver.last_enqueued_event_properties.get('retrieval_time', None) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index d241a8e6e585..c9da841f71b2 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -272,3 +272,33 @@ def test_receive_over_websocket_sync(connstr_senders): received = receiver.receive(max_batch_size=50, timeout=5) assert len(received) == 20 + + +@pytest.mark.liveTest +def test_receive_run_time_metric(connstr_senders): + connection_str, senders = connstr_senders + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, + network_tracing=False) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", + 
event_position=EventPosition('@latest'), prefetch=500, + track_last_enqueued_event_properties=True) + + event_list = [] + for i in range(20): + event_list.append(EventData("Event Number {}".format(i))) + + with receiver: + received = receiver.receive(timeout=5) + assert len(received) == 0 + + senders[0].send(event_list) + + time.sleep(1) + + received = receiver.receive(max_batch_size=50, timeout=5) + assert len(received) == 20 + assert receiver.last_enqueued_event_properties + assert receiver.last_enqueued_event_properties.get('sequence_number', None) + assert receiver.last_enqueued_event_properties.get('offset', None) + assert receiver.last_enqueued_event_properties.get('enqueued_time', None) + assert receiver.last_enqueued_event_properties.get('retrieval_time', None)