Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor cleanup of the internals of the usb transport implementation #120

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 58 additions & 61 deletions bumble/transport/usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@

import usb1

from .common import Transport, ParserSource
from .. import hci
from ..colors import color
from bumble.transport.common import Transport, ParserSource
from bumble import hci
from bumble.colors import color
from bumble.utils import AsyncRunner


# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -113,7 +114,7 @@ class UsbPacketSink:
def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer()
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future()
Expand All @@ -137,21 +138,20 @@ def on_packet(self, packet):
# The queue was previously empty, re-prime the pump
self.process_queue()

def on_packet_sent(self, transfer):
def transfer_callback(self, transfer):
status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')

# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_)
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red')
color(f'!!! OUT transfer not completed: status={status}', 'red')
)

def on_packet_sent_(self):
def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()
Expand All @@ -163,22 +163,20 @@ def process_queue(self):
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
logger.debug('submit ACL')
self.transfer.submit()
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl(
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.on_packet_sent,
callback=self.transfer_callback,
)
logger.debug('submit COMMAND')
self.transfer.submit()
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))

Expand All @@ -193,11 +191,11 @@ async def terminate(self):
self.packets.clear()

# If we have a transfer in flight, cancel it
if self.transfer.isSubmitted():
if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have
# already completed
try:
self.transfer.cancel()
self.acl_out_transfer.cancel()

logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done
Expand All @@ -206,35 +204,30 @@ async def terminate(self):
logger.debug('OUT transfer likely already completed')

class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in):
def __init__(self, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
}
self.events_in_transfer = None
self.acl_in_transfer = None

# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.closed = False

def start(self):
# Set up transfer objects for input
self.events_in_transfer = device.getTransfer()
self.events_in_transfer.setInterrupt(
self.events_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET,
)
self.events_in_transfer.submit()
Expand All @@ -243,22 +236,23 @@ def start(self):
self.acl_in_transfer.setBulk(
self.acl_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.acl_in_transfer.submit()

self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()

def on_packet_received(self, transfer):
@property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)

def transfer_callback(self, transfer):
packet_type = transfer.getUserData()
status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )

# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
Expand All @@ -267,18 +261,18 @@ def on_packet_received(self, transfer):
+ transfer.getBuffer()[: transfer.getActualLength()]
)
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)

# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None
)
return
else:
logger.warning(
color(f'!!! transfer not completed: status={status}', 'red')
color(f'!!! IN transfer not completed: status={status}', 'red')
)

# Re-submit the transfer so we can receive more data
transfer.submit()
self.loop.call_soon_threadsafe(self.on_transport_lost)

async def dequeue(self):
while not self.closed:
Expand All @@ -288,21 +282,6 @@ async def dequeue(self):
return
self.parser.feed_data(packet)

def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass

logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)

def close(self):
self.closed = True

Expand Down Expand Up @@ -331,15 +310,14 @@ async def terminate(self):
f'IN[{packet_type}] transfer likely already completed'
)

# Wait for the thread to terminate
await self.event_loop_done

class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()

# Get exclusive access
device.claimInterface(interface)
Expand All @@ -352,6 +330,22 @@ def __init__(self, context, device, interface, setting, source, sink):
source.start()
sink.start()

# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()

def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass

logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)

async def close(self):
self.source.close()
self.sink.close()
Expand All @@ -361,6 +355,9 @@ async def close(self):
self.device.close()
self.context.close()

# Wait for the thread to terminate
await self.event_loop_done

# Find the device according to the spec moniker
load_libusb()
context = usb1.USBContext()
Expand Down Expand Up @@ -540,7 +537,7 @@ def find_endpoints(device):
except usb1.USBError:
logger.warning('failed to set configuration')

source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:
Expand Down