From 6a51166af75bb58e113fe9ba78fe9ab4dbe2c026 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Wed, 17 Jul 2024 17:48:26 -0700 Subject: [PATCH] better packet queue logic --- bumble/transport/usb.py | 86 ++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index e3de98c7..2067ec6d 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -15,10 +15,10 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import asyncio import logging import threading -import collections import ctypes import platform @@ -114,13 +114,17 @@ def __init__(self, device, acl_out): self.device = device self.acl_out = acl_out self.acl_out_transfer = device.getTransfer() - self.packets = collections.deque() # Queue of packets waiting to be sent + self.acl_out_transfer_ready = asyncio.Semaphore(1) + self.packets: asyncio.Queue[bytes] = ( + asyncio.Queue() + ) # Queue of packets waiting to be sent self.loop = asyncio.get_running_loop() + self.queue_task = None self.cancel_done = self.loop.create_future() self.closed = False def start(self): - pass + self.queue_task = asyncio.create_task(self.process_queue()) def on_packet(self, packet): # Ignore packets if we're closed @@ -132,62 +136,64 @@ def on_packet(self, packet): return # Queue the packet - self.packets.append(packet) - if len(self.packets) == 1: - # The queue was previously empty, re-prime the pump - self.process_queue() + self.packets.put_nowait(packet) def transfer_callback(self, transfer): + self.acl_out_transfer_ready.release() status = transfer.getStatus() # pylint: disable=no-member - if status == usb1.TRANSFER_COMPLETED: - self.loop.call_soon_threadsafe(self.on_packet_sent) - elif status == usb1.TRANSFER_CANCELLED: + if status == usb1.TRANSFER_CANCELLED: self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) - else: + return + + if status != usb1.TRANSFER_COMPLETED: logger.warning( color(f'!!! OUT transfer not completed: status={status}', 'red') ) - def on_packet_sent(self): - if self.packets: - self.packets.popleft() - self.process_queue() - - def process_queue(self): - if len(self.packets) == 0: - return # Nothing to do - - packet = self.packets[0] - packet_type = packet[0] - if packet_type == hci.HCI_ACL_DATA_PACKET: - self.acl_out_transfer.setBulk( - self.acl_out, packet[1:], callback=self.transfer_callback - ) - self.acl_out_transfer.submit() - elif packet_type == hci.HCI_COMMAND_PACKET: - self.acl_out_transfer.setControl( - USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, - 0, - 0, - 0, - packet[1:], - callback=self.transfer_callback, - ) - self.acl_out_transfer.submit() - else: - logger.warning(color(f'unsupported packet type {packet_type}', 'red')) + async def process_queue(self): + while True: + # Wait for a packet to transfer. + packet = await self.packets.get() + + # Wait until we can start a transfer. + await self.acl_out_transfer_ready.acquire() + + # Transfer the packet. + packet_type = packet[0] + if packet_type == hci.HCI_ACL_DATA_PACKET: + self.acl_out_transfer.setBulk( + self.acl_out, packet[1:], callback=self.transfer_callback + ) + self.acl_out_transfer.submit() + elif packet_type == hci.HCI_COMMAND_PACKET: + self.acl_out_transfer.setControl( + USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, + 0, + 0, + 0, + packet[1:], + callback=self.transfer_callback, + ) + self.acl_out_transfer.submit() + else: + logger.warning( + color(f'unsupported packet type {packet_type}', 'red') + ) def close(self): self.closed = True + if self.queue_task: + self.queue_task.cancel() async def terminate(self): if not self.closed: self.close() # Empty the packet queue so that we don't send any more data - self.packets.clear() + while not self.packets.empty(): + self.packets.get_nowait() # If we have a transfer in flight, cancel it if self.acl_out_transfer.isSubmitted():