Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SB] Missing Link Credit Subtraction #37427

Merged
merged 12 commits into from
Sep 19, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async def _keep_alive_async(self):
elapsed_time = current_time - start_time
if elapsed_time >= self._keep_alive_interval:
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
batch=self._link.current_link_credit))
batch=self._link.total_link_credit))
start_time = current_time
await asyncio.sleep(1)
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -759,7 +759,7 @@ async def _client_run_async(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit <= 0:
if self._link.total_link_credit <= 0:
await self._link.flow(link_credit=self._link_credit)
await self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(
self._on_link_state_change = kwargs.get("on_link_state_change")
self._on_attach = kwargs.get("on_attach")
self._error: Optional[AMQPLinkError] = None
self.total_link_credit = self.link_credit

async def __aenter__(self) -> "Link":
await self.attach()
Expand Down Expand Up @@ -273,5 +274,19 @@ async def detach(self, close: bool = False, error: Optional[AMQPError] = None) -
await self._set_state(LinkState.DETACHED)

async def flow(self, *, link_credit: Optional[int] = None, **kwargs) -> None:
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
await self._outgoing_flow(**kwargs)
# Given the desired link credit `link_credit`, the link credit sent via
# FlowFrame is calculated as follows: The link credit to flow on the wire
# `self.current_link_credit` is the desired link credit `link_credit`
# minus the current link credit on the wire `self.total_link_credit`.
self.current_link_credit = link_credit - self.total_link_credit \
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
if link_credit is not None else self.link_credit

# If the link credit to flow is greater than 0 (i.e the desired link credit
# is greater than the current link credit on the wire), then we will send a
# flow to issue more link credit. Otherwise link credit on the wire is sufficient.
if self.current_link_credit > 0:
# Calculate the total link credit on the wire, by adding the credit
# we will flow to the total link credit.
self.total_link_credit = self.current_link_credit + self.total_link_credit \
if link_credit is not None else self.link_credit
await self._outgoing_flow(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ async def _incoming_transfer(self, frame):
self.received_delivery_id = frame[1] # delivery_id
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.delivery_count += 1
self.current_link_credit -= 1
swathipil marked this conversation as resolved.
Show resolved Hide resolved
self.total_link_credit -= 1
if self.received_delivery_id is not None:
self._first_frame = frame
if not self.received_delivery_id and not self._received_payload:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def _keep_alive(self):
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time >= self._keep_alive_interval:
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
start_time = current_time
time.sleep(1)
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -866,7 +866,7 @@ def _client_run(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit <= 0:
if self._link.total_link_credit <= 0:
self._link.flow(link_credit=self._link_credit)
self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
18 changes: 16 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
self._on_link_state_change = kwargs.get("on_link_state_change")
self._on_attach = kwargs.get("on_attach")
self._error: Optional[AMQPLinkError] = None
self.total_link_credit = self.link_credit

def __enter__(self) -> "Link":
self.attach()
Expand Down Expand Up @@ -268,5 +269,18 @@ def detach(self, close: bool = False, error: Optional[AMQPError] = None) -> None
self._set_state(LinkState.DETACHED)

def flow(self, *, link_credit: Optional[int] = None, **kwargs: Any) -> None:
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
self._outgoing_flow(**kwargs)
# Given the desired link credit `link_credit`, the link credit sent via
# FlowFrame is calculated as follows: The link credit to flow on the wire
# `self.current_link_credit` is the desired link credit
# `link_credit` minus the current link credit on the wire `self.total_link_credit`.
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None \
else self.link_credit

# If the link credit to flow is greater than 0 (i.e the desired link credit is greater than
# the current link credit on the wire), then we will send a flow to issue more link credit.
# Otherwise link credit on the wire is sufficient.
if self.current_link_credit > 0:
# Calculate the total link credit on the wire, by adding the credit we will flow to the total link credit.
self.total_link_credit = self.current_link_credit + self.total_link_credit if link_credit is not None \
else self.link_credit
self._outgoing_flow(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _incoming_transfer(self, frame):
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.total_link_credit -=1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
if self.received_delivery_id is not None:
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-e ../../../tools/azure-sdk-tools
../../core/azure-core
-e ../../identity/azure-identity
azure-identity~=1.17.0
azure-mgmt-eventhub<=10.1.0
azure-mgmt-resource==20.0.0
aiohttp; python_version < '3.12'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,45 @@ def test_receive_transfer_continuation_frame():
assert link.delivery_count == 1
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == 1
assert link.delivery_count == 2
assert link.delivery_count == 2


def test_receive_transfer_and_flow():
def mock_outgoing(): pass
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)

link._outgoing_flow = mock_outgoing
link.total_link_credit = 0 # Set the total link credit to 0 to start, no credit on the wire

link.flow(link_credit=100) # Send a flow frame with desired link credit of 100

# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
transfer_frame_one = [3, 0, b'/blah', 0, True, False, None, None, None, None, False, ""]
transfer_frame_two = [3, 1, b'/blah', 0, True, False, None, None, None, None, False, ""]
transfer_frame_three = [3, 2, b'/blah', 0, True, False, None, None, None, None, False, ""]

link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 99
assert link.total_link_credit == 99

# Only received 1 transfer frame per receive call, we set desired link credit again
# this will send a flow of 1
link.flow(link_credit=100)
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
assert link.current_link_credit == 1
assert link.total_link_credit == 100

link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 0
assert link.total_link_credit == 99
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == -1
assert link.total_link_credit == 98
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.12.3 (2024-09-17)
## 7.12.3 (2024-09-19)

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async def _keep_alive_async(self):
elapsed_time = current_time - start_time
if elapsed_time >= self._keep_alive_interval:
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
batch=self._link.current_link_credit))
batch=self._link.total_link_credit))
start_time = current_time
await asyncio.sleep(1)
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -759,7 +759,7 @@ async def _client_run_async(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit <= 0:
if self._link.total_link_credit <= 0:
await self._link.flow(link_credit=self._link_credit)
await self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(
self._on_link_state_change = kwargs.get("on_link_state_change")
self._on_attach = kwargs.get("on_attach")
self._error: Optional[AMQPLinkError] = None
self.total_link_credit = self.link_credit

async def __aenter__(self) -> "Link":
await self.attach()
Expand Down Expand Up @@ -273,5 +274,19 @@ async def detach(self, close: bool = False, error: Optional[AMQPError] = None) -
await self._set_state(LinkState.DETACHED)

async def flow(self, *, link_credit: Optional[int] = None, **kwargs) -> None:
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
await self._outgoing_flow(**kwargs)
# Given the desired link credit `link_credit`, the link credit sent via
# FlowFrame is calculated as follows: The link credit to flow on the wire
# `self.current_link_credit` is the desired link credit `link_credit`
# minus the current link credit on the wire `self.total_link_credit`.
self.current_link_credit = link_credit - self.total_link_credit \
if link_credit is not None else self.link_credit

# If the link credit to flow is greater than 0 (i.e the desired link credit
# is greater than the current link credit on the wire), then we will send a
# flow to issue more link credit. Otherwise link credit on the wire is sufficient.
if self.current_link_credit > 0:
# Calculate the total link credit on the wire, by adding the credit
# we will flow to the total link credit.
self.total_link_credit = self.current_link_credit + self.total_link_credit \
if link_credit is not None else self.link_credit
await self._outgoing_flow(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async def _incoming_transfer(self, frame):
if not frame[5]:
self.delivery_count += 1
self.current_link_credit -= 1
self.total_link_credit -= 1
if self.received_delivery_id is not None:
self._first_frame = frame
if not self.received_delivery_id and not self._received_payload:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def _keep_alive(self):
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time >= self._keep_alive_interval:
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
start_time = current_time
time.sleep(1)
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -866,7 +866,7 @@ def _client_run(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit <= 0:
if self._link.total_link_credit <= 0:
self._link.flow(link_credit=self._link_credit)
self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
self._on_link_state_change = kwargs.get("on_link_state_change")
self._on_attach = kwargs.get("on_attach")
self._error: Optional[AMQPLinkError] = None
self.total_link_credit = self.link_credit

def __enter__(self) -> "Link":
self.attach()
Expand Down Expand Up @@ -268,5 +269,18 @@ def detach(self, close: bool = False, error: Optional[AMQPError] = None) -> None
self._set_state(LinkState.DETACHED)

def flow(self, *, link_credit: Optional[int] = None, **kwargs: Any) -> None:
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
self._outgoing_flow(**kwargs)
# Given the desired link credit `link_credit`, the link credit sent via
# FlowFrame is calculated as follows: The link credit to flow on the wire
# `self.current_link_credit` is the desired link credit
# `link_credit` minus the current link credit on the wire `self.total_link_credit`.
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None \
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
else self.link_credit

# If the link credit to flow is greater than 0 (i.e the desired link credit is greater than
# the current link credit on the wire), then we will send a flow to issue more link credit.
# Otherwise link credit on the wire is sufficient.
if self.current_link_credit > 0:
# Calculate the total link credit on the wire, by adding the credit we will flow to the total link credit.
self.total_link_credit = self.current_link_credit + self.total_link_credit if link_credit is not None \
else self.link_credit
self._outgoing_flow(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _incoming_transfer(self, frame):
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.total_link_credit -=1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
if self.received_delivery_id is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ def _receive(
)
batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = []

while (
not received_messages_queue.empty() and len(batch) < max_message_count
):
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

# Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
if (
max_message_count
Expand All @@ -440,14 +448,6 @@ def _receive(
link_credit_needed = max_message_count - len(batch)
self._amqp_transport.reset_link_credit(amqp_receive_client, link_credit_needed)

while (
not received_messages_queue.empty() and len(batch) < max_message_count
):
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

first_message_received = expired = False
receiving = True
while receiving and not expired and len(batch) < max_message_count:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,18 +426,17 @@ async def _receive(

batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = []

# Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
if max_message_count and self._prefetch_count == 0 and max_message_count >= 1:
link_credit_needed = max_message_count - len(batch)
await self._amqp_transport.reset_link_credit_async(amqp_receive_client, link_credit_needed)


while not received_messages_queue.empty() and len(batch) < max_message_count:
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

# Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
if max_message_count and self._prefetch_count == 0 and max_message_count >= 1:
link_credit_needed = max_message_count - len(batch)
await self._amqp_transport.reset_link_credit_async(amqp_receive_client, link_credit_needed)

first_message_received = expired = False
receiving = True
while receiving and not expired and len(batch) < max_message_count:
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-servicebus/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-e ../../core/azure-core
-e ../../identity/azure-identity
azure-identity~=1.17.0
-e ../../../tools/azure-sdk-tools
azure-mgmt-servicebus~=8.0.0
aiohttp>=3.0
Expand Down