From 4394954536067ee88032f73f61c7406b665e8a32 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 5 Apr 2024 23:46:06 +0800 Subject: [PATCH] HFP AG implementation --- bumble/hfp.py | 696 +++++++++++++++++++++++++++++++++++++++++++--- setup.cfg | 2 +- tests/hfp_test.py | 281 +++++++++++++++++-- 3 files changed, 907 insertions(+), 72 deletions(-) diff --git a/bumble/hfp.py b/bumble/hfp.py index 145523d5..f733e984 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -15,6 +15,9 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + +import collections import collections.abc import logging import asyncio @@ -22,16 +25,32 @@ import enum import traceback import pyee -from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING +import re +from typing import ( + Dict, + List, + Union, + Set, + Any, + Optional, + Type, + Tuple, + ClassVar, + Iterable, + TYPE_CHECKING, +) from typing_extensions import Self from bumble import at +from bumble import device from bumble import rfcomm +from bumble import sdp from bumble.colors import color from bumble.core import ( ProtocolError, BT_GENERIC_AUDIO_SERVICE, BT_HANDSFREE_SERVICE, + BT_HEADSET_AUDIO_GATEWAY_SERVICE, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, ) @@ -40,15 +59,6 @@ CodingFormat, CodecID, ) -from bumble.sdp import ( - DataElement, - ServiceAttribute, - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, -) # ----------------------------------------------------------------------------- @@ -66,6 +76,9 @@ def __init__(self, error_name: str = '', details: str = ''): super().__init__(None, 'hfp', error_name, details) +class HfLoopTermination(HfpProtocolError): ... + + # ----------------------------------------------------------------------------- # Protocol Support # ----------------------------------------------------------------------------- @@ -329,6 +342,21 @@ class CallInfo: type: Optional[int] = None +class CmeError(enum.IntEnum): + """ + CME ERROR codes (partial listed). + + TS 127 007 - V6.8.0, 9.2.1 General errors + """ + + PHONE_FAILURE = 0 + OPERATION_NOT_ALLOWED = 3 + OPERATION_NOT_SUPPORTED = 4 + MEMORY_FULL_ = 20 + INVALID_INDEX = 21 + NOT_FOUND = 22 + + # ----------------------------------------------------------------------------- # Hands-Free Control Interoperability Requirements # ----------------------------------------------------------------------------- @@ -402,12 +430,21 @@ class CallInfo: @dataclasses.dataclass -class Configuration: +class HfConfiguration: supported_hf_features: List[HfFeature] supported_hf_indicators: List[HfIndicator] supported_audio_codecs: List[AudioCodec] +@dataclasses.dataclass +class AgConfiguration: + supported_ag_features: Iterable[AgFeature] + supported_ag_indicators: collections.abc.Sequence[AgIndicatorState] + supported_hf_indicators: Iterable[HfIndicator] + supported_ag_call_hold_operations: Iterable[CallHoldOperation] + supported_audio_codecs: Iterable[AudioCodec] + + class AtResponseType(enum.Enum): """ Indicates if a response is expected from an AT command, and if multiple responses are accepted. @@ -435,18 +472,72 @@ def parse_from(cls: Type[Self], buffer: bytearray) -> Self: ) +@dataclasses.dataclass +class AtCommand: + class SubCode(enum.StrEnum): + NONE = '' + SET = '=' + TEST = '=?' + READ = '?' + + code: str + sub_code: SubCode + parameters: list + + _PARSE_PATTERN: ClassVar[re.Pattern] = re.compile( + r'AT\+(?P[A-Z]+)(?P=\?|=|\?)?(?P.*)' + ) + + @classmethod + def parse_from(cls: Type[Self], buffer: bytearray) -> Self: + if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())): + if buffer.startswith(b'ATA'): + return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[]) + if buffer.startswith(b'ATD'): + return cls( + code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]] + ) + raise HfpProtocolError('Invalid command') + + parameters = [] + if parameters_text := match.group('parameters'): + parameters = at.parse_parameters(parameters_text.encode()) + + return cls( + code=match.group('code'), + sub_code=AtCommand.SubCode(match.group('sub_code') or ''), + parameters=parameters, + ) + + @dataclasses.dataclass class AgIndicatorState: - description: str - index: int + description: AgIndicator supported_values: Set[int] current_status: int + index: Optional[int] = None + enabled: bool = True + + @property + def supported_values_text(self) -> str: + min_value = min(self.supported_values) + max_value = max(self.supported_values) + if len(self.supported_values) == (max_value - min_value + 1): + return f'({min_value}-{max_value})' + else: + return f'({",".join(str(v) for v in self.supported_values)})' + + @property + def on_test_text(self) -> str: + return f'(\"{self.description.value}\",{self.supported_values_text})' @dataclasses.dataclass class HfIndicatorState: + indicator: HfIndicator supported: bool = False enabled: bool = False + current_status: int = 0 class HfProtocol(pyee.EventEmitter): @@ -477,14 +568,14 @@ class HfProtocol(pyee.EventEmitter): command_lock: asyncio.Lock if TYPE_CHECKING: response_queue: asyncio.Queue[AtResponse] - unsolicited_queue: asyncio.Queue[AtResponse] + unsolicited_queue: asyncio.Queue[Optional[AtResponse]] else: response_queue: asyncio.Queue unsolicited_queue: asyncio.Queue read_buffer: bytearray active_codec: AudioCodec - def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: + def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None: super().__init__() # Configure internal state. @@ -494,13 +585,14 @@ def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: self.unsolicited_queue = asyncio.Queue() self.read_buffer = bytearray() self.active_codec = AudioCodec.CVSD + self._slc_initialized = False # Build local features. self.supported_hf_features = sum(configuration.supported_hf_features) self.supported_audio_codecs = configuration.supported_audio_codecs self.hf_indicators = { - indicator: HfIndicatorState() + indicator: HfIndicatorState(indicator=indicator) for indicator in configuration.supported_hf_indicators } @@ -511,6 +603,9 @@ def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: # Bind the AT reader to the RFCOMM channel. self.dlc.sink = self._read_at + self.dlc.multiplexer.l2cap_channel.on( + 'close', lambda: self.unsolicited_queue.put_nowait(None) + ) def supports_hf_feature(self, feature: HfFeature) -> bool: return (self.supported_hf_features & feature) != 0 @@ -639,7 +734,7 @@ async def initiate_slc(self): self.ag_indicators = [] for index, indicator in enumerate(response.parameters): - description = indicator[0].decode() + description = AgIndicator(indicator[0].decode()) supported_values = [] for value in indicator[1]: value = value.split(b'-') @@ -733,6 +828,7 @@ async def initiate_slc(self): self.hf_indicators[indicator].enabled = True logger.info("SLC setup completed") + self._slc_initialized = True async def setup_audio_connection(self): """4.11.2 Audio Connection Setup by HF.""" @@ -829,6 +925,8 @@ async def update_ag_indicator(self, index: int, value: int): async def handle_unsolicited(self): """Handle unsolicited result codes sent by the audio gateway.""" result = await self.unsolicited_queue.get() + if not result: + raise HfLoopTermination() if result.code == "+BCS": await self.setup_codec_connection(int(result.parameters[0])) elif result.code == "+CIEV": @@ -846,14 +944,346 @@ async def run(self): """ try: - await self.initiate_slc() + if not self._slc_initialized: + await self.initiate_slc() while True: await self.handle_unsolicited() + except HfLoopTermination: + logger.info('Loop terminated') except Exception: logger.error("HFP-HF protocol failed with the following error:") logger.error(traceback.format_exc()) +class AgProtocol(pyee.EventEmitter): + """ + Implementation for the Audio-Gateway side of the Hands-Free profile. + + Reference specification Hands-Free Profile v1.8. + + Emitted events: + slc_complete: Emit when SLC procedure is completed. + codec_negotiation: When codec is renegotiated, notify the new codec. + Args: + active_codec: AudioCodec + hf_indicator: When HF update their indicators, notify the new state. + Args: + hf_indicator: HfIndicator + codec_connection_request: Emit when HF sends AT+BCC to request codec connection. + answer: Emit when HF sends ATA to answer phone call. + hang_up: Emit when HF sends AT+CHUP to hang up phone call. + dial: Emit when HF sends ATD to dial phone call. + """ + + supported_hf_features: int + supported_hf_indicators: Set[HfIndicator] + supported_audio_codecs: List[AudioCodec] + + supported_ag_features: int + supported_ag_call_hold_operations: List[CallHoldOperation] + + ag_indicators: List[AgIndicatorState] + hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState] + + dlc: rfcomm.DLC + command_lock: asyncio.Lock + + read_buffer: bytearray + active_codec: AudioCodec + + indicator_report_enabled: bool + inband_ringtone_enabled: bool + cme_error_enabled: bool + _remained_slc_setup_features: Set[HfFeature] + + def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: + super().__init__() + + # Configure internal state. + self.dlc = dlc + self.command_lock = asyncio.Lock() + self.read_buffer = bytearray() + self.active_codec = AudioCodec.CVSD + + # Build local features. + self.supported_ag_features = sum(configuration.supported_ag_features) + self.supported_ag_call_hold_operations = list( + configuration.supported_ag_call_hold_operations + ) + self.ag_indicators = list(configuration.supported_ag_indicators) + self.supported_hf_indicators = set(configuration.supported_hf_indicators) + self.inband_ringtone_enabled = True + self._remained_slc_setup_features = set() + + # Clear remote features. + self.supported_hf_features = 0 + self.supported_audio_codecs = [] + self.indicator_report_enabled = False + self.cme_error_enabled = False + + self.hf_indicators = collections.OrderedDict() + + # Bind the AT reader to the RFCOMM channel. + self.dlc.sink = self._read_at + + def supports_hf_feature(self, feature: HfFeature) -> bool: + return (self.supported_hf_features & feature) != 0 + + def supports_ag_feature(self, feature: AgFeature) -> bool: + return (self.supported_ag_features & feature) != 0 + + def _read_at(self, data: bytes): + """ + Reads AT messages from the RFCOMM channel. + """ + # Append to the read buffer. + self.read_buffer.extend(data) + + # Locate header and trailer. + trailer = self.read_buffer.find(b'\r') + if trailer == -1: + return + + # Isolate the AT response code and parameters. + raw_command = self.read_buffer[:trailer] + command = AtCommand.parse_from(raw_command) + logger.debug(f"<<< {raw_command.decode()}") + + # Consume the response bytes. + self.read_buffer = self.read_buffer[trailer + 1 :] + + if command.sub_code == AtCommand.SubCode.TEST: + handler_name = f'_on_{command.code.lower()}_test' + elif command.sub_code == AtCommand.SubCode.READ: + handler_name = f'_on_{command.code.lower()}_read' + else: + handler_name = f'_on_{command.code.lower()}' + + if handler := getattr(self, handler_name, None): + handler(*command.parameters) + else: + logger.warning('Handler %s not found', handler_name) + self.send_response('ERROR') + + def send_response(self, response: str) -> None: + """Sends an AT response.""" + self.dlc.write(f'\r\n{response}\r\n') + + def send_cme_error(self, error_code: CmeError) -> None: + """Sends an CME ERROR response. + + If CME Error is not enabled by HF, sends ERROR instead. + """ + if self.cme_error_enabled: + self.send_response(f'+CME ERROR: {error_code.value}') + else: + self.send_error() + + def send_ok(self) -> None: + """Sends an OK response.""" + self.send_response('OK') + + def send_error(self) -> None: + """Sends an ERROR response.""" + self.send_response('ERROR') + + def set_inband_ringtone_enabled(self, enabled: bool) -> None: + """Enables or disables in-band ringtone.""" + + self.inband_ringtone_enabled = enabled + self.send_response(f'+BSIR: {1 if enabled else 0}') + + def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None: + """Updates AG indicator. + + Args: + indicator: Name of the indicator. + value: new value of the indicator. + """ + + search_result = next( + ( + (index, state) + for index, state in enumerate(self.ag_indicators) + if state.description == indicator + ), + None, + ) + if not search_result: + raise KeyError(f'{indicator} is not supported.') + + index, indicator_state = search_result + if not self.indicator_report_enabled: + logger.warning('AG indicator report is disabled') + if not indicator_state.enabled: + logger.warning(f'AG indicator {indicator} is disabled') + + indicator_state.current_status = value + self.send_response(f'+CIEV: {index+1},{value}') + + async def negotiate_codec(self, codec: AudioCodec) -> None: + """Starts codec negotiation.""" + + if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): + logger.warning('Local does not support Codec Negotiation') + if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION): + logger.warning('Peer does not support Codec Negotiation') + if codec not in self.supported_audio_codecs: + logger.warning(f'{codec} is not supported by peer') + + at_bcs_future = asyncio.get_running_loop().create_future() + self.once('codec_negotiation', at_bcs_future.set_result) + self.send_response(f'+BCS: {codec.value}') + if (new_codec := await at_bcs_future) != codec: + raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}') + + def _check_remained_slc_commands(self) -> None: + if not self._remained_slc_setup_features: + self.emit('slc_complete') + + def _on_brsf(self, hf_features: bytes) -> None: + self.supported_hf_features = int(hf_features) + self.send_response(f'+BRSF: {self.supported_ag_features}') + self.send_ok() + + if self.supports_hf_feature( + HfFeature.HF_INDICATORS + ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): + self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS) + + if self.supports_hf_feature( + HfFeature.THREE_WAY_CALLING + ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): + self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING) + + def _on_bac(self, *args) -> None: + self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] + self.send_ok() + + def _on_bcs(self, codec: bytes) -> None: + self.active_codec = AudioCodec(int(codec)) + self.send_ok() + self.emit('codec_negotiation', self.active_codec) + + def _on_cind_test(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + self.send_response( + f'+CIND: {",".join(indicator.on_test_text for indicator in self.ag_indicators)}' + ) + self.send_ok() + + def _on_cind_read(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + self.send_response( + f'+CIND: {",".join(str(indicator.current_status) for indicator in self.ag_indicators)}' + ) + self.send_ok() + + self._check_remained_slc_commands() + + def _on_cmer( + self, + mode: bytes, + keypad: Optional[bytes] = None, + display: Optional[bytes] = None, + indicator: bytes = b'', + ) -> None: + if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1): + logger.error( + f'Unexpected values: mode={mode!r}, keypad={keypad!r}, display={display!r}, indicator={indicator!r}' + ) + self.send_cme_error(CmeError.INVALID_INDEX) + + self.indicator_report_enabled = bool(int(indicator)) + self.send_ok() + + def _on_cmee(self, enabled: bytes) -> None: + self.cme_error_enabled = bool(int(enabled)) + self.send_ok() + + def _on_bind(self, *args) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + peer_supported_indicators = set(int(indicator) for indicator in args) + self.hf_indicators = collections.OrderedDict( + { + indicator: HfIndicatorState(indicator=indicator) + for indicator in self.supported_hf_indicators.intersection( + peer_supported_indicators + ) + } + ) + self.send_ok() + + def _on_bind_test(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + self.send_response( + f'+BIND: ({",".join(str(index) for index in self.supported_hf_indicators)})' + ) + self.send_ok() + + def _on_bind_read(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + for index in self.hf_indicators: + self.send_response(f'+BIND: {index},1') + + self.send_ok() + + self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS) + self._check_remained_slc_commands() + + def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + index = HfIndicator(int(index_bytes)) + if index not in self.hf_indicators: + self.send_error() + return + + self.hf_indicators[index].current_status = int(value_bytes) + self.emit('hf_indicator', self.hf_indicators[index]) + self.send_ok() + + def _on_bia(self, *args) -> None: + for enabled, state in zip(args, self.ag_indicators): + state.enabled = bool(int(enabled)) + self.send_ok() + + def _on_bcc(self) -> None: + self.emit('codec_connection_request') + self.send_ok() + + def _on_a(self) -> None: + """ATA handler.""" + self.emit('answer') + self.send_ok() + + def _on_d(self, number: bytes) -> None: + """ATD handler.""" + self.emit('dial', number.decode()) + self.send_ok() + + def _on_chup(self) -> None: + self.emit('hang_up') + self.send_ok() + + # ----------------------------------------------------------------------------- # Normative SDP definitions # ----------------------------------------------------------------------------- @@ -907,9 +1337,12 @@ class AgSdpFeature(enum.IntFlag): VOICE_RECOGNITION_TEST = 0x80 -def sdp_records( - service_record_handle: int, rfcomm_channel: int, configuration: Configuration -) -> List[ServiceAttribute]: +def make_hf_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: HfConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: """ Generates the SDP record for HFP Hands-Free support. @@ -941,53 +1374,226 @@ def sdp_records( hf_supported_features |= HfSdpFeature.WIDE_BAND return [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), + ), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), + ] + ), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.unsigned_integer_16(version), + ] + ) + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(hf_supported_features), + ), + ] + + +def make_ag_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: AgConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: + """ + Generates the SDP record for HFP Audio-Gateway support. + + The record exposes the features supported in the input configuration, + and the allocated RFCOMM channel. + """ + + ag_supported_features = 0 + + if AgFeature.EC_NR in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.EC_NR + if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING + if ( + AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS + in configuration.supported_ag_features + ): + ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS + if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST + if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION + if AudioCodec.MSBC in configuration.supported_audio_codecs: + ag_supported_features |= AgSdpFeature.WIDE_BAND + + return [ + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), ] ), ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), - DataElement.sequence( + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( [ - DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), - DataElement.unsigned_integer_8(rfcomm_channel), + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), ] ), ] ), ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence( + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.unsigned_integer_16(ProfileVersion.V1_8), + sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.unsigned_integer_16(version), ] ) ] ), ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(hf_supported_features), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(ag_supported_features), ), ] +async def find_hf_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]: + """Searches a Hands-Free SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Dictionary mapping from channel number to service class UUID list. + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HANDSFREE_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[HfSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = HfSdpFeature(attribute.value.value) + if not channel or not version or features is None: + logger.warning(f"Bad result {attribute_lists}.") + return None + return (channel, version, features) + return None + + +async def find_ag_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]: + """Searches an Audio-Gateway SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Dictionary mapping from channel number to service class UUID list. + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[AgSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = AgSdpFeature(attribute.value.value) + if not channel or not version or features is None: + logger.warning(f"Bad result {attribute_lists}.") + return None + return (channel, version, features) + return None + + # ----------------------------------------------------------------------------- # ESCO Codec Default Parameters # ----------------------------------------------------------------------------- diff --git a/setup.cfg b/setup.cfg index dbede6fa..080eb68f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -83,7 +83,7 @@ build = build >= 0.7 test = pytest >= 8.0 - pytest-asyncio >= 0.21.1 + pytest-asyncio >= 0.23.5 pytest-html >= 3.2.0 coverage >= 6.4 development = diff --git a/tests/hfp_test.py b/tests/hfp_test.py index dc281805..d04c8b96 100644 --- a/tests/hfp_test.py +++ b/tests/hfp_test.py @@ -19,8 +19,9 @@ import logging import os import pytest +import pytest_asyncio -from typing import Tuple +from typing import Tuple, Optional from .test_utils import TwoDevices from bumble import core @@ -35,10 +36,76 @@ logger = logging.getLogger(__name__) +# ----------------------------------------------------------------------------- +def _default_hf_configuration() -> hfp.HfConfiguration: + return hfp.HfConfiguration( + supported_hf_features=[ + hfp.HfFeature.CODEC_NEGOTIATION, + hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.HfFeature.HF_INDICATORS, + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_audio_codecs=[ + hfp.AudioCodec.CVSD, + hfp.AudioCodec.MSBC, + ], + ) + + +# ----------------------------------------------------------------------------- +def _default_hf_sdp_features() -> hfp.HfSdpFeature: + return hfp.HfSdpFeature.WIDE_BAND + + +# ----------------------------------------------------------------------------- +def _default_ag_configuration() -> hfp.AgConfiguration: + return hfp.AgConfiguration( + supported_ag_features=[ + hfp.AgFeature.HF_INDICATORS, + hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, + hfp.AgFeature.REJECT_CALL, + hfp.AgFeature.CODEC_NEGOTIATION, + hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + ], + supported_ag_indicators=[ + hfp.AgIndicatorState( + description=hfp.AgIndicator.CALL, + supported_values={0, 1}, + current_status=0, + ), + hfp.AgIndicatorState( + description=hfp.AgIndicator.CALL, + supported_values={0, 1, 2, 3}, + current_status=0, + ), + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_ag_call_hold_operations=[], + supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], + ) + + +# ----------------------------------------------------------------------------- +def _default_ag_sdp_features() -> hfp.AgSdpFeature: + return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + + # ----------------------------------------------------------------------------- async def make_hfp_connections( - hf_config: hfp.Configuration, -) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: + hf_config: Optional[hfp.HfConfiguration] = None, + ag_config: Optional[hfp.AgConfiguration] = None, +): + if not hf_config: + hf_config = _default_hf_configuration() + if not ag_config: + ag_config = _default_ag_configuration() + # Setup devices devices = TwoDevices() await devices.setup_connection() @@ -55,38 +122,200 @@ async def make_hfp_connections( # Setup HFP connection hf = hfp.HfProtocol(client_dlc, hf_config) - ag = hfp.HfpProtocol(server_dlc) - return hf, ag + ag = hfp.AgProtocol(server_dlc, ag_config) + + await hf.initiate_slc() + return (hf, ag) # ----------------------------------------------------------------------------- +@pytest_asyncio.fixture +async def hfp_connections(): + hf, ag = await make_hfp_connections() + hf_loop_task = asyncio.create_task(hf.run()) + try: + yield (hf, ag) + finally: + # Close the coroutine. + hf.unsolicited_queue.put_nowait(None) + await hf_loop_task + +# ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_slc(): - hf_config = hfp.Configuration( - supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] - ) - hf, ag = await make_hfp_connections(hf_config) - - async def ag_loop(): - while line := await ag.next_line(): - if line.startswith('AT+BRSF'): - ag.send_response_line('+BRSF: 0') - elif line.startswith('AT+CIND=?'): - ag.send_response_line( - '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' - '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' - '("callheld",(0-2))' +async def test_slc_with_minimal_features(): + hf, ag = await make_hfp_connections( + hfp.HfConfiguration( + supported_audio_codecs=[], + supported_hf_features=[], + supported_hf_indicators=[], + ), + hfp.AgConfiguration( + supported_ag_call_hold_operations=[], + supported_ag_features=[], + supported_ag_indicators=[ + hfp.AgIndicatorState( + description=hfp.AgIndicator.CALL, + supported_values={0, 1}, + current_status=0, ) - elif line.startswith('AT+CIND?'): - ag.send_response_line('+CIND: 0,0,1,4,1,5,0') - ag.send_response_line('OK') + ], + supported_hf_indicators=[], + supported_audio_codecs=[], + ), + ) - ag_task = asyncio.create_task(ag_loop()) + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.description == b.description + assert a.current_status == b.current_status - await hf.initiate_slc() - ag_task.cancel() + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.description == b.description + assert a.current_status == b.current_status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + hf.on('ag_indicator', future.set_result) + + ag.update_ag_indicator(hfp.AgIndicator.CALL, 1) + + indicator: hfp.AgIndicatorState = await future + assert indicator.current_status == 1 + assert indicator.description == hfp.AgIndicator.CALL + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hf_indicator', future.set_result) + + await hf.execute_command('AT+BIEV=2,100') + + indicator: hfp.HfIndicatorState = await future + assert indicator.current_status == 100 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_codec_negotiation( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + futures = [ + asyncio.get_running_loop().create_future(), + asyncio.get_running_loop().create_future(), + ] + hf.on('codec_negotiation', futures[0].set_result) + ag.on('codec_negotiation', futures[1].set_result) + await ag.negotiate_codec(hfp.AudioCodec.MSBC) + + assert await futures[0] == await futures[1] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + NUMBER = 'ATD123456789' + + future = asyncio.get_running_loop().create_future() + ag.on('dial', future.set_result) + await hf.execute_command(f'ATD{NUMBER}') + + number: str = await future + assert number == NUMBER + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('answer', lambda: future.set_result(None)) + await hf.answer_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_reject_incoming_call( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.reject_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.terminate_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records( + 1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_hf_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_hf_sdp_features(), + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records( + 1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_ag_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_ag_sdp_features(), + ) # -----------------------------------------------------------------------------