Skip to content

Commit cda14a3

Browse files
Addresses warnings surfaced during p5 doc and API-view generation. (Azure#13069)
* Addresses warnings surfaced during p5 doc and API-view generation. Removes localization from URIs, adds tons of type hints, normalizes return types, begins propogating class docstring into init docstring per guidelines. * Fix backticks that were breaking link formatting in markdown in README. * Add initial link for forever-receive refdocs inside readme. * Address mypy issues. * pylint fixes * Defensive ServiceBusErrors for internal state misalignment. * Adjust readme streaming URL to point to proper location. * make receiver a ReceivedMessage param via kwargs and an expressive error to deincentivise use, but still required. Co-authored-by: Adam Ling (MSFT) <[email protected]>
1 parent 63f8c00 commit cda14a3

24 files changed

+168
-79
lines changed

sdk/servicebus/azure-servicebus/README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ with ServiceBusClient.from_connection_string(connstr) as client:
158158
159159
### Receive messages from a queue
160160

161-
To receive from a queue, you can either perform an ad-hoc receive via "receiver.receive_messages()" or receive persistently through the receiver itself.
161+
To receive from a queue, you can either perform an ad-hoc receive via `receiver.receive_messages()` or receive persistently through the receiver itself.
162162

163-
#### Receive messages from a queue through iterating over ServiceBusReceiver
163+
#### [Receive messages from a queue through iterating over ServiceBusReceiver][streaming_receive_reference]
164164

165165
```Python
166166
from azure.servicebus import ServiceBusClient
@@ -173,7 +173,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
173173
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
174174
# Default is None; to receive forever.
175175
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
176-
for msg in receiver: # ServiceBusReceiver instance is a generator
176+
for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter().
177177
print(str(msg))
178178
# If it is desired to halt receiving early, one can break out of the loop here safely.
179179
```
@@ -183,7 +183,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
183183
> See [AutoLockRenewer](#autolockrenew) for a helper to perform this in the background automatically.
184184
> Lock duration is set in Azure on the queue or topic itself.
185185
186-
#### [Receive messages from a queue through `ServiceBusReceiver.receive_messages()`][receive_reference]
186+
#### [Receive messages from a queue through ServiceBusReceiver.receive_messages()][receive_reference]
187187

188188
> **NOTE:** `ServiceBusReceiver.receive_messages()` receives a single or constrained list of messages through an ad-hoc method call, as opposed to receiving perpetually from the generator. It always returns a list.
189189
@@ -462,6 +462,7 @@ contact [[email protected]](mailto:[email protected]) with any additio
462462
[client_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.ServiceBusClient
463463
[send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=send_messages#azure.servicebus.ServiceBusSender.send_messages
464464
[receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusReceiver.receive_messages
465+
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter
465466
[session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages
466467
[session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.Message.session_id
467468
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete#azure.servicebus.ReceivedMessage.complete

sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ class AutoLockRenew(object):
5353
"""
5454

5555
def __init__(self, executor=None, max_workers=None):
56+
# type: (ThreadPoolExecutor, int) -> None
57+
"""Auto renew locks for messages and sessions using a background thread pool.
58+
59+
:param executor: A user-specified thread pool. This cannot be combined with
60+
setting `max_workers`.
61+
:type executor: ~concurrent.futures.ThreadPoolExecutor
62+
:param max_workers: Specify the maximum workers in the thread pool. If not
63+
specified the number used will be derived from the core count of the environment.
64+
This cannot be combined with `executor`.
65+
:type max_workers: int
66+
"""
5667
self._executor = executor or ThreadPoolExecutor(max_workers=max_workers)
5768
self._shutdown = threading.Event()
5869
self._sleep_time = 1
@@ -109,16 +120,18 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=
109120
on_lock_renew_failure(renewable, error)
110121

111122
def register(self, renewable, timeout=300, on_lock_renew_failure=None):
123+
# type: (Union[ReceivedMessage, ServiceBusSession], float, Optional[LockRenewFailureCallback]) -> None
112124
"""Register a renewable entity for automatic lock renewal.
113125
114126
:param renewable: A locked entity that needs to be renewed.
115-
:type renewable: ~azure.servicebus.ReceivedMessage or
116-
~azure.servicebus.ServiceBusSession
117-
:param float timeout: A time in seconds that the lock should be maintained for.
118-
Default value is 300 (5 minutes).
119-
:param Optional[LockRenewFailureCallback] on_lock_renew_failure:
120-
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
121-
Default value is None (no callback).
127+
:type renewable: Union[~azure.servicebus.ReceivedMessage, ~azure.servicebus.ServiceBusSession]
128+
:param timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes).
129+
:type timeout: float
130+
:param on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable
131+
that is being registered. Default value is None (no callback).
132+
:type on_lock_renew_failure: Optional[LockRenewFailureCallback]
133+
134+
:rtype: None
122135
"""
123136
if self._shutdown.is_set():
124137
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
@@ -131,6 +144,8 @@ def close(self, wait=True):
131144
132145
:param wait: Whether to block until thread pool has shutdown. Default is `True`.
133146
:type wait: bool
147+
148+
:rtype: None
134149
"""
135150
self._shutdown.set()
136151
self._executor.shutdown(wait=wait)

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import uuid
1010
import functools
1111
import logging
12-
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable
12+
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any
1313

1414
import uamqp.message
1515

@@ -52,7 +52,8 @@
5252
MessageLockExpired,
5353
SessionLockExpired,
5454
MessageSettleFailed,
55-
MessageContentTooLarge)
55+
MessageContentTooLarge,
56+
ServiceBusError)
5657
from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed
5758
if TYPE_CHECKING:
5859
from .._servicebus_receiver import ServiceBusReceiver
@@ -65,7 +66,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta
6566
"""A Service Bus Message.
6667
6768
:param body: The data to send in a single message.
68-
:type body: str or bytes
69+
:type body: Union[str, bytes]
6970
7071
:keyword dict properties: The user defined properties on the message.
7172
:keyword str session_id: The session identifier of the message for a sessionful entity.
@@ -95,6 +96,7 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta
9596
"""
9697

9798
def __init__(self, body, **kwargs):
99+
# type: (Union[str, bytes], Any) -> None
98100
# Although we might normally thread through **kwargs this causes
99101
# problems as MessageProperties won't absorb spurious args.
100102
self._encoding = kwargs.pop("encoding", 'UTF-8')
@@ -491,7 +493,6 @@ class BatchMessage(object):
491493
:vartype message: ~uamqp.BatchMessage
492494
493495
:param int max_size_in_bytes: The maximum size of bytes data that a BatchMessage object can hold.
494-
495496
"""
496497
def __init__(self, max_size_in_bytes=None):
497498
# type: (Optional[int]) -> None
@@ -570,11 +571,11 @@ class PeekMessage(Message):
570571
This message is still on the queue, and unlocked.
571572
A peeked message cannot be completed, abandoned, dead-lettered or deferred.
572573
It has no lock token or expiry.
573-
574574
"""
575575

576576
def __init__(self, message):
577-
super(PeekMessage, self).__init__(None, message=message)
577+
# type: (uamqp.message.Message) -> None
578+
super(PeekMessage, self).__init__(None, message=message) # type: ignore
578579

579580
def _to_outgoing_message(self):
580581
# type: () -> Message
@@ -741,12 +742,17 @@ class ReceivedMessageBase(PeekMessage):
741742
"""
742743

743744
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
745+
# type: (uamqp.message.Message, ReceiveSettleMode, Any) -> None
744746
super(ReceivedMessageBase, self).__init__(message=message)
745747
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
746748
self._received_timestamp_utc = utc_now()
747749
self._is_deferred_message = kwargs.get("is_deferred_message", False)
748-
self.auto_renew_error = None
749-
self._receiver = None # type: ignore
750+
self.auto_renew_error = None # type: Optional[Exception]
751+
try:
752+
self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver]
753+
except KeyError:
754+
raise TypeError("ReceivedMessage requires a receiver to be initialized. This class should never be" + \
755+
"initialized by a user; the Message class should be utilized instead.")
750756
self._expiry = None
751757

752758
def _check_live(self, action):
@@ -769,6 +775,7 @@ def _check_live(self, action):
769775
def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
770776
# type: (str, Optional[str], Optional[str]) -> Callable
771777
# pylint: disable=protected-access
778+
772779
if settle_operation == MESSAGE_COMPLETE:
773780
return functools.partial(
774781
self._receiver._settle_message,
@@ -822,13 +829,14 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d
822829
@property
823830
def _lock_expired(self):
824831
# type: () -> bool
832+
# pylint: disable=protected-access
825833
"""
826834
Whether the lock on the message has expired.
827835
828836
:rtype: bool
829837
"""
830838
try:
831-
if self._receiver.session: # pylint: disable=protected-access
839+
if self._receiver.session: # type: ignore
832840
raise TypeError("Session messages do not expire. Please use the Session expiry instead.")
833841
except AttributeError: # Is not a session receiver
834842
pass
@@ -859,6 +867,7 @@ def lock_token(self):
859867
@property
860868
def locked_until_utc(self):
861869
# type: () -> Optional[datetime.datetime]
870+
# pylint: disable=protected-access
862871
"""
863872
The UTC datetime until which the message will be locked in the queue/subscription.
864873
When the lock expires, delivery count of hte message is incremented and the message
@@ -867,7 +876,7 @@ def locked_until_utc(self):
867876
:rtype: datetime.datetime
868877
"""
869878
try:
870-
if self._settled or self._receiver.session: # pylint: disable=protected-access
879+
if self._settled or self._receiver.session: # type: ignore
871880
return None
872881
except AttributeError: # not settled, and isn't session receiver.
873882
pass
@@ -1021,6 +1030,7 @@ def defer(self):
10211030

10221031
def renew_lock(self):
10231032
# type: () -> None
1033+
# pylint: disable=protected-access,no-member
10241034
"""Renew the message lock.
10251035
10261036
This will maintain the lock on the message to ensure it is not returned to the queue
@@ -1041,7 +1051,7 @@ def renew_lock(self):
10411051
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
10421052
"""
10431053
try:
1044-
if self._receiver.session:
1054+
if self._receiver.session: # type: ignore
10451055
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
10461056
except AttributeError:
10471057
pass
@@ -1050,5 +1060,5 @@ def renew_lock(self):
10501060
if not token:
10511061
raise ValueError("Unable to renew lock - no lock token found.")
10521062

1053-
expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access,no-member
1063+
expiry = self._receiver._renew_locks(token) # type: ignore
10541064
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)

sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ def deferred_message_op(
6262
status_code,
6363
message,
6464
description,
65+
receiver,
6566
mode=ReceiveSettleMode.PeekLock,
6667
message_type=ReceivedMessage
6768
):
6869
if status_code == 200:
6970
parsed = []
7071
for m in message.get_data()[b'messages']:
7172
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
72-
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
73+
parsed.append(message_type(wrapped, mode, is_deferred_message=True, receiver=receiver))
7374
return parsed
7475
if status_code in [202, 204]:
7576
return []

sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ def _populate_attributes(self, **kwargs):
5151
self._max_wait_time = kwargs.get("max_wait_time", None)
5252

5353
def _build_message(self, received, message_type=ReceivedMessage):
54-
message = message_type(message=received, mode=self._mode)
55-
message._receiver = self # pylint: disable=protected-access
54+
message = message_type(message=received, mode=self._mode, receiver=self)
5655
self._last_received_sequenced_number = message.sequence_number
5756
return message
5857

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
430430
)
431431

432432
def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
433-
# type: (str, str, str, Any) -> ServiceBusReceiver
433+
# type: (str, str, str, Any) -> ServiceBusSessionReceiver
434434
"""Get ServiceBusReceiver for the specific subscription under the topic.
435435
436436
:param str topic_name: The name of specific Service Bus Topic the client connects to.

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
277277
)
278278

279279
def _renew_locks(self, *lock_tokens):
280-
# type: (*str) -> Any
280+
# type: (str) -> Any
281281
message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)}
282282
return self._mgmt_request_response_with_retry(
283283
REQUEST_RESPONSE_RENEWLOCK_OPERATION,
@@ -286,15 +286,25 @@ def _renew_locks(self, *lock_tokens):
286286
)
287287

288288
def get_streaming_message_iter(self, max_wait_time=None):
289+
# type: (float) -> Iterator[ReceivedMessage]
289290
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
290291
such a timeout occurs.
291292
292-
:param float max_wait_time: Maximum time to wait in seconds for the next message to arrive.
293+
:param max_wait_time: Maximum time to wait in seconds for the next message to arrive.
293294
If no messages arrive, and no timeout is specified, this call will not return
294295
until the connection is closed. If specified, and no messages arrive for the
295296
timeout period, the iterator will stop.
297+
:type max_wait_time: float
298+
:rtype: Iterator[ReceivedMessage]
296299
297-
:rtype Iterator[ReceivedMessage]
300+
.. admonition:: Example:
301+
302+
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
303+
:start-after: [START receive_forever]
304+
:end-before: [END receive_forever]
305+
:language: python
306+
:dedent: 4
307+
:caption: Receive indefinitely from an iterator in streaming fashion.
298308
"""
299309
return self._iter_contextual_wrapper(max_wait_time)
300310

@@ -308,6 +318,7 @@ def from_connection_string(
308318
"""Create a ServiceBusReceiver from a connection string.
309319
310320
:param conn_str: The connection string of a Service Bus.
321+
:type conn_str: str
311322
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
312323
:keyword str topic_name: The path of specific Service Bus Topic which contains the Subscription
313324
the client connects to.
@@ -384,7 +395,8 @@ def receive_messages(self, max_batch_size=None, max_wait_time=None):
384395
If no messages arrive, and no timeout is specified, this call will not return
385396
until the connection is closed. If specified, an no messages arrive within the
386397
timeout period, an empty list will be returned.
387-
:rtype: list[~azure.servicebus.ReceivedMessage]
398+
399+
:rtype: List[~azure.servicebus.ReceivedMessage]
388400
389401
.. admonition:: Example:
390402
@@ -411,9 +423,9 @@ def receive_deferred_messages(self, sequence_numbers):
411423
When receiving deferred messages from a partitioned entity, all of the supplied
412424
sequence numbers must be messages from the same partition.
413425
414-
:param list[int] sequence_numbers: A list of the sequence numbers of messages that have been
426+
:param List[int] sequence_numbers: A list of the sequence numbers of messages that have been
415427
deferred.
416-
:rtype: list[~azure.servicebus.ReceivedMessage]
428+
:rtype: List[~azure.servicebus.ReceivedMessage]
417429
418430
.. admonition:: Example:
419431
@@ -440,14 +452,12 @@ def receive_deferred_messages(self, sequence_numbers):
440452

441453
self._populate_message_properties(message)
442454

443-
handler = functools.partial(mgmt_handlers.deferred_message_op, mode=self._mode)
455+
handler = functools.partial(mgmt_handlers.deferred_message_op, mode=self._mode, receiver=self)
444456
messages = self._mgmt_request_response_with_retry(
445457
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
446458
message,
447459
handler
448460
)
449-
for m in messages:
450-
m._receiver = self # pylint: disable=protected-access
451461
return messages
452462

453463
def peek_messages(self, message_count=1, sequence_number=None):
@@ -460,7 +470,8 @@ def peek_messages(self, message_count=1, sequence_number=None):
460470
:param int message_count: The maximum number of messages to try and peek. The default
461471
value is 1.
462472
:param int sequence_number: A message sequence number from which to start browsing messages.
463-
:rtype: list[~azure.servicebus.PeekMessage]
473+
474+
:rtype: List[~azure.servicebus.PeekMessage]
464475
465476
.. admonition:: Example:
466477

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ def schedule_messages(self, messages, schedule_time_utc):
198198
"""Send Message or multiple Messages to be enqueued at a specific time.
199199
Returns a list of the sequence numbers of the enqueued messages.
200200
:param messages: The message or list of messages to schedule.
201-
:type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message]
201+
:type messages: Union[~azure.servicebus.Message, List[~azure.servicebus.Message]]
202202
:param schedule_time_utc: The utc date and time to enqueue the messages.
203203
:type schedule_time_utc: ~datetime.datetime
204-
:rtype: list[int]
204+
:rtype: List[int]
205205
206206
.. admonition:: Example:
207207
@@ -266,6 +266,7 @@ def from_connection_string(
266266
"""Create a ServiceBusSender from a connection string.
267267
268268
:param conn_str: The connection string of a Service Bus.
269+
:type conn_str: str
269270
:keyword str queue_name: The path of specific Service Bus Queue the client connects to.
270271
Only one of queue_name or topic_name can be provided.
271272
:keyword str topic_name: The path of specific Service Bus Topic the client connects to.
@@ -280,7 +281,8 @@ def from_connection_string(
280281
keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value).
281282
Additionally the following keys may also be present: `'username', 'password'`.
282283
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
283-
:rtype: ~azure.servicebus.ServiceBusSenderClient
284+
285+
:rtype: ~azure.servicebus.ServiceBusSender
284286
285287
.. admonition:: Example:
286288

0 commit comments

Comments
 (0)