From ccff32102ff97b3187518923f8f02654fbd472ce Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Thu, 2 May 2024 19:28:42 +0800 Subject: [PATCH] HFP: Add example and fix AG errors --- bumble/hfp.py | 55 ++++-- examples/hfp_gateway.html | 350 ++++++++++++++++++++++++++++++++++++ examples/hfp_gateway.json | 3 +- examples/run_hfp_gateway.py | 260 +++++++++++++++++++++------ 4 files changed, 598 insertions(+), 70 deletions(-) create mode 100644 examples/hfp_gateway.html diff --git a/bumble/hfp.py b/bumble/hfp.py index 6727b3df..69dab26b 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -50,7 +50,7 @@ ProtocolError, BT_GENERIC_AUDIO_SERVICE, BT_HANDSFREE_SERVICE, - BT_HEADSET_AUDIO_GATEWAY_SERVICE, + BT_HANDSFREE_AUDIO_GATEWAY_SERVICE, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, ) @@ -1156,7 +1156,7 @@ class AgProtocol(pyee.EventEmitter): active_codec: AudioCodec hf_indicator: When HF update their indicators, notify the new state. Args: - hf_indicator: HfIndicator + hf_indicator: HfIndicatorState codec_connection_request: Emit when HF sends AT+BCC to request codec connection. answer: Emit when HF sends ATA to answer phone call. hang_up: Emit when HF sends AT+CHUP to hang up phone call. @@ -1168,7 +1168,12 @@ class AgProtocol(pyee.EventEmitter): Args: operation: CallHoldOperation call_index: Optional[int] - + speaker_volume: Emitted when AG update speaker volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG update microphone volume autonomously. + Args: + volume: Int """ supported_hf_features: int @@ -1191,6 +1196,7 @@ class AgProtocol(pyee.EventEmitter): inband_ringtone_enabled: bool cme_error_enabled: bool cli_notification_enabled: bool + call_waiting_enabled: bool _remained_slc_setup_features: Set[HfFeature] def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: @@ -1218,6 +1224,7 @@ def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: self.indicator_report_enabled = False self.cme_error_enabled = False self.cli_notification_enabled = False + self.call_waiting_enabled = False self.hf_indicators = collections.OrderedDict() @@ -1465,7 +1472,12 @@ def _on_cmer( display: Optional[bytes] = None, indicator: bytes = b'', ) -> None: - if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1): + if ( + int(mode) != 3 + or (keypad and int(keypad)) + or (display and int(display)) + or int(indicator) not in (0, 1) + ): logger.error( f'Unexpected values: mode={mode!r}, keypad={keypad!r}, ' f'display={display!r}, indicator={indicator!r}' @@ -1479,6 +1491,10 @@ def _on_cmee(self, enabled: bytes) -> None: self.cme_error_enabled = bool(int(enabled)) self.send_ok() + def _on_ccwa(self, enabled: bytes) -> None: + self.call_waiting_enabled = bool(int(enabled)) + self.send_ok() + def _on_bind(self, *args) -> None: if not self.supports_ag_feature(AgFeature.HF_INDICATORS): self.send_error() @@ -1578,6 +1594,15 @@ def _on_clip(self, enabled: bytes) -> None: if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY): logger.error('Remote doesn not support CLI but sends AT+CLIP') self.cli_notification_enabled = True if enabled == b'1' else False + self.send_ok() + + def _on_vgs(self, level: bytes) -> None: + self.emit('speaker_volume', int(level)) + self.send_ok() + + def _on_vgm(self, level: bytes) -> None: + self.emit('microphone_volume', int(level)) + self.send_ok() # ----------------------------------------------------------------------------- @@ -1761,7 +1786,7 @@ def make_ag_sdp_records( sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, sdp.DataElement.sequence( [ - sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), ] ), @@ -1788,7 +1813,7 @@ def make_ag_sdp_records( [ sdp.DataElement.sequence( [ - sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), sdp.DataElement.unsigned_integer_16(version), ] ) @@ -1820,6 +1845,7 @@ async def find_hf_sdp_record( sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, ], ) for attribute_lists in search_result: @@ -1839,10 +1865,17 @@ async def find_hf_sdp_record( version = ProfileVersion(profile_descriptor_list[0].value[1].value) elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: features = HfSdpFeature(attribute.value.value) - if not channel or not version or features is None: - logger.warning(f"Bad result {attribute_lists}.") - return None - return (channel, version, features) + elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: + class_id_list = attribute.value.value + uuid = class_id_list[0].value + # AG record may also contain HF UUID in its profile descriptor list. + # If found, skip this record. + if uuid == BT_HANDSFREE_AUDIO_GATEWAY_SERVICE: + channel, version, features = (None, None, None) + break + + if channel is not None and version is not None and features is not None: + return (channel, version, features) return None @@ -1859,7 +1892,7 @@ async def find_ag_sdp_record( """ async with sdp.Client(connection) as sdp_client: search_result = await sdp_client.search_attributes( - uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE], + uuids=[BT_HANDSFREE_AUDIO_GATEWAY_SERVICE], attribute_ids=[ sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, diff --git a/examples/hfp_gateway.html b/examples/hfp_gateway.html new file mode 100644 index 00000000..1559c439 --- /dev/null +++ b/examples/hfp_gateway.html @@ -0,0 +1,350 @@ + + + + + + + + + +
+ +
+ + +
+ + +
+ +
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ + +
+
+ +
+
+
+ Codec + + +
+
+ +
+ +
+
+ +
+
+ +
+
+ +
+
+ +
+ +
+

AG Indicators

+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + +
+
+
+ +
+ + + + +
+ + +

Calls

+
+ +
+ + + + +
+ +
+

Log

+ +
+
+ + + + + + + \ No newline at end of file diff --git a/examples/hfp_gateway.json b/examples/hfp_gateway.json index 5e3d72b6..67bb2788 100644 --- a/examples/hfp_gateway.json +++ b/examples/hfp_gateway.json @@ -1,4 +1,5 @@ { "name": "Bumble Phone", - "class_of_device": 6291980 + "class_of_device": 6291980, + "keystore": "JsonKeyStore" } diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py index 272bb845..8e596f80 100644 --- a/examples/run_hfp_gateway.py +++ b/examples/run_hfp_gateway.py @@ -16,9 +16,14 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import json import sys import os +import io import logging +import websockets + +from typing import Optional import bumble.core from bumble.device import Device @@ -26,12 +31,15 @@ from bumble.core import ( BT_BR_EDR_TRANSPORT, ) -from bumble import rfcomm, hfp -from bumble.hci import HCI_SynchronousDataPacket +from bumble import hci, rfcomm, hfp logger = logging.getLogger(__name__) +ws: Optional[websockets.WebSocketServerProtocol] = None +ag_protocol: Optional[hfp.AgProtocol] = None +source_file: Optional[io.BufferedReader] = None + def _default_configuration() -> hfp.AgConfiguration: return hfp.AgConfiguration( @@ -41,12 +49,13 @@ def _default_configuration() -> hfp.AgConfiguration: hfp.AgFeature.REJECT_CALL, hfp.AgFeature.CODEC_NEGOTIATION, hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.AgFeature.ENHANCED_CALL_STATUS, ], supported_ag_indicators=[ hfp.AgIndicatorState.call(), - hfp.AgIndicatorState.service(), - hfp.AgIndicatorState.callsetup(), hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.callheld(), + hfp.AgIndicatorState.service(), hfp.AgIndicatorState.signal(), hfp.AgIndicatorState.roam(), hfp.AgIndicatorState.battchg(), @@ -60,17 +69,123 @@ def _default_configuration() -> hfp.AgConfiguration: ) +def send_message(type: str, **kwargs) -> None: + if ws: + asyncio.create_task(ws.send(json.dumps({'type': type, **kwargs}))) + + +def on_speaker_volume(level: int): + send_message(type='speaker_volume', level=level) + + +def on_microphone_volume(level: int): + send_message(type='microphone_volume', level=level) + + +def on_sco_state_change(codec: int): + if codec == hfp.AudioCodec.CVSD: + sample_rate = 8000 + elif codec == hfp.AudioCodec.MSBC: + sample_rate = 16000 + else: + sample_rate = 0 + + send_message(type='sco_state_change', sample_rate=sample_rate) + + +def on_sco_packet(packet: hci.HCI_SynchronousDataPacket): + if ws: + asyncio.create_task(ws.send(packet.data)) + if source_file and (pcm_data := source_file.read(packet.data_total_length)): + assert ag_protocol + host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host + host.send_hci_packet( + hci.HCI_SynchronousDataPacket( + connection_handle=packet.connection_handle, + packet_status=0, + data_total_length=len(pcm_data), + data=pcm_data, + ) + ) + + +def on_hfp_state_change(connected: bool): + send_message(type='hfp_state_change', connected=connected) + + +async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str): + del path + global ws + ws = ws_client + + async for message in ws_client: + if not ag_protocol: + continue + + json_message = json.loads(message) + message_type = json_message['type'] + connection = ag_protocol.dlc.multiplexer.l2cap_channel.connection + device = connection.device + + try: + if message_type == 'at_response': + ag_protocol.send_response(json_message['response']) + elif message_type == 'ag_indicator': + ag_protocol.update_ag_indicator( + hfp.AgIndicator(json_message['indicator']), + int(json_message['value']), + ) + elif message_type == 'negotiate_codec': + codec = hfp.AudioCodec(int(json_message['codec'])) + await ag_protocol.negotiate_codec(codec) + elif message_type == 'connect_sco': + if ag_protocol.active_codec == hfp.AudioCodec.CVSD: + esco_param = hfp.ESCO_PARAMETERS[ + hfp.DefaultCodecParameters.ESCO_CVSD_S4 + ] + elif ag_protocol.active_codec == hfp.AudioCodec.MSBC: + esco_param = hfp.ESCO_PARAMETERS[ + hfp.DefaultCodecParameters.ESCO_MSBC_T2 + ] + else: + raise ValueError(f'Unsupported codec {codec}') + + await device.send_command( + hci.HCI_Enhanced_Setup_Synchronous_Connection_Command( + connection_handle=connection.handle, **esco_param.asdict() + ) + ) + elif message_type == 'disconnect_sco': + # Copy the values to avoid iteration error. + for sco_link in list(device.sco_links.values()): + await sco_link.disconnect() + elif message_type == 'update_calls': + ag_protocol.calls = [ + hfp.CallInfo( + index=int(call['index']), + direction=hfp.CallInfoDirection(int(call['direction'])), + status=hfp.CallInfoStatus(int(call['status'])), + number=call['number'], + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + mode=hfp.CallInfoMode.VOICE, + ) + for call in json_message['calls'] + ] + + except Exception as e: + send_message(type='error', message=e) + + # ----------------------------------------------------------------------------- async def main() -> None: - if len(sys.argv) < 4: + if len(sys.argv) < 3: print( 'Usage: run_hfp_gateway.py ' - '' + '[bluetooth-address] [wav-file-for-source]' ) print( - ' specifying a channel number, or "discover" to list all RFCOMM channels' + 'example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8 sample.wav' ) - print('example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') @@ -84,56 +199,85 @@ async def main() -> None: device.classic_enabled = True await device.power_on() - # Connect to a peer - target_address = sys.argv[3] - print(f'=== Connecting to {target_address}...') - connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) - print(f'=== Connected to {connection.peer_address}!') - - # Get a list of all the Handsfree services (should only be 1) - if not (hfp_record := await hfp.find_hf_sdp_record(connection)): - print('!!! no service found') - return - - # Pick the first one - channel, version, hf_sdp_features = hfp_record - print(f'HF version: {version}') - print(f'HF features: {hf_sdp_features}') - - # Request authentication - print('*** Authenticating...') - await connection.authenticate() - print('*** Authenticated') - - # Enable encryption - print('*** Enabling encryption...') - await connection.encrypt() - print('*** Encryption on') - - # Create a client and start it - print('@@@ Starting to RFCOMM client...') - rfcomm_client = rfcomm.Client(connection) - rfcomm_mux = await rfcomm_client.start() - print('@@@ Started') - - print(f'### Opening session for channel {channel}...') - try: - session = await rfcomm_mux.open_dlc(channel) - print('### Session open', session) - except bumble.core.ConnectionError as error: - print(f'### Session open failed: {error}') - await rfcomm_mux.disconnect() - print('@@@ Disconnected from RFCOMM server') - return - - def on_sco(connection_handle: int, packet: HCI_SynchronousDataPacket): - # Reset packet and loopback - packet.packet_status = 0 - device.host.send_hci_packet(packet) - - device.host.on('sco_packet', on_sco) - - ag_protocol = hfp.AgProtocol(session, _default_configuration()) + rfcomm_server = rfcomm.Server(device) + configuration = _default_configuration() + + def on_dlc(dlc: rfcomm.DLC): + global ag_protocol + ag_protocol = hfp.AgProtocol(dlc, configuration) + ag_protocol.on('speaker_volume', on_speaker_volume) + ag_protocol.on('microphone_volume', on_microphone_volume) + on_hfp_state_change(True) + dlc.multiplexer.l2cap_channel.on( + 'close', lambda: on_hfp_state_change(False) + ) + + channel = rfcomm_server.listen(on_dlc) + device.sdp_service_records = { + 1: hfp.make_ag_sdp_records(1, channel, configuration) + } + + def on_sco_connection(sco_link): + assert ag_protocol + on_sco_state_change(ag_protocol.active_codec) + sco_link.on('disconnection', lambda _: on_sco_state_change(0)) + sco_link.on('pdu', on_sco_packet) + + device.on('sco_connection', on_sco_connection) + if len(sys.argv) >= 4: + # Connect to a peer + target_address = sys.argv[3] + print(f'=== Connecting to {target_address}...') + connection = await device.connect( + target_address, transport=BT_BR_EDR_TRANSPORT + ) + print(f'=== Connected to {connection.peer_address}!') + + # Get a list of all the Handsfree services (should only be 1) + if not (hfp_record := await hfp.find_hf_sdp_record(connection)): + print('!!! no service found') + return + + # Pick the first one + channel, version, hf_sdp_features = hfp_record + print(f'HF version: {version}') + print(f'HF features: {hf_sdp_features}') + + # Request authentication + print('*** Authenticating...') + await connection.authenticate() + print('*** Authenticated') + + # Enable encryption + print('*** Enabling encryption...') + await connection.encrypt() + print('*** Encryption on') + + # Create a client and start it + print('@@@ Starting to RFCOMM client...') + rfcomm_client = rfcomm.Client(connection) + rfcomm_mux = await rfcomm_client.start() + print('@@@ Started') + + print(f'### Opening session for channel {channel}...') + try: + session = await rfcomm_mux.open_dlc(channel) + print('### Session open', session) + except bumble.core.ConnectionError as error: + print(f'### Session open failed: {error}') + await rfcomm_mux.disconnect() + print('@@@ Disconnected from RFCOMM server') + return + + on_dlc(session) + + await websockets.serve(ws_server, port=8888) + + if len(sys.argv) >= 5: + global source_file + source_file = open(sys.argv[4], 'rb') + # Skip header + source_file.seek(44) await hci_transport.source.terminated