diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index acd20181845f..5cc8df6941a0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -35,6 +35,7 @@ class EventHubConsumer(ConsumerProducerMixin): """ timeout = 0 _epoch = b'com.microsoft:epoch' + _timeout = b'com.microsoft:timeout' def __init__( # pylint: disable=super-init-not-called self, client, source, event_position=None, prefetch=300, owner_level=None, @@ -72,11 +73,13 @@ def __init__( # pylint: disable=super-init-not-called self.reconnect_backoff = 1 self.redirected = None self.error = None - self.properties = None + self._link_properties = {} partition = self.source.split('/')[-1] self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: - self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(owner_level))} + self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) + link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000 + self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None def __aiter__(self): @@ -110,7 +113,7 @@ def _create_handler(self): auth=self.client.get_auth(**alt_creds), debug=self.client.config.network_tracing, prefetch=self.prefetch, - link_properties=self.properties, + link_properties=self._link_properties, timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 9541fcc0efa2..16ddb97a36b9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -8,7 +8,7 @@ from typing import Iterable, Union import time -from uamqp import constants, errors +from uamqp import types, constants, errors from uamqp import SendClientAsync from azure.eventhub.common import EventData, _BatchSendEventData @@ -28,6 +28,7 @@ class EventHubProducer(ConsumerProducerMixin): to a partition. """ + _timeout = b'com.microsoft:timeout' def __init__( # pylint: disable=super-init-not-called self, client, target, partition=None, send_timeout=60, @@ -75,6 +76,7 @@ def __init__( # pylint: disable=super-init-not-called self._handler = None self._outcome = None self._condition = None + self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))} def _create_handler(self): self._handler = SendClientAsync( @@ -85,6 +87,7 @@ def _create_handler(self): error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, + link_properties=self._link_properties, properties=self.client._create_properties( self.client.config.user_agent), # pylint: disable=protected-access loop=self.loop) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 95cdeedd43aa..53368dc292b0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -35,6 +35,7 @@ class EventHubConsumer(ConsumerProducerMixin): """ timeout = 0 _epoch = b'com.microsoft:epoch' + _timeout = b'com.microsoft:timeout' def __init__(self, client, source, event_position=None, prefetch=300, owner_level=None, keep_alive=None, auto_reconnect=True): @@ -65,13 +66,15 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=self.client.config.max_retries, on_error=_error_handler) self.reconnect_backoff = 1 - self.properties = None + self._link_properties = {} self.redirected = None self.error = None partition = self.source.split('/')[-1] self.name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: - self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(owner_level))} + self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) + link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000 + self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms)) self._handler = None def __iter__(self): @@ -105,7 +108,7 @@ def _create_handler(self): auth=self.client.get_auth(**alt_creds), debug=self.client.config.network_tracing, prefetch=self.prefetch, - link_properties=self.properties, + link_properties=self._link_properties, timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 9ea370bc6aef..d85a965381ad 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -9,7 +9,7 @@ import time from typing import Iterable, Union -from uamqp import constants, errors +from uamqp import types, constants, errors from uamqp import compat from uamqp import SendClient @@ -40,6 +40,7 @@ class EventHubProducer(ConsumerProducerMixin): to a partition. """ + _timeout = b'com.microsoft:timeout' def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ @@ -83,6 +84,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N self._handler = None self._outcome = None self._condition = None + self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))} def _create_handler(self): self._handler = SendClient( @@ -93,6 +95,7 @@ def _create_handler(self): error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, + link_properties=self._link_properties, properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access def _open(self, timeout_time=None):