Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
cb13bd8
Increment version
Feb 14, 2020
38e9f65
Update Development Status
Feb 14, 2020
aa30bc0
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Feb 15, 2020
c4710ea
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Feb 20, 2020
053f073
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Feb 24, 2020
f0697f7
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 6, 2020
c83241a
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 6, 2020
18c6fba
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 6, 2020
607f134
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 9, 2020
88fd7d1
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 9, 2020
fcbaf65
Remove typing.Deque for Py3.5.3
Mar 9, 2020
f52e2d9
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 10, 2020
8aaaf1e
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 11, 2020
a12351c
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 25, 2020
cc9dbb9
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 27, 2020
932336a
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 30, 2020
2742ca0
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Mar 31, 2020
72ab463
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 2, 2020
213bb9d
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 3, 2020
b312d19
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 6, 2020
0638b9a
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 6, 2020
c7bd430
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 7, 2020
de47f42
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 7, 2020
2fcc005
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 15, 2020
cfa9d81
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 20, 2020
ac224c1
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 23, 2020
a2bad49
Fix class method super() call
Apr 24, 2020
a91692a
Exception hierarchy enhancement
Apr 24, 2020
a79d045
Remove exception type InvalidHandlerState
Apr 24, 2020
f8af228
Small fix
Apr 25, 2020
3d6e289
rename _is_live to _check_live
Apr 25, 2020
c0c2a0c
rename _can_run to _check_session
Apr 25, 2020
e6c28f8
update async sender
Apr 27, 2020
1305495
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 27, 2020
8473965
Merge branch 'master' into sb_exp
Apr 27, 2020
ed7038c
Code Review Feedback
Apr 28, 2020
6230837
Small fix
Apr 28, 2020
f19019b
Small fix
Apr 28, 2020
cd82467
Add back _check_live to ServiceBusReceiver
Apr 28, 2020
94ef6b9
Fix __anext__ _check_live
Apr 28, 2020
cdae1a3
Add MessageContentTooLarge and MessageAlreadySettledError
Apr 29, 2020
2183d32
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 29, 2020
5ebb499
Raise MessageContentTooLarge when uamqp raises MessageException with …
Apr 29, 2020
5fdef4c
Raise MessageContentTooLarge in client side validation
Apr 29, 2020
e233d8c
Merge branch 'master' of github.com:Azure/azure-sdk-for-python
Apr 29, 2020
abebb4e
Merge branch 'master' into sb_exp
Apr 30, 2020
b3ba4fa
Small fix
Apr 30, 2020
e5c91d4
Small fix
Apr 30, 2020
ff7776c
Revert to ValueError in create_batch
Apr 30, 2020
b409bb6
Update change log
Apr 30, 2020
467aa16
Fix test code
Apr 30, 2020
8116ed2
Import correct MessageContentTooLarge
Apr 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
* Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic.
* Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic.
* `ServiceBusSender.send()` can now send a list of messages in one call, if they fit into a single batch. If they do not fit a `ValueError` is thrown.
* `BatchMessage.add()` and `ServiceBusSender.send()` raises `MessageContentTooLarge`, which is a subclass of `ValueError` if the content is over-sized.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick:

I think we could add detail information -- is a subclass of Value in the next point so making this line:

BatchMessage.add() and ServiceBusSender.send() would raise MessageContentTooLarge if the content is over-sized.

* `ServiceBusReceiver.receive()` raises `ValueError` if the max_batch_size is greater than the prefetch of `ServiceBusClient`.
* Added exception classes `MessageError`, `MessageContentTooLarge`, `ServiceBusAuthenticationError`.
Copy link
Contributor

@yunhaoling yunhaoling Apr 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably add some description here?

  • Added exception classes MessageError, MessageContentTooLarge, ServiceBusAuthenticationError.
    • MessageError: general message error type?
    • MessageContentTooLarge: message over-size, subclass of ValueError
    • ServiceBusAuthenticationError: fail to be authenticated by the service?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change log update will be in a separate PR.

* Removed exception class `InvalidHandlerState`.

**BugFixes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from ._common._configuration import Configuration
from .exceptions import (
InvalidHandlerState,
ServiceBusError,
ServiceBusAuthorizationError,
_create_servicebus_exception
Expand Down Expand Up @@ -236,9 +235,6 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):

def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs):
self._open()
if not self._running:
raise InvalidHandlerState("Client connection is closed.")

application_properties = {}
# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
if keep_alive_associated_link:
Expand Down
20 changes: 10 additions & 10 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
MessageAlreadySettled,
MessageLockExpired,
SessionLockExpired,
MessageSettleFailed
)
MessageSettleFailed,
MessageContentTooLarge)
from .utils import utc_from_timestamp, utc_now

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -340,8 +340,8 @@ def add(self, message):
)

if size_after_add > self.max_size_in_bytes:
raise ValueError(
"EventDataBatch has reached its size limit: {}".format(
raise MessageContentTooLarge(
"BatchMessage has reached its size limit: {}".format(
self.max_size_in_bytes
)
)
Expand Down Expand Up @@ -453,7 +453,7 @@ def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None

def _is_live(self, action):
def _check_live(self, action):
# pylint: disable=no-member
if not self._receiver or not self._receiver._running: # pylint: disable=protected-access
raise MessageSettleFailed(action, "Orphan message had no open connection.")
Expand Down Expand Up @@ -611,7 +611,7 @@ def complete(self):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
self._check_live(MESSAGE_COMPLETE)
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

Expand All @@ -632,7 +632,7 @@ def dead_letter(self, reason=None, description=None):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
self._check_live(MESSAGE_DEAD_LETTER)

details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
Expand All @@ -654,7 +654,7 @@ def abandon(self):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
self._check_live(MESSAGE_ABANDON)
self._settle_message(MESSAGE_ABANDON)
self._settled = True

Expand All @@ -671,7 +671,7 @@ def defer(self):
:raises: ~azure.servicebus.exceptions.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
self._check_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
self._settled = True

Expand All @@ -696,7 +696,7 @@ def renew_lock(self):
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
except AttributeError:
pass
self._is_live(MESSAGE_RENEW_LOCK)
self._check_live(MESSAGE_RENEW_LOCK)
token = self.lock_token
if not token:
raise ValueError("Unable to renew lock - no lock token found.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def _build_message(self, received, message_type=ReceivedMessage):
self._last_received_sequenced_number = message.sequence_number
return message

def _can_run(self):
return True
def _check_live(self):
"""check whether the receiver is alive"""

def _get_source(self):
return self._entity_uri
Expand Down Expand Up @@ -78,7 +78,7 @@ def _on_attach(self, source, target, properties, error): # pylint: disable=unus
self._session_id = session_filter.decode(self._config.encoding)
self._session._session_id = self._session_id

def _can_run(self):
def _check_live(self):
if self._session and self._session.expired:
raise SessionLockExpired(inner_exception=self._session.auto_renew_error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __iter__(self):
return self

def __next__(self):
self._can_run()
self._check_live()
while True:
try:
return self._do_retryable_operation(self._iter_next)
Expand Down Expand Up @@ -305,7 +305,10 @@ def receive(self, max_batch_size=None, max_wait_time=None):
:caption: Receive messages from ServiceBus.

"""
self._can_run()
self._check_live()
if max_batch_size and self._config.prefetch < max_batch_size:
raise ValueError("max_batch_size should be less than or equal to prefetch of ServiceBusReceiver, or you "
"could set a larger prefetch value when you're constructing the ServiceBusReceiver.")
return self._do_retryable_operation(
self._receive,
max_batch_size=max_batch_size,
Expand Down Expand Up @@ -334,7 +337,7 @@ def receive_deferred_messages(self, sequence_numbers):
:caption: Receive deferred messages from ServiceBus.

"""
self._can_run()
self._check_live()
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
self._open()
Expand Down Expand Up @@ -381,7 +384,7 @@ def peek(self, message_count=1, sequence_number=None):
:caption: Look at pending messages in the queue.

"""
self._can_run()
self._check_live()
if not sequence_number:
sequence_number = self._last_received_sequenced_number or 1
if int(message_count) < 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
from ._common import mgmt_handlers
from ._common.message import Message, BatchMessage
from .exceptions import (
MessageSendFailed,
OperationTimeoutError,
_ServiceBusErrorPolicy
)
_ServiceBusErrorPolicy,
)
from ._common.utils import create_authentication
from ._common.constants import (
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
Expand Down Expand Up @@ -183,10 +182,7 @@ def _open(self):
def _send(self, message, timeout=None, last_exception=None):
self._open()
self._set_msg_timeout(timeout, last_exception)
try:
self._handler.send_message(message.message)
except Exception as e:
raise MessageSendFailed(e)
self._handler.send_message(message.message)

def _schedule(self, message, schedule_time_utc):
# type: (Union[Message, BatchMessage], datetime.datetime) -> List[int]
Expand Down Expand Up @@ -319,8 +315,10 @@ def send(self, message):
batch = self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
except TypeError: # Message was not a list or generator.
pass
if isinstance(message, BatchMessage) and len(message) == 0:
raise ValueError("A BatchMessage or list of Message must have at least one Message")

self._do_retryable_operation(
self._send,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, session_id, receiver, encoding="UTF-8"):
self._locked_until_utc = None
self.auto_renew_error = None

def _can_run(self):
def _check_live(self):
if self.expired:
raise SessionLockExpired(inner_exception=self.auto_renew_error)

Expand All @@ -74,7 +74,7 @@ def get_session_state(self):
:dedent: 4
:caption: Get the session state
"""
self._can_run()
self._check_live()
response = self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access
REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION,
{MGMT_REQUEST_SESSION_ID: self.session_id},
Expand All @@ -101,7 +101,7 @@ def set_session_state(self, state):
:dedent: 4
:caption: Set the session state
"""
self._can_run()
self._check_live()
state = state.encode(self._encoding) if isinstance(state, six.text_type) else state
return self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access
REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION,
Expand All @@ -128,7 +128,7 @@ def renew_lock(self):
:dedent: 4
:caption: Renew the session lock before it expires
"""
self._can_run()
self._check_live()
expiry = self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access
REQUEST_RESPONSE_RENEW_SESSION_LOCK_OPERATION,
{MGMT_REQUEST_SESSION_ID: self.session_id},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ def from_connection_string(
:caption: Create a new instance of the ServiceBusReceiver from connection string.

"""
return super(ServiceBusSessionReceiver, self).from_connection_string(conn_str, **kwargs)
return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def complete(self):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
self._check_live(MESSAGE_COMPLETE)
await self._settle_message(MESSAGE_COMPLETE)
self._settled = True

Expand All @@ -88,7 +88,7 @@ async def dead_letter(self, reason=None, description=None):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
self._check_live(MESSAGE_DEAD_LETTER)
await self._settle_message(MESSAGE_DEAD_LETTER)
self._settled = True

Expand All @@ -102,7 +102,7 @@ async def abandon(self):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
self._check_live(MESSAGE_ABANDON)
await self._settle_message(MESSAGE_ABANDON)
self._settled = True

Expand All @@ -116,7 +116,7 @@ async def defer(self):
:raises: ~azure.servicebus.exceptions.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEFER)
self._check_live(MESSAGE_DEFER)
await self._settle_message(MESSAGE_DEFER)
self._settled = True

Expand All @@ -142,7 +142,7 @@ async def renew_lock(self):
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
except AttributeError:
pass
self._is_live(MESSAGE_RENEW_LOCK)
self._check_live(MESSAGE_RENEW_LOCK)
token = self.lock_token
if not token:
raise ValueError("Unable to renew lock - no lock token found.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
ASSOCIATEDLINKPROPERTYNAME
)
from ..exceptions import (
InvalidHandlerState,
ServiceBusError,
_create_servicebus_exception
)
Expand Down Expand Up @@ -139,8 +138,6 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs):

async def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs):
await self._open()
if not self._running:
raise InvalidHandlerState("Client connection is closed.")

application_properties = {}
# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(
self._connection = kwargs.get("connection")

async def __anext__(self):
self._can_run()
self._check_live()
while True:
try:
return await self._do_retryable_operation(self._iter_next)
Expand Down Expand Up @@ -170,7 +170,7 @@ async def _open(self):
await asyncio.sleep(0.05)
self._running = True
except:
self.close()
await self.close()
raise

async def _receive(self, max_batch_size=None, timeout=None):
Expand Down Expand Up @@ -295,7 +295,10 @@ async def receive(self, max_batch_size=None, max_wait_time=None):
:caption: Receive messages from ServiceBus.

"""
self._can_run()
self._check_live()
if max_batch_size and self._config.prefetch < max_batch_size:
raise ValueError("max_batch_size should be less than or equal to prefetch of ServiceBusReceiver, or you "
"could set a larger prefetch value when you're constructing the ServiceBusReceiver.")
return await self._do_retryable_operation(
self._receive,
max_batch_size=max_batch_size,
Expand Down Expand Up @@ -324,7 +327,7 @@ async def receive_deferred_messages(self, sequence_numbers):
:caption: Receive deferred messages from ServiceBus.

"""
self._can_run()
self._check_live()
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
await self._open()
Expand Down Expand Up @@ -369,7 +372,7 @@ async def peek(self, message_count=1, sequence_number=0):
:dedent: 4
:caption: Peek messages in the queue.
"""
self._can_run()
self._check_live()
if not sequence_number:
sequence_number = self._last_received_sequenced_number or 1
if int(message_count) < 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
from .._common.message import Message, BatchMessage
from .._servicebus_sender import SenderMixin
from ._base_handler_async import BaseHandlerAsync
from ..exceptions import (
MessageSendFailed
)
from .._common.constants import (
REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
Expand Down Expand Up @@ -128,16 +125,13 @@ async def _open(self):
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or uamqp.constants.MAX_MESSAGE_LENGTH_BYTES
except:
self.close()
await self.close()
raise

async def _send(self, message, timeout=None, last_exception=None):
await self._open()
self._set_msg_timeout(timeout, last_exception)
try:
await self._handler.send_message_async(message.message)
except Exception as e:
raise MessageSendFailed(e)
await self._handler.send_message_async(message.message)

async def _schedule(self, message, schedule_time_utc):
# type: (Union[Message, BatchMessage], datetime.datetime) -> List[int]
Expand Down Expand Up @@ -267,8 +261,10 @@ async def send(self, message):
batch = await self.create_batch()
batch._from_list(message)
message = batch
except TypeError: # Message was not a list or generator.
except TypeError: # Message was not a list or generator.
pass
if isinstance(message, BatchMessage) and len(message) == 0:
raise ValueError("A BatchMessage or list of Message must have at least one Message")

await self._do_retryable_operation(
self._send,
Expand Down
Loading