Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

USB: better packet queue logic #518

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 46 additions & 40 deletions bumble/transport/usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import threading
import collections
import ctypes
import platform

Expand Down Expand Up @@ -114,13 +114,17 @@ def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()
self.closed = False

def start(self):
pass
self.queue_task = asyncio.create_task(self.process_queue())

def on_packet(self, packet):
# Ignore packets if we're closed
Expand All @@ -132,62 +136,64 @@ def on_packet(self, packet):
return

# Queue the packet
self.packets.append(packet)
if len(self.packets) == 1:
# The queue was previously empty, re-prime the pump
self.process_queue()
self.packets.put_nowait(packet)

def transfer_callback(self, transfer):
self.acl_out_transfer_ready.release()
status = transfer.getStatus()

# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
if status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
return

if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
)

def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()

def process_queue(self):
if len(self.packets) == 0:
return # Nothing to do

packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
async def process_queue(self):
while True:
# Wait for a packet to transfer.
packet = await self.packets.get()

# Wait until we can start a transfer.
await self.acl_out_transfer_ready.acquire()

# Transfer the packet.
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(
color(f'unsupported packet type {packet_type}', 'red')
)

def close(self):
self.closed = True
if self.queue_task:
self.queue_task.cancel()

async def terminate(self):
if not self.closed:
self.close()

# Empty the packet queue so that we don't send any more data
self.packets.clear()
while not self.packets.empty():
self.packets.get_nowait()

# If we have a transfer in flight, cancel it
if self.acl_out_transfer.isSubmitted():
Expand Down
Loading