diff --git a/bumble/profiles/bap.py b/bumble/profiles/bap.py index 188ca271..117e95e6 100644 --- a/bumble/profiles/bap.py +++ b/bumble/profiles/bap.py @@ -685,10 +685,11 @@ def __bytes__(self) -> bytes: @dataclasses.dataclass class PacRecord: + '''Published Audio Capabilities Service, Table 3.2/3.4.''' + coding_format: hci.CodingFormat codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] - # TODO: Parse Metadata - metadata: bytes = b'' + metadata: le_audio.Metadata = dataclasses.field(default_factory=le_audio.Metadata) @classmethod def from_bytes(cls, data: bytes) -> PacRecord: @@ -701,7 +702,8 @@ def from_bytes(cls, data: bytes) -> PacRecord: ] offset += codec_specific_capabilities_size metadata_size = data[offset] - metadata = data[offset : offset + metadata_size] + offset += 1 + metadata = le_audio.Metadata.from_bytes(data[offset : offset + metadata_size]) codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC: @@ -719,12 +721,13 @@ def from_bytes(cls, data: bytes) -> PacRecord: def __bytes__(self) -> bytes: capabilities_bytes = bytes(self.codec_specific_capabilities) + metadata_bytes = bytes(self.metadata) return ( bytes(self.coding_format) + bytes([len(capabilities_bytes)]) + capabilities_bytes - + bytes([len(self.metadata)]) - + self.metadata + + bytes([len(metadata_bytes)]) + + metadata_bytes ) @@ -940,8 +943,7 @@ class State(enum.IntEnum): presentation_delay = 0 # Additional parameters in ENABLING, STREAMING, DISABLING State - # TODO: Parse this - metadata = b'' + metadata = le_audio.Metadata() def __init__( self, @@ -1088,7 +1090,7 @@ def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]: AseReasonCode.NONE, ) - self.metadata = metadata + self.metadata = le_audio.Metadata.from_bytes(metadata) self.state = self.State.ENABLING return (AseResponseCode.SUCCESS, AseReasonCode.NONE) @@ -1140,7 +1142,7 @@ def on_update_metadata( AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, AseReasonCode.NONE, ) - self.metadata = metadata + self.metadata = le_audio.Metadata.from_bytes(metadata) return (AseResponseCode.SUCCESS, AseReasonCode.NONE) def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]: @@ -1217,8 +1219,9 @@ def value(self): self.State.STREAMING, self.State.DISABLING, ): + metadata_bytes = bytes(self.metadata) additional_parameters = ( - bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata + bytes([self.cig_id, self.cis_id, len(metadata_bytes)]) + metadata_bytes ) else: additional_parameters = b'' diff --git a/bumble/profiles/le_audio.py b/bumble/profiles/le_audio.py index 8d1e6290..b152fd97 100644 --- a/bumble/profiles/le_audio.py +++ b/bumble/profiles/le_audio.py @@ -17,33 +17,67 @@ # ----------------------------------------------------------------------------- from __future__ import annotations import dataclasses -from typing import List +import struct +from typing import List, Type from typing_extensions import Self +from bumble import utils + # ----------------------------------------------------------------------------- # Classes # ----------------------------------------------------------------------------- @dataclasses.dataclass class Metadata: + '''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures. + + As Metadata fields may extend, and Spec doesn't forbid duplication, we don't parse + Metadata into a key-value style dataclass here. Rather, we encourage users to parse + again outside the lib. + ''' + + class Tag(utils.OpenIntEnum): + # fmt: off + PREFERRED_AUDIO_CONTEXTS = 0x01 + STREAMING_AUDIO_CONTEXTS = 0x02 + PROGRAM_INFO = 0x03 + LANGUAGE = 0x04 + CCID_LIST = 0x05 + PARENTAL_RATING = 0x06 + PROGRAM_INFO_URI = 0x07 + AUDIO_ACTIVE_STATE = 0x08 + BROADCAST_AUDIO_IMMEDIATE_RENDERING_FLAG = 0x09 + ASSISTED_LISTENING_STREAM = 0x0A + BROADCAST_NAME = 0x0B + EXTENDED_METADATA = 0xFE + VENDOR_SPECIFIC = 0xFF + @dataclasses.dataclass class Entry: - tag: int + tag: Metadata.Tag data: bytes - entries: List[Entry] + @classmethod + def from_bytes(cls: Type[Self], data: bytes) -> Self: + return cls(tag=Metadata.Tag(data[0]), data=data[1:]) + + def __bytes__(self) -> bytes: + return bytes([len(self.data) + 1, self.tag]) + self.data + + entries: List[Entry] = dataclasses.field(default_factory=list) @classmethod - def from_bytes(cls, data: bytes) -> Self: + def from_bytes(cls: Type[Self], data: bytes) -> Self: entries = [] offset = 0 length = len(data) - while length >= 2: + while offset < length: entry_length = data[offset] - entry_tag = data[offset + 1] - entry_data = data[offset + 2 : offset + 2 + entry_length - 1] - entries.append(cls.Entry(entry_tag, entry_data)) - length -= entry_length + offset += 1 + entries.append(cls.Entry.from_bytes(data[offset : offset + entry_length])) offset += entry_length return cls(entries) + + def __bytes__(self) -> bytes: + return b''.join([bytes(entry) for entry in self.entries]) diff --git a/tests/bap_test.py b/tests/bap_test.py index 0b6db1a7..e276790c 100644 --- a/tests/bap_test.py +++ b/tests/bap_test.py @@ -48,6 +48,7 @@ PublishedAudioCapabilitiesService, PublishedAudioCapabilitiesServiceProxy, ) +from bumble.profiles.le_audio import Metadata from tests.test_utils import TwoDevices @@ -97,7 +98,7 @@ def test_pac_record() -> None: pac_record = PacRecord( coding_format=CodingFormat(CodecID.LC3), codec_specific_capabilities=cap, - metadata=b'', + metadata=Metadata([Metadata.Entry(tag=Metadata.Tag.VENDOR_SPECIFIC, data=b'')]), ) assert PacRecord.from_bytes(bytes(pac_record)) == pac_record @@ -142,7 +143,7 @@ def test_ASE_Config_QOS() -> None: def test_ASE_Enable() -> None: operation = ASE_Enable( ase_id=[1, 2], - metadata=[b'foo', b'bar'], + metadata=[b'', b''], ) basic_check(operation) @@ -151,7 +152,7 @@ def test_ASE_Enable() -> None: def test_ASE_Update_Metadata() -> None: operation = ASE_Update_Metadata( ase_id=[1, 2], - metadata=[b'foo', b'bar'], + metadata=[b'', b''], ) basic_check(operation) diff --git a/tests/le_audio_test.py b/tests/le_audio_test.py new file mode 100644 index 00000000..264a96d0 --- /dev/null +++ b/tests/le_audio_test.py @@ -0,0 +1,39 @@ +# Copyright 2021-2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from bumble.profiles import le_audio + + +def test_parse_metadata(): + metadata = le_audio.Metadata( + entries=[ + le_audio.Metadata.Entry( + tag=le_audio.Metadata.Tag.PROGRAM_INFO, + data=b'', + ), + le_audio.Metadata.Entry( + tag=le_audio.Metadata.Tag.STREAMING_AUDIO_CONTEXTS, + data=bytes([0, 0]), + ), + le_audio.Metadata.Entry( + tag=le_audio.Metadata.Tag.PREFERRED_AUDIO_CONTEXTS, + data=bytes([1, 2]), + ), + ] + ) + + assert le_audio.Metadata.from_bytes(bytes(metadata)) == metadata