Skip to content

Commit

Permalink
Merge pull request #342 from zxzxwu/typing
Browse files Browse the repository at this point in the history
Typing helper
  • Loading branch information
zxzxwu authored Nov 29, 2023
2 parents b8849ab + f3cd8f8 commit 24524d8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 42 deletions.
4 changes: 4 additions & 0 deletions bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -5296,6 +5296,10 @@ class HCI_Disconnection_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.5 Disconnection Complete Event
'''

status: int
connection_handle: int
reason: int


# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)])
Expand Down
109 changes: 67 additions & 42 deletions bumble/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,39 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations

from collections.abc import Callable, MutableMapping
from typing import cast, Any
import logging

from .colors import color
from .att import ATT_CID, ATT_PDU
from .smp import SMP_CID, SMP_Command
from .core import name_or_number
from .l2cap import (
from bumble import avdtp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
from bumble.core import name_or_number
from bumble.l2cap import (
L2CAP_PDU,
L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame,
L2CAP_Connection_Request,
L2CAP_Connection_Response,
)
from .hci import (
from bumble.hci import (
HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AclDataPacketAssembler,
HCI_Packet,
HCI_Event,
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from .rfcomm import RFCOMM_Frame, RFCOMM_PSM
from .sdp import SDP_PDU, SDP_PSM
from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM

# -----------------------------------------------------------------------------
# Logging
Expand All @@ -50,23 +59,25 @@
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP'
# TODO: add more PSM values
avdtp.AVDTP_PSM: 'AVDTP',
}


# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
def __init__(self, analyzer):
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]

def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None # ACL stream in the other direction

# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu):
def on_acl_pdu(self, pdu: bytes) -> None:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)

if l2cap_pdu.cid == ATT_CID:
Expand All @@ -81,26 +92,30 @@ def on_acl_pdu(self, pdu):

# Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST:
self.psms[control_frame.source_cid] = control_frame.psm
connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if (
control_frame.result
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer:
if psm := self.peer.psms.get(control_frame.source_cid):
if psm := self.peer.psms.get(
connection_response.source_cid
):
# Found a pending connection
self.psms[control_frame.destination_cid] = psm
self.psms[connection_response.destination_cid] = psm

# For AVDTP connections, create a packet assembler for
# each direction
if psm == AVDTP_PSM:
if psm == avdtp.AVDTP_PSM:
self.avdtp_assemblers[
control_frame.source_cid
] = AVDTP_MessageAssembler(self.on_avdtp_message)
connection_response.source_cid
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
control_frame.destination_cid
] = AVDTP_MessageAssembler(
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)

Expand All @@ -113,7 +128,7 @@ def on_acl_pdu(self, pdu):
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == AVDTP_PSM:
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
Expand All @@ -130,22 +145,26 @@ def on_acl_pdu(self, pdu):
else:
self.analyzer.emit(l2cap_pdu)

def on_avdtp_message(self, transaction_label, message):
def on_avdtp_message(
self, transaction_label: int, message: avdtp.Message
) -> None:
self.analyzer.emit(
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)

def feed_packet(self, packet):
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
self.packet_assembler.feed_packet(packet)

class Analyzer:
def __init__(self, label, emit_message):
acl_streams: MutableMapping[int, PacketTracer.AclStream]
peer: PacketTracer.Analyzer

def __init__(self, label: str, emit_message: Callable[..., None]) -> None:
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.peer = None # Analyzer in the other direction

def start_acl_stream(self, connection_handle):
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info(
f'[{self.label}] +++ Creating ACL stream for connection '
f'0x{connection_handle:04X}'
Expand All @@ -160,7 +179,7 @@ def start_acl_stream(self, connection_handle):

return stream

def end_acl_stream(self, connection_handle):
def end_acl_stream(self, connection_handle: int) -> None:
if connection_handle in self.acl_streams:
logger.info(
f'[{self.label}] --- Removing ACL stream for connection '
Expand All @@ -171,34 +190,40 @@ def end_acl_stream(self, connection_handle):
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)

def on_packet(self, packet):
def on_packet(self, packet: HCI_Packet) -> None:
self.emit(packet)

if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
acl_packet = cast(HCI_AclDataPacket, packet)
# Look for an existing stream for this handle, create one if it is the
# first ACL packet for that connection handle
if (stream := self.acl_streams.get(packet.connection_handle)) is None:
stream = self.start_acl_stream(packet.connection_handle)
stream.feed_packet(packet)
if (
stream := self.acl_streams.get(acl_packet.connection_handle)
) is None:
stream = self.start_acl_stream(acl_packet.connection_handle)
stream.feed_packet(acl_packet)
elif packet.hci_packet_type == HCI_EVENT_PACKET:
if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(packet.connection_handle)
event_packet = cast(HCI_Event, packet)
if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT:
self.end_acl_stream(
cast(HCI_Disconnection_Complete_Event, packet).connection_handle
)

def emit(self, message):
def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}')

def trace(self, packet, direction=0):
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
else:
self.controller_to_host_analyzer.on_packet(packet)

def __init__(
self,
host_to_controller_label=color('HOST->CONTROLLER', 'blue'),
controller_to_host_label=color('CONTROLLER->HOST', 'cyan'),
emit_message=logger.info,
):
host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'),
controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'),
emit_message: Callable[..., None] = logger.info,
) -> None:
self.host_to_controller_analyzer = PacketTracer.Analyzer(
host_to_controller_label, emit_message
)
Expand Down
8 changes: 8 additions & 0 deletions bumble/l2cap.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
'''

psm: int
source_cid: int

@staticmethod
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
psm_length = 2
Expand Down Expand Up @@ -432,6 +435,11 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
'''

source_cid: int
destination_cid: int
status: int
result: int

CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
Expand Down

0 comments on commit 24524d8

Please sign in to comment.