Skip to content

Commit

Permalink
Controller: CIS implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Jan 9, 2024
1 parent d8e6700 commit d17ebb5
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 2 deletions.
183 changes: 182 additions & 1 deletion bumble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
HCI_Encryption_Change_Event,
HCI_Synchronous_Connection_Complete_Event,
HCI_LE_Advertising_Report_Event,
HCI_LE_CIS_Established_Event,
HCI_LE_CIS_Request_Event,
HCI_LE_Connection_Complete_Event,
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_Number_Of_Completed_Packets_Event,
Expand All @@ -82,6 +84,15 @@ class DataObject:
pass


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
handle: int
cis_id: int
cig_id: int
acl_connection: Optional[Connection] = None


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Connection:
Expand Down Expand Up @@ -132,6 +143,8 @@ def __init__(
self.classic_connections: Dict[
Address, Connection
] = {} # Connections in BR/EDR
self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle

self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0
Expand Down Expand Up @@ -310,7 +323,7 @@ async def wait_for_termination(self):
############################################################
# Link connections
############################################################
def allocate_connection_handle(self):
def allocate_connection_handle(self) -> int:
handle = 0
max_handle = 0
for connection in itertools.chain(
Expand All @@ -322,6 +335,13 @@ def allocate_connection_handle(self):
if connection.handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
for cis_handle in itertools.chain(
self.central_cis_links.keys(), self.peripheral_cis_links.keys()
):
max_handle = max(max_handle, cis_handle)
if cis_handle == handle:
# Already used, continue searching after the current max
handle = max_handle + 1
return handle

def find_le_connection_by_address(self, address):
Expand Down Expand Up @@ -549,6 +569,68 @@ def on_link_advertising_data(self, sender_address, data):
)
self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))

def on_link_cis_request(
self, central_address: Address, cig_id: int, cis_id: int
) -> None:
'''
Called when an incoming CIS request occurs from a central on the link
'''

connection = self.peripheral_connections.get(central_address)
assert connection

pending_cis_link = CisLink(
handle=self.allocate_connection_handle(),
cis_id=cis_id,
cig_id=cig_id,
acl_connection=connection,
)
self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link

self.send_hci_packet(
HCI_LE_CIS_Request_Event(
acl_connection_handle=connection.handle,
cis_connection_handle=pending_cis_link.handle,
cig_id=cig_id,
cis_id=cis_id,
)
)

def on_link_cis_established(self, cig_id: int, cis_id: int) -> None:
'''
Called when an incoming CIS established.
'''

cis_link = next(
cis_link
for cis_link in itertools.chain(
self.central_cis_links.values(), self.peripheral_cis_links.values()
)
if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
)

self.send_hci_packet(
HCI_LE_CIS_Established_Event(
status=HCI_SUCCESS,
connection_handle=cis_link.handle,
# CIS parameters are ignored.
cig_sync_delay=0,
cis_sync_delay=0,
transport_latency_c_to_p=0,
transport_latency_p_to_c=0,
phy_c_to_p=0,
phy_p_to_c=0,
nse=0,
bn_c_to_p=0,
bn_p_to_c=0,
ft_c_to_p=0,
ft_p_to_c=0,
max_pdu_c_to_p=0,
max_pdu_p_to_c=0,
iso_interval=0,
)
)

############################################################
# Classic link connections
############################################################
Expand Down Expand Up @@ -1387,6 +1469,105 @@ def on_hci_le_read_transmit_power_command(self, _command):
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)

def on_hci_le_set_cig_parameters_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command
'''

# Remove old CIG implicitly.
for handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(handle)

handles = []
for cis_id in command.cis_id:
handle = self.allocate_connection_handle()
handles.append(handle)
self.central_cis_links[handle] = CisLink(
cis_id=cis_id,
cig_id=command.cig_id,
handle=handle,
)
return struct.pack(
'<BBB', HCI_SUCCESS, command.cig_id, len(handles)
) + b''.join([struct.pack('<H', handle) for handle in handles])

def on_hci_le_create_cis_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command
'''
if not self.link:
return

for cis_handle, acl_handle in zip(
command.cis_connection_handle, command.acl_connection_handle
):
if not (connection := self.find_connection_by_handle(acl_handle)):
logger.error(f'Cannot find connection with handle={acl_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

if not (cis_link := self.central_cis_links.get(cis_handle)):
logger.error(f'Cannot find CIS with handle={cis_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

self.link.create_cis(
self,
peripheral_address=connection.peer_address,
cig_id=cis_link.cig_id,
cis_id=cis_link.cis_id,
)

self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)

def on_hci_le_remove_cig_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command
'''

status = HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR

for cis_handle, cis_link in self.central_cis_links.items():
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(cis_handle)
status = HCI_SUCCESS

return struct.pack('<BH', status, command.cig_id)

def on_hci_le_accept_cis_request_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command
'''
if not self.link:
return

if not (
pending_cis_link := self.peripheral_cis_links.get(command.connection_handle)
):
logger.error(f'Cannot find CIS with handle={command.connection_handle}')
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

assert pending_cis_link.acl_connection
self.link.accept_cis(
peripheral_controller=self,
central_address=pending_cis_link.acl_connection.peer_address,
cig_id=pending_cis_link.cig_id,
cis_id=pending_cis_link.cis_id,
)

self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)

def on_hci_le_setup_iso_data_path_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command
Expand Down
3 changes: 2 additions & 1 deletion bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -4863,7 +4863,8 @@ def from_parameters(cls, parameters):
HCI_Object.init_from_bytes(self, parameters, 1, fields)
return self

def __init__(self, subevent_code, parameters, **kwargs):
def __init__(self, subevent_code=None, parameters=None, **kwargs):
assert subevent_code is not None
self.subevent_code = subevent_code
if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs:
parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes(
Expand Down
35 changes: 35 additions & 0 deletions bumble/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,41 @@ def on_connection_encrypted(
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk)

def create_cis(
self,
central_controller: controller.Controller,
peripheral_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Request {central_controller.random_address} -> {peripheral_address}'
)
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_cis_request(
central_address=central_controller.random_address,
cig_id=cig_id,
cis_id=cis_id,
)

def accept_cis(
self,
peripheral_controller: controller.Controller,
central_address: Address,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}'
)
if central_controller := self.find_controller(central_address):
asyncio.get_running_loop().call_soon(
central_controller.on_link_cis_established, cig_id, cis_id
)
asyncio.get_running_loop().call_soon(
peripheral_controller.on_link_cis_established, cig_id, cis_id
)

############################################################
# Classic handlers
############################################################
Expand Down
39 changes: 39 additions & 0 deletions tests/device_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
GATT_APPEARANCE_CHARACTERISTIC,
)

from .test_utils import TwoDevices

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -412,6 +414,43 @@ async def test_extended_advertising_disconnection(auto_restart):
device.start_extended_advertising.assert_not_called()


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cis():
devices = TwoDevices()
await devices.setup_connection()

def on_cis_request(
acl_connection: Connection,
cis_handle: int,
_cig_id: int,
_cis_id: int,
):
acl_connection.abort_on(
'disconnection', devices[1].accept_cis_request(cis_handle)
)

devices[1].on('cis_request', on_cis_request)

cis_handles = await devices[0].setup_cig(
cig_id=1,
cis_id=[2, 3],
sdu_interval=(0, 0),
framing=0,
max_sdu=(0, 0),
retransmission_number=0,
max_transport_latency=(0, 0),
)
assert len(cis_handles) == 2
cis_links = await devices[0].create_cis(
[
(cis_handles[0], devices.connections[0].handle),
(cis_handles[1], devices.connections[0].handle),
]
)
assert len(cis_links) == 2


# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))
Expand Down

0 comments on commit d17ebb5

Please sign in to comment.