Skip to content

Commit

Permalink
Merge pull request #518 from google/gbg/usb-queue
Browse files Browse the repository at this point in the history
USB: better packet queue logic
  • Loading branch information
barbibulle authored Jul 18, 2024
2 parents 142bdce + 6a51166 commit 1c278be
Showing 1 changed file with 46 additions and 40 deletions.
86 changes: 46 additions & 40 deletions bumble/transport/usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import threading
import collections
import ctypes
import platform

Expand Down Expand Up @@ -114,13 +114,17 @@ def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()
self.closed = False

def start(self):
pass
self.queue_task = asyncio.create_task(self.process_queue())

def on_packet(self, packet):
# Ignore packets if we're closed
Expand All @@ -132,62 +136,64 @@ def on_packet(self, packet):
return

# Queue the packet
self.packets.append(packet)
if len(self.packets) == 1:
# The queue was previously empty, re-prime the pump
self.process_queue()
self.packets.put_nowait(packet)

def transfer_callback(self, transfer):
self.acl_out_transfer_ready.release()
status = transfer.getStatus()

# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
if status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
return

if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
)

def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()

def process_queue(self):
if len(self.packets) == 0:
return # Nothing to do

packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
async def process_queue(self):
while True:
# Wait for a packet to transfer.
packet = await self.packets.get()

# Wait until we can start a transfer.
await self.acl_out_transfer_ready.acquire()

# Transfer the packet.
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(
color(f'unsupported packet type {packet_type}', 'red')
)

def close(self):
self.closed = True
if self.queue_task:
self.queue_task.cancel()

async def terminate(self):
if not self.closed:
self.close()

# Empty the packet queue so that we don't send any more data
self.packets.clear()
while not self.packets.empty():
self.packets.get_nowait()

# If we have a transfer in flight, cancel it
if self.acl_out_transfer.isSubmitted():
Expand Down

0 comments on commit 1c278be

Please sign in to comment.