Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ def create_consumer(
:type operation: str
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:type track_last_enqueued_event_properties: bool
:param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
on the last enqueued event on its associated partition, and track that information as events are received.
When information about the partition's last enqueued event is being tracked, each event received from the
Event Hubs service will carry metadata about the partition. This results in a small amount of additional
network bandwidth consumption that is generally a favorable trade-off when considered against periodically
making requests for partition properties using the Event Hub client.
It is set to `False` by default.
:param loop: An event loop. If not specified the default event loop will be used.
:rtype: ~azure.eventhub.aio.consumer_async.EventHubConsumer

Expand All @@ -266,14 +274,16 @@ def create_consumer(
owner_level = kwargs.get("owner_level")
operation = kwargs.get("operation")
prefetch = kwargs.get("prefetch") or self._config.prefetch
track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)
loop = kwargs.get("loop")

path = self._address.path + operation if operation else self._address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self._address.hostname, path, consumer_group, partition_id)
handler = EventHubConsumer(
self, source_url, event_position=event_position, owner_level=owner_level,
prefetch=prefetch, loop=loop)
prefetch=prefetch,
track_last_enqueued_event_properties=track_last_enqueued_event_properties, loop=loop)
return handler

def create_producer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import List
import time

from uamqp import errors, types # type: ignore
from uamqp import errors, types, utils # type: ignore
from uamqp import ReceiveClientAsync, Source # type: ignore

from azure.eventhub import EventData, EventPosition
Expand Down Expand Up @@ -36,6 +36,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan
_timeout = 0
_epoch_symbol = b'com.microsoft:epoch'
_timeout_symbol = b'com.microsoft:timeout'
_receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric'

def __init__( # pylint: disable=super-init-not-called
self, client, source, **kwargs):
Expand All @@ -55,13 +56,22 @@ def __init__( # pylint: disable=super-init-not-called
:param owner_level: The priority of the exclusive consumer. An exclusive
consumer will be created if owner_level is set.
:type owner_level: int
:type track_last_enqueued_event_properties: bool
:param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
on the last enqueued event on its associated partition, and track that information as events are received.
When information about the partition's last enqueued event is being tracked, each event received from the
Event Hubs service will carry metadata about the partition. This results in a small amount of additional
network bandwidth consumption that is generally a favorable trade-off when considered against periodically
making requests for partition properties using the Event Hub client.
It is set to `False` by default.
:param loop: An event loop.
"""
event_position = kwargs.get("event_position", None)
prefetch = kwargs.get("prefetch", 300)
owner_level = kwargs.get("owner_level", None)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)
track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)
loop = kwargs.get("loop", None)

super(EventHubConsumer, self).__init__()
Expand All @@ -88,6 +98,8 @@ def __init__( # pylint: disable=super-init-not-called
link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access
self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms))
self._handler = None
self._track_last_enqueued_event_properties = track_last_enqueued_event_properties
self._last_enqueued_event_properties = {}

def __aiter__(self):
    # Async-iterator protocol: the consumer is its own iterator;
    # events are actually produced by __anext__.
    return self
Expand All @@ -105,6 +117,8 @@ async def __anext__(self):
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
Expand All @@ -123,6 +137,12 @@ def _create_handler(self):
source = Source(self._source)
if self._offset is not None:
source.set_filter(self._offset._selector()) # pylint:disable=protected-access

desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))

self._handler = ReceiveClientAsync(
source,
auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access
Expand All @@ -135,6 +155,7 @@ def _create_handler(self):
client_name=self._name,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
desired_capabilities=desired_capabilities, # pylint:disable=protected-access
loop=self._loop)
self._messages_iter = None

Expand Down Expand Up @@ -182,12 +203,31 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access

return data_batch

async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
    """Receive a batch of events, retrying on transient failures.

    Thin wrapper that hands ``self._receive`` to the shared retry driver
    (``_do_retryable_operation`` — presumably provided by the
    ConsumerProducerMixin base; its body is not visible here).

    :param timeout: Overall operation timeout, or None for the default.
    :param max_batch_size: Maximum number of events to return in one batch.
    """
    return await self._do_retryable_operation(self._receive, timeout=timeout,
                                              max_batch_size=max_batch_size, **kwargs)

@property
def last_enqueued_event_properties(self):
    """Metadata about the last event enqueued on this partition.

    Only populated when the consumer was created with
    ``track_last_enqueued_event_properties=True``; it is refreshed each
    time an event is received.  The dict contains the partition's
    ``sequence_number``, ``offset``, ``enqueued_time`` and
    ``retrieval_time``.

    :rtype: dict or None
    """
    if not self._track_last_enqueued_event_properties:
        return None
    return self._last_enqueued_event_properties

@property
def queue_size(self):
# type: () -> int
Expand Down
12 changes: 11 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
:type operation: str
:param prefetch: The message prefetch count of the consumer. Default is 300.
:type prefetch: int
:type track_last_enqueued_event_properties: bool
:param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
on the last enqueued event on its associated partition, and track that information as events are received.
When information about the partition's last enqueued event is being tracked, each event received from the
Event Hubs service will carry metadata about the partition. This results in a small amount of additional
network bandwidth consumption that is generally a favorable trade-off when considered against periodically
making requests for partition properties using the Event Hub client.
It is set to `False` by default.
:rtype: ~azure.eventhub.consumer.EventHubConsumer

Example:
Expand All @@ -263,13 +271,15 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
owner_level = kwargs.get("owner_level")
operation = kwargs.get("operation")
prefetch = kwargs.get("prefetch") or self._config.prefetch
track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)

path = self._address.path + operation if operation else self._address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self._address.hostname, path, consumer_group, partition_id)
handler = EventHubConsumer(
self, source_url, event_position=event_position, owner_level=owner_level,
prefetch=prefetch)
prefetch=prefetch,
track_last_enqueued_event_properties=track_last_enqueued_event_properties)
return handler

def create_producer(self, partition_id=None, operation=None, send_timeout=None):
Expand Down
25 changes: 22 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class EventData(object):
PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY)
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_DEVICE_ID = b"iothub-connection-device-id"
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"

def __init__(self, body=None, to_device=None):
"""
Expand All @@ -71,8 +75,10 @@ def __init__(self, body=None, to_device=None):
"""

self._annotations = {}
self._delivery_annotations = {}
self._app_properties = {}
self._msg_properties = MessageProperties()
self._runtime_info = {}
if to_device:
self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
if body and isinstance(body, list):
Expand Down Expand Up @@ -148,11 +154,24 @@ def _trace_link_message(self, parent_span=None):

@staticmethod
def _from_message(message):
    """Build an EventData wrapping a received message.

    Copies the message's properties, annotations, application properties
    and delivery annotations onto the new instance.  When delivery
    annotations are present (the service appears to send them when the
    receiver-runtime-metric capability is enabled — confirm against the
    consumer setup), the partition's last-enqueued-event metadata is
    extracted into ``_runtime_info``.

    :param message: The received message (presumably a uamqp Message;
        not verifiable from this block alone).
    :rtype: EventData
    """
    # pylint:disable=protected-access
    event_data = EventData(body='')
    event_data.message = message
    event_data._msg_properties = message.properties
    event_data._annotations = message.annotations
    event_data._app_properties = message.application_properties
    event_data._delivery_annotations = message.delivery_annotations
    if event_data._delivery_annotations:
        # Missing annotation keys map to None so callers can rely on
        # the dict always having all four entries.
        event_data._runtime_info = {
            "sequence_number":
                event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
            "offset":
                event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
            "enqueued_time":
                event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
            "retrieval_time":
                event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
        }
    return event_data

@property
Expand Down
46 changes: 43 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time
from typing import List

from uamqp import types, errors # type: ignore
from uamqp import types, errors, utils # type: ignore
from uamqp import ReceiveClient, Source # type: ignore

from azure.eventhub.common import EventData, EventPosition
Expand Down Expand Up @@ -38,6 +38,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan
_timeout = 0
_epoch_symbol = b'com.microsoft:epoch'
_timeout_symbol = b'com.microsoft:timeout'
_receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric'

def __init__(self, client, source, **kwargs):
"""
Expand All @@ -54,12 +55,21 @@ def __init__(self, client, source, **kwargs):
:param owner_level: The priority of the exclusive consumer. An exclusive
consumer will be created if owner_level is set.
:type owner_level: int
:type track_last_enqueued_event_properties: bool
:param track_last_enqueued_event_properties: Indicates whether or not the consumer should request information
on the last enqueued event on its associated partition, and track that information as events are received.
When information about the partition's last enqueued event is being tracked, each event received from the
Event Hubs service will carry metadata about the partition. This results in a small amount of additional
network bandwidth consumption that is generally a favorable trade-off when considered against periodically
making requests for partition properties using the Event Hub client.
It is set to `False` by default.
"""
event_position = kwargs.get("event_position", None)
prefetch = kwargs.get("prefetch", 300)
owner_level = kwargs.get("owner_level", None)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)
track_last_enqueued_event_properties = kwargs.get("track_last_enqueued_event_properties", False)

super(EventHubConsumer, self).__init__()
self._running = False
Expand All @@ -71,7 +81,7 @@ def __init__(self, client, source, **kwargs):
self._owner_level = owner_level
self._keep_alive = keep_alive
self._auto_reconnect = auto_reconnect
self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access
self._retry_policy = errors.ErrorPolicy(max_retries=self._client._config.max_retries, on_error=_error_handler) # pylint:disable=protected-access
self._reconnect_backoff = 1
self._link_properties = {}
self._redirected = None
Expand All @@ -84,6 +94,8 @@ def __init__(self, client, source, **kwargs):
link_property_timeout_ms = (self._client._config.receive_timeout or self._timeout) * 1000 # pylint:disable=protected-access
self._link_properties[types.AMQPSymbol(self._timeout_symbol)] = types.AMQPLong(int(link_property_timeout_ms))
self._handler = None
self._track_last_enqueued_event_properties = track_last_enqueued_event_properties
self._last_enqueued_event_properties = {}

def __iter__(self):
    # Iterator protocol: the consumer is its own iterator;
    # events are actually produced by __next__.
    return self
Expand All @@ -101,6 +113,8 @@ def __next__(self):
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
Expand All @@ -119,6 +133,12 @@ def _create_handler(self):
source = Source(self._source)
if self._offset is not None:
source.set_filter(self._offset._selector()) # pylint:disable=protected-access

desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))

self._handler = ReceiveClient(
source,
auth=self._client._get_auth(**alt_creds), # pylint:disable=protected-access
Expand All @@ -130,7 +150,8 @@ def _create_handler(self):
keep_alive_interval=self._keep_alive,
client_name=self._name,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent)) # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
desired_capabilities=desired_capabilities) # pylint:disable=protected-access
self._messages_iter = None

def _redirect(self, redirect):
Expand Down Expand Up @@ -176,12 +197,31 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access

return data_batch

def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
    """Receive a batch of events, retrying on transient failures.

    Thin wrapper that hands ``self._receive`` to the shared retry driver
    (``_do_retryable_operation`` — presumably provided by the
    ConsumerProducerMixin base; its body is not visible here).

    :param timeout: Overall operation timeout, or None for the default.
    :param max_batch_size: Maximum number of events to return in one batch.
    """
    return self._do_retryable_operation(self._receive, timeout=timeout,
                                        max_batch_size=max_batch_size, **kwargs)

@property
def last_enqueued_event_properties(self):
    """Metadata about the last event enqueued on this partition.

    Populated only when the consumer was created with
    ``track_last_enqueued_event_properties=True``, and updated on every
    received event.  Keys: ``sequence_number``, ``offset``,
    ``enqueued_time``, ``retrieval_time``.

    :rtype: dict or None
    """
    if self._track_last_enqueued_event_properties:
        return self._last_enqueued_event_properties
    return None

@property
def queue_size(self):
# type:() -> int
Expand Down
Loading