Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
barbibulle committed Jan 28, 2024
1 parent aba1ac0 commit 31ec1c4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 39 deletions.
1 change: 0 additions & 1 deletion bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
Tuple,
Type,
TypeVar,
Set,
Union,
cast,
overload,
Expand Down
72 changes: 36 additions & 36 deletions bumble/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
from typing import cast, Any, Optional
import logging

from bumble import avc
from bumble import avctp
from bumble import avdtp
from bumble import avrcp
from bumble import crypto
from bumble import rfcomm
from bumble import sdp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
Expand All @@ -47,13 +53,6 @@
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from bumble import crypto
from bumble.avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
from bumble.avctp import MessageAssembler as AVCTP_MessageAssembler, AVCTP_PSM
from bumble.avrcp import AVRCP_PID
from bumble.avc import Frame as AVC_Frame


# -----------------------------------------------------------------------------
Expand All @@ -64,28 +63,28 @@

# -----------------------------------------------------------------------------
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP',
AVCTP_PSM: 'AVCTP'
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
# TODO: add more PSM values
}

AVCTP_PID_NAMES = {AVRCP_PID: 'AVRCP'}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}

# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
psms: MutableMapping[int, int]
peer: Optional[PacketTracer.AclStream]
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
avctp_assemblers: MutableMapping[int, avctp.MessageAssembler]

def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.avctp_assemblers = {} # AVCTP assemblers, by source_cid
self.avrcp_assemblers = {} # AVRCP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None

Expand Down Expand Up @@ -115,9 +114,7 @@ def on_acl_pdu(self, pdu: bytes) -> None:
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer and (
psm := self.peer.psms.get(
connection_response.source_cid
)
psm := self.peer.psms.get(connection_response.source_cid)
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
Expand All @@ -130,41 +127,37 @@ def on_acl_pdu(self, pdu: bytes) -> None:
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
elif psm == AVCTP_PSM:
] = avdtp.MessageAssembler(self.peer.on_avdtp_message)
elif psm == avctp.AVCTP_PSM:
self.avctp_assemblers[
connection_response.source_cid
] = AVCTP_MessageAssembler(self.on_avctp_message)
] = avctp.MessageAssembler(self.on_avctp_message)
self.peer.avctp_assemblers[
connection_response.destination_cid
] = AVCTP_MessageAssembler(self.peer.on_avctp_message)
] = avctp.MessageAssembler(self.peer.on_avctp_message)
else:
# Try to find the PSM associated with this PDU
if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)):
if psm == SDP_PSM:
sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload)
if psm == sdp.SDP_PSM:
sdp_pdu = sdp.SDP_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(sdp_pdu)
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
elif psm == rfcomm.RFCOMM_PSM:
rfcomm_frame = rfcomm.RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avdtp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
elif psm == AVCTP_PSM:
if avdtp_assembler := self.avdtp_assemblers.get(l2cap_pdu.cid):
avdtp_assembler.on_pdu(l2cap_pdu.payload)
elif psm == avctp.AVCTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVCTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avctp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
if avctp_assembler := self.avctp_assemblers.get(l2cap_pdu.cid):
avctp_assembler.on_pdu(l2cap_pdu.payload)
else:
psm_string = name_or_number(PSM_NAMES, psm)
self.analyzer.emit(
Expand All @@ -181,9 +174,16 @@ def on_avdtp_message(
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)

def on_avctp_message(self, transaction_label: int, is_command: bool, ipid: bool, pid: int, payload: bytes):
if pid == AVRCP_PID:
avc_frame = AVC_Frame.from_bytes(payload)
def on_avctp_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
if pid == avrcp.AVRCP_PID:
avc_frame = avc.Frame.from_bytes(payload)
details = str(avc_frame)
else:
details = payload.hex()
Expand Down
3 changes: 1 addition & 2 deletions bumble/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
Union,
overload,
)
import traceback

from pyee import EventEmitter

Expand Down Expand Up @@ -459,7 +458,7 @@ def experimental(msg: str):
"""

def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
Expand Down

0 comments on commit 31ec1c4

Please sign in to comment.